I'm trying to add a new column to data stored as a Delta table in Azure Blob Storage. Most operations on the data are upserts, with many updates and few new inserts. My code to write data currently looks like this:
DeltaTable.forPath(spark, deltaPath)
  .as("dest_table")
  .merge(myDF.as("source_table"),
    "dest_table.id = source_table.id")
  .whenNotMatched()
  .insertAll()
  .whenMatched(upsertCond)
  .updateExpr(upsertStat)
  .execute()
From these docs, it looks like Delta Lake supports adding new columns on insertAll() and updateAll() calls only. However, I'm updating only when certain conditions are met and want the new column added to all the existing data (with a default value of null).
I've come up with a solution that seems extremely clunky and am wondering if there's a more elegant approach. Here's my current proposed solution:
// Read in existing data
val myData = spark.read.format("delta").load(deltaPath)
// Register table with Hive metastore
myData.write.format("delta").saveAsTable("input_data")
// Add new column
spark.sql("ALTER TABLE input_data ADD COLUMNS (new_col string)")
// Save as DataFrame and overwrite data on disk
val sqlDF = spark.sql("SELECT * FROM input_data")
sqlDF.write.format("delta").option("mergeSchema", "true").mode("overwrite").save(deltaPath)
Alter your Delta table first, then run your merge operation:
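from pyspark.sql.functions import lit

# Rewrite the table with the extra column and replace the stored schema.
# Note: lit('') fills existing rows with empty strings; use lit(None).cast("string") for nulls.
spark.read.format("delta").load('/mnt/delta/cov')\
    .withColumn("Recovered", lit(''))\
    .write\
    .format("delta")\
    .mode("overwrite")\
    .option("overwriteSchema", "true")\
    .save('/mnt/delta/cov')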
New columns can also be added with SQL commands as follows:
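ALTER TABLE dbName.TableName ADD COLUMNS (newColumnName dataType);
-- Optional: existing rows read the new column as NULL, so run this only if they need another value
UPDATE dbName.TableName SET newColumnName = val;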
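The same ALTER TABLE statement can target a Delta table by its storage path using the delta.`<path>` identifier, which avoids registering the table in a metastore first. Below is a minimal sketch of that idea, assuming a Delta Lake and Spark version that supports ALTER TABLE on path-based tables. The helper names add_columns_sql and add_columns are hypothetical, and spark is whatever active SparkSession you already have.

```python
def add_columns_sql(delta_path, columns):
    """Build an ALTER TABLE statement for a path-based Delta table.

    columns: dict mapping new column name -> Spark SQL type,
             e.g. {"new_col": "STRING"}.
    (Hypothetical helper, not part of the Delta Lake API.)
    """
    if not columns:
        raise ValueError("columns must not be empty")
    cols = ", ".join(f"`{name}` {dtype}" for name, dtype in columns.items())
    # delta.`<path>` addresses the table by its storage location,
    # so no metastore entry is needed.
    return f"ALTER TABLE delta.`{delta_path}` ADD COLUMNS ({cols})"


def add_columns(spark, delta_path, columns):
    """Run the statement on the given SparkSession.

    This changes the table schema without rewriting data files;
    existing rows read the new columns as NULL.
    """
    spark.sql(add_columns_sql(delta_path, columns))
```

For example, add_columns(spark, "/mnt/delta/cov", {"Recovered": "STRING"}) issues ALTER TABLE delta.`/mnt/delta/cov` ADD COLUMNS (`Recovered` STRING). After that, the conditional merge can set the new column for matched rows while untouched rows keep NULL.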