I have a problem merging CSV files into a Delta table using PySpark SQL. I managed to create an upsert function that updates a row if it is matched and inserts it if not.
I want to add an ID column to the final Delta table and increment it each time we insert data. This column should uniquely identify each row in the Delta table. Is there any way to put that in place?
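from pyspark.sql import functions as F
from pyspark.sql.functions import col, lit

def Merge(dict1, dict2):
    # Keys in dict2 override keys in dict1
    res = {**dict1, **dict2}
    return res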
def create_default_values_dict(correspondance_df, marketplace):
    dict_output = {}
    for field in get_nan_keys_values(get_mapping_dict(correspondance_df, marketplace)):
        dict_output[field] = 'null'
        # We want to increment the id row each time we perform an insertion (TODO)
        # if field == 'id':
        #     dict_output['id'] = col('id')+1
        # else:
    return dict_output
def create_matched_update_dict(mapping, products_table, updates_table):
    output = {}
    for k, v in mapping.items():
        if k == 'source_name':
            output['products.source_name'] = lit(v)
        else:
            # Keep the existing value when the update is null, otherwise take the update
            output[products_table + '.' + k] = (
                F.when(col(updates_table + '.' + v).isNull(), col(products_table + '.' + k))
                .when(col(updates_table + '.' + v).isNotNull(), col(updates_table + '.' + v))
            )
    return output
insert_dict = create_not_matched_insert_dict(mapping, 'products', 'updates')
default_dict = create_default_values_dict(correspondance_df_products, 'Cdiscount')
insert_values = Merge(insert_dict, default_dict)
update_values = create_matched_update_dict(mapping, 'products', 'updates')
delta_table_products.alias('products').merge(
        updates_df_table.limit(20).alias('updates'),
        "products.barcode_ean == updates.ean") \
    .whenMatchedUpdate(set = update_values) \
    .whenNotMatchedInsert(values = insert_values) \
    .execute()
I tried to increment the id column in the function create_default_values_dict, but it does not work: the value does not auto-increment by 1. Is there another way to solve this problem? Thanks in advance :)
Databricks (hosted Spark) supports IDENTITY columns, which generate the values for you:
https://docs.databricks.com/sql/language-manual/sql-ref-syntax-ddl-create-table-using.html#parameters
GENERATED { ALWAYS | BY DEFAULT } AS IDENTITY
[ ( [ START WITH start ] [ INCREMENT BY step ] ) ]
This works on Delta tables.
Example:
create table gen1 (
    id long GENERATED ALWAYS AS IDENTITY
  , t string
)
Requires Databricks Runtime 10.4 or above.
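With a GENERATED ALWAYS identity column, the insert side of the merge should not supply a value for id, since Databricks fills it in. A minimal sketch of how the insert_values dict from the question could be filtered before the merge is shown below. The helper name drop_identity_column, its parameters, and the sample dict are hypothetical and only illustrate the idea; they are not part of the Delta API.

```python
def drop_identity_column(insert_values, identity_col="id", table_alias="products"):
    """Return a copy of insert_values without the identity column.

    Hypothetical helper: removes both the bare column name ("id") and the
    alias-qualified name ("products.id"), so the merge does not try to
    write a value into a GENERATED ALWAYS AS IDENTITY column.
    """
    blocked = {identity_col, f"{table_alias}.{identity_col}"}
    return {k: v for k, v in insert_values.items() if k not in blocked}


# Made-up stand-in for Merge(insert_dict, default_dict) from the question.
insert_values = {
    "products.barcode_ean": "updates.ean",
    "id": "null",
    "title": "null",
}

safe_insert_values = drop_identity_column(insert_values)
print(safe_insert_values)
```

The filtered dict would then be passed as .whenNotMatchedInsert(values = safe_insert_values) in place of insert_values, assuming the target table was created with the identity column as in the example above.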