What is the depth to which schema evolution works while merging?
Automatic schema evolution does not work while merging in the following case.
import json

# d1 is the base record; d2 adds the nested field b.c.2; d3 is missing it again.
d1 = {'a': 'b', 'b': {'c': {'1': 1}}}
d2 = {'a': 's', 'b': {'c': {'1': 2, '2': 2}}}
d3 = {'a': 'v', 'b': {'c': {'1': 4}}}

df1 = spark.read.json(spark.sparkContext.parallelize([json.dumps(d1)]))
# Create the initial Delta table -- passes
df1.write.saveAsTable('test_table4', format='delta', mode='overwrite',
                      path="hdfs://hdmaster:9000/dest/test_table4")
df2 = spark.read.json(spark.sparkContext.parallelize([json.dumps(d2)]))
df2.createOrReplaceTempView('updates')
query = """
MERGE INTO test_table4 existing_records
USING updates updates
ON existing_records.a=updates.a
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
"""
spark.sql("set spark.databricks.delta.schema.autoMerge.enabled=true")
spark.sql(query) #passes
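# At this point the first MERGE has evolved the target schema; a quick sanity
# check (in the same interactive session) that the new nested field is present:
spark.table('test_table4').printSchema()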
df3 = spark.read.json(spark.sparkContext.parallelize([json.dumps(d3)]))
df3.createOrReplaceTempView('updates')
query = """
MERGE INTO test_table4 existing_records
USING updates updates
ON existing_records.a=updates.a
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
"""
spark.sql("set spark.databricks.delta.schema.autoMerge.enabled=true")
spark.sql(query) #FAILS
The MERGE appears to fail when the nesting depth is more than 2 and the incoming DataFrame has columns missing.
Is this intentional?
A plain append handles this schema change perfectly with option("mergeSchema", "true"), but I want to UPSERT the data, and MERGE is not able to handle the schema change.
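For reference, a rough sketch of the append path referred to above, reusing df3 and the table path from the question; as noted, this tolerates the schema difference but only appends a new row instead of updating the matching one:

# Append-only path the question says works; it does not upsert.
df3.write.format('delta').mode('append') \
    .option('mergeSchema', 'true') \
    .save("hdfs://hdmaster:9000/dest/test_table4")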
Using Delta Lake version 0.8.0
In Delta 0.8, schema evolution during MERGE is controlled by setting spark.databricks.delta.schema.autoMerge.enabled to true; the mergeSchema option you mention applies to append (and overwrite) writes rather than to MERGE.
See the Delta 0.8 announcement blog post for more details on this feature.
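A minimal sketch of enabling this from PySpark rather than through a SQL SET statement (same session, same MERGE query as in the question):

# Session-level equivalent of the SQL "set ..." statement used above.
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")
spark.sql(query)  # re-run the same MERGE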