I need to write a Spark DataFrame to a Postgres DB. I have used the following:
df.write \
    .option("numPartitions", partitions) \
    .option("batchsize", batchsize) \
    .jdbc(url=url, table="table_name", mode="append", properties=properties)
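As an aside, plain JDBC insert throughput can often be raised before reaching for COPY. A minimal sketch of one such tweak, assuming the standard PgJDBC driver and that url carries no query string yet (reWriteBatchedInserts is a PgJDBC URL parameter, not a Spark option):
# Sketch: same write, but PgJDBC rewrites the batched INSERTs into
# multi-row statements; partitions, batchsize and properties as above.
df.write \
    .option("numPartitions", partitions) \
    .option("batchsize", batchsize) \
    .jdbc(url=url + "?reWriteBatchedInserts=true",
          table="table_name", mode="append", properties=properties)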
This works fine; however, I want to compare its performance against the Postgres COPY command. I tried the following:
output = io.StringIO()
csv_new.write \
    .format("csv") \
    .option("header", "true") \
    .save(path=output)
output.seek(0)
contents = output.getvalue()
cursor.copy_from(output, 'tb_pivot_table', null="")  # using psycopg2
con_bb.commit()
This does not work; it fails with the error 'type' object is not iterable (presumably because DataFrameWriter.save() expects a filesystem path, not an in-memory buffer). The equivalent worked well with a Pandas DataFrame:
output = io.StringIO()
df.to_csv(path_or_buf=output, sep='\t', header=False, index=False)
output.seek(0)
contents = output.getvalue()
cursor.copy_from(output, 'tb_ts_devicedatacollection_aggregate', null="")
con_bb.commit()
Any leads on how to implement the Pandas equivalent in PySpark? P.S.: It's performance-critical, so converting the Spark DataFrame to a Pandas DataFrame is not an option. Any help would be greatly appreciated.
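For what it's worth, one pattern that avoids both Pandas and intermediate files is to stream each Spark partition straight into COPY from the executors. A minimal, untested sketch, assuming psycopg2 is available on every executor; the table name, column order, and connection details are placeholders:
# Untested sketch: serialize each partition to an in-memory CSV buffer
# and stream it to Postgres with COPY; one connection per partition.
import csv
import io
import psycopg2

def copy_partition_to_postgres(rows):
    buf = io.StringIO()
    writer = csv.writer(buf)
    for row in rows:
        writer.writerow(row)
    buf.seek(0)
    con = psycopg2.connect(database="my_db", user="my_user", password="my_password",
                           host="my_host", port="my_port")
    try:
        con.cursor().copy_expert("COPY tb_pivot_table FROM STDIN WITH (FORMAT csv)", buf)
        con.commit()
    finally:
        con.close()

df.rdd.foreachPartition(copy_partition_to_postgres)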
What currently works very well for me (100-200 GB of CSV files with around 1,000,000,000 rows) is using psycopg2 together with multiprocessing.
Available cores: 200
First I export the Spark DataFrame into a number of files that is a multiple of the available cores:
filepath = "/base_path/psql_multiprocessing_data"

# even faster using a binary format, but CSV is fine
df.repartition(400) \
    .write \
    .mode("overwrite") \
    .format("csv") \
    .save(filepath, header='false')
Then I iterate over all files in the folder in parallel:
import glob
import psycopg2
from multiprocessing import Pool, cpu_count

file_path_list = sorted(glob.glob("/base_path/psql_multiprocessing_data/*.csv"))

def psql_copy_load(fileName):
    con = psycopg2.connect(database="my_db", user="my_user", password="my_password",
                           host="my_host", port="my_port")
    cursor = con.cursor()
    with open(fileName, 'r') as f:
        # next(f)  # in case you need to skip a header row
        cursor.copy_from(f, 'my_schema.my_table', sep=",")
    con.commit()
    con.close()
    return fileName

with Pool(cpu_count()) as p:
    p.map(psql_copy_load, file_path_list)

print("parallelism (cores): ", cpu_count())
print("files processed: ", len(file_path_list))
I did not pursue exporting the data as binary any further because it got complicated with the correct headers and data types, and I was happy with a run time of around 25-30 minutes (with 6 columns).
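One caveat on the copy_from() call above: since psycopg2 2.9, copy_from() quotes the table name it is given, so a schema-qualified name like 'my_schema.my_table' raises an error on newer versions. copy_expert() takes a full COPY statement and sidesteps this; a minimal variant of the worker, with the same placeholder connection details:
# Same worker, but using copy_expert so the schema-qualified table name
# also works on psycopg2 >= 2.9; connection details are placeholders.
def psql_copy_load_expert(fileName):
    con = psycopg2.connect(database="my_db", user="my_user", password="my_password",
                           host="my_host", port="my_port")
    cursor = con.cursor()
    with open(fileName, 'r') as f:
        cursor.copy_expert("COPY my_schema.my_table FROM STDIN WITH (FORMAT csv)", f)
    con.commit()
    con.close()
    return fileName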