
Spark DataFrame to Postgres using COPY command - PySpark

I need to write a Spark DataFrame to a Postgres DB. I have used the following:

df.write \
    .option("numPartitions", partitions) \
    .option("batchsize", batchsize) \
    .jdbc(url=url, table="table_name", mode="append", properties=properties)

This works fine; however, I want to compare the performance with the COPY command.

I tried the following:

output = io.StringIO()

csv_new.write \
    .format("csv") \
    .option("header", "true") \
    .save(path=output)

output.seek(0)
contents = output.getvalue()
cursor.copy_from(output, 'tb_pivot_table', null="")  # using psycopg2
con_bb.commit()

This does not work; it fails with the error: 'type' object is not iterable.

The following worked well with a Pandas DataFrame:

output = io.StringIO()
df.to_csv(path_or_buf=output, sep='\t', header=False, index=False)
output.seek(0)
contents = output.getvalue()
cursor.copy_from(output, 'tb_ts_devicedatacollection_aggregate', null="")
con_bb.commit()

Any leads on how to implement the Pandas equivalent in PySpark? P.S.: It is performance-critical, hence converting the Spark DataFrame to a Pandas DataFrame is not an option. Any help would be greatly appreciated.
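
For context on what a Pandas-style COPY could look like in PySpark, one possible direction (a minimal sketch, not part of the original post) is to run COPY from inside each partition with foreachPartition, so no intermediate files are needed. The connection parameters are placeholders, and psycopg2 is assumed to be installed on every executor:

import io
import psycopg2

def copy_partition_to_postgres(rows):
    # Build a tab-separated in-memory buffer for this partition
    # (assumes the values contain no tabs or newlines)
    buf = io.StringIO()
    for row in rows:
        buf.write("\t".join("" if v is None else str(v) for v in row) + "\n")
    buf.seek(0)

    # Placeholder connection details -- adjust to your environment
    con = psycopg2.connect(database="my_db", user="my_user", password="my_password",
                           host="my_host", port="my_port")
    try:
        cur = con.cursor()
        # copy_from uses tab as the default separator, matching the buffer above
        cur.copy_from(buf, "tb_pivot_table", null="")
        con.commit()
    finally:
        con.close()

df.rdd.foreachPartition(copy_partition_to_postgres)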

1 Answer

What currently works very well for me (100-200 GB of CSV files with around 1,000,000,000 rows) is using psycopg2 together with multiprocessing.

Available cores: 200

First I export the Spark DataFrame into a number of files that is a multiple of the available cores:

filepath = "/base_path/psql_multiprocessing_data"

# A binary format would be even faster, but CSV is fine here
df.repartition(400) \
    .write \
    .mode("overwrite") \
    .format("csv") \
    .save(filepath, header="false")

Then I iterate over all files in the folder in parallel via:

import glob
import psycopg2
from multiprocessing import Pool, cpu_count

file_path_list = sorted(glob.glob("/base_path/psql_multiprocessing_data/*.csv"))

def psql_copy_load(fileName):
    # Each worker opens its own connection and streams one CSV file via COPY
    con = psycopg2.connect(database="my_db", user="my_user", password="my_password",
                           host="my_host", port="my_port")
    cursor = con.cursor()
    with open(fileName, 'r') as f:
        # next(f)  # in case you need to skip a header row
        cursor.copy_from(f, 'my_schema.my_table', sep=",")

    con.commit()
    con.close()
    return fileName


with Pool(cpu_count()) as p:
    p.map(psql_copy_load, file_path_list)

print("parallelism (cores): ", cpu_count())
print("files processed: ", len(file_path_list))

I did not try exporting the data as binary any further because it got complicated with the correct headers and data types, and I was happy with the run time of around 25-30 minutes (with 6 columns).

