I'm currently playing with Delta Tables on my local machine and I encountered a behavior that I don't understand. I create my DeltaTable like so:
df.write \
    .format('delta') \
    .mode('overwrite') \
    .option('overwriteSchema', 'true') \
    .save(my_table_path)
from delta.tables import DeltaTable
dt = DeltaTable.forPath(spark, my_table_path)
Then, I run the following command.
spark.sql(f"ALTER TABLE delta.`{my_table_path}` ADD COLUMNS (my_new_col string)")
This adds a new column to the schema as can be seen by running
spark.sql(f"DESCRIBE TABLE delta.`{my_table_path}`").show()
It even shows up as the last operation in the table history when I run dt.history().show().
However, this is not reflected in the DeltaTable object dt: in fact, if I run dt.toDF().printSchema(), the new column is not displayed.
On the other hand, if I do something like
spark.sql(f"UPDATE delta.`{my_table_path}` SET existing_col = 'foo'")
and then run dt.toDF().show(), the update is reflected: existing_col now contains foo in every row.
The only way I have found to make the dt object reflect the schema change is to run dt = DeltaTable.forPath(spark, my_table_path) again after the ALTER TABLE.
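In code, a minimal sketch of the workaround (same dt and my_table_path as above):
# After the ALTER TABLE, the existing handle still shows the old schema
dt.toDF().printSchema()                        # my_new_col is missing

# Re-creating the handle picks up the new column
dt = DeltaTable.forPath(spark, my_table_path)
dt.toDF().printSchema()                        # my_new_col is now listed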
What am I missing?
Edit: Added repo link for reproducibility.
https://github.com/wtfzambo/delta-bug-working-example
Edit 2: The repo uses Delta 1.0, but the issue also exists in Delta 2.*.
Thanks for your patience @wtfzambo - when I reproduced this myself I realized I should have seen the issue immediately, so sorry for taking so long. This is actually working as expected, but allow me to explain.
When you ran the ALTER TABLE statement, the schema did in fact change and the change was registered in the Delta transaction log, but up to this point it was only a metadata change.
When you ran the UPDATE statement, that change was registered and the data itself reflected it, i.e. the Parquet files within the table picked it up.
As for dt.toDF().printSchema(): the DataFrame only sees the data and metadata together, and when you ran the ALTER TABLE statement it was a metadata-only change.
To better showcase this, allow me to provide context via the file system. To recreate this exact scenario, please use the Docker image at https://go.delta.io/docker with DELTA_PACKAGE_VERSION set to delta-core_2.12:2.1.0. That is, run the Docker container and use the following PySpark steps:
$SPARK_HOME/bin/pyspark --packages io.delta:delta-core_2.12:2.1.0 \
--conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
--conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
# Create Spark DataFrame
data = spark.range(0, 5)
# Write Delta table
data.write.format("delta").save("/tmp/delta-table")
# Read Delta table
df = spark.read.format("delta").load("/tmp/delta-table")
# Show Delta table
df.show()
spark.sql(f"DESCRIBE TABLE delta.'/tmp/delta-table/'").show(), you will see:+---------------+---------+-------+
|       col_name|data_type|comment|
+---------------+---------+-------+
|             id|   bigint|       |
|               |         |       |
| # Partitioning|         |       |
|Not partitioned|         |       |
+---------------+---------+-------+
As well, if you list the files of the table:
NBuser@5b0edf0b8779:/tmp/delta-table$ ls -lsgA
total 44
4 drwxr-xr-x 2 NBuser 4096 Mar 12 23:50 _delta_log
4 -rw-r--r-- 1 NBuser  478 Mar 12 23:29 part-00000-4abcc1fa-b2c8-441a-a392-8dab57edd819-c000.snappy.parquet
4 -rw-r--r-- 1 NBuser   12 Mar 12 23:29 .part-00000-4abcc1fa-b2c8-441a-a392-8dab57edd819-c000.snappy.parquet.crc
4 -rw-r--r-- 1 NBuser  478 Mar 12 23:29 part-00001-6327358c-8c00-4ad6-9e3d-263f0ea66e3f-c000.snappy.parquet
4 -rw-r--r-- 1 NBuser   12 Mar 12 23:29 .part-00001-6327358c-8c00-4ad6-9e3d-263f0ea66e3f-c000.snappy.parquet.crc
4 -rw-r--r-- 1 NBuser  478 Mar 12 23:29 part-00002-eea1d287-be68-4a62-874d-ab4e39c6a825-c000.snappy.parquet
4 -rw-r--r-- 1 NBuser   12 Mar 12 23:29 .part-00002-eea1d287-be68-4a62-874d-ab4e39c6a825-c000.snappy.parquet.crc
4 -rw-r--r-- 1 NBuser  478 Mar 12 23:29 part-00003-c79b4180-5968-4fee-8181-6752d9cfb333-c000.snappy.parquet
4 -rw-r--r-- 1 NBuser   12 Mar 12 23:29 .part-00003-c79b4180-5968-4fee-8181-6752d9cfb333-c000.snappy.parquet.crc
4 -rw-r--r-- 1 NBuser  478 Mar 12 23:29 part-00004-c3399acd-75ca-4ea5-85f9-03fa60897161-c000.snappy.parquet
4 -rw-r--r-- 1 NBuser   12 Mar 12 23:29 .part-00004-c3399acd-75ca-4ea5-85f9-03fa60897161-c000.snappy.parquet.crc
spark.sql(f"ALTER TABLE delta.'/tmp/delta-table/' ADD COLUMNS (blah string)") and when you run the DESCRIBE TABLE statement, you get what you expected:spark.sql(f"DESCRIBE TABLE delta.`/tmp/delta-table/`").show()
+---------------+---------+-------+
|       col_name|data_type|comment|
+---------------+---------+-------+
|             id|   bigint|       |
|           blah|   string|       |
|               |         |       |
| # Partitioning|         |       |
|Not partitioned|         |       |
+---------------+---------+-------+
But if you were to run ls -lsgA on the table directory again, note that the data files look exactly the same. That is, there are no changes to the data, only to the metadata.
To see where the metadata change landed, list the _delta_log directory:
NBuser@5b0edf0b8779:/tmp/delta-table/_delta_log$ ls -lsgA
total 16
4 -rw-r--r-- 1 NBuser 2082 Mar 12 23:29 00000000000000000000.json
4 -rw-r--r-- 1 NBuser   28 Mar 12 23:29 .00000000000000000000.json.crc
4 -rw-r--r-- 1 NBuser  752 Mar 12 23:38 00000000000000000001.json
4 -rw-r--r-- 1 NBuser   16 Mar 12 23:38 .00000000000000000001.json.crc
Note the 00000000000000000001.json file, which contains the transaction corresponding to your ALTER TABLE command. If you were to read this .json file, it would look like this:
{"metaData": {
    "id":"d583238c-87ab-4de0-a09d-141ef499371d",
    "format":{"provider":"parquet","options":{}},
    "schemaString":"
        {\"type\":\"struct\",
         \"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},
                     {\"name\":\"blah\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}"
                     ,"partitionColumns":[],"configuration":{},"createdTime":1678663791967}}
    {"commitInfo":{"timestamp":1678664321014,
        "operation":"ADD COLUMNS","operationParameters":{"columns":"[{\"column\":{\"name\":\"blah\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}}]"},"readVersion":0,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"Apache-Spark/3.3.1 Delta-Lake/2.1.0","txnId":"b54db68d-652b-4930-82d5-61a542d82100"}}
Notice the schemaString -> fields, which now include the blah column, and notice the operation, which points to an ADD COLUMNS command that also references the blah column.
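If you want to confirm this programmatically, here is a small sketch in plain Python (run inside the same container) that reads the most recent commit file in _delta_log and prints the column names from its schemaString; it assumes, as in this scenario, that the latest commit carries a metaData action:
import glob
import json

# Find the most recent commit file in the transaction log
log_files = sorted(glob.glob("/tmp/delta-table/_delta_log/*.json"))
latest = log_files[-1]

# Each line of a commit file is a single JSON action (metaData, commitInfo, add, ...)
with open(latest) as f:
    for line in f:
        action = json.loads(line)
        if "metaData" in action:
            # schemaString is itself JSON describing the table schema
            schema = json.loads(action["metaData"]["schemaString"])
            print([field["name"] for field in schema["fields"]])  # e.g. ['id', 'blah']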
So the key point here is that while the transaction log records the blah column being added, the .parquet files in the table's root directory are unchanged, meaning the change was reflected in the metadata but not in the data.
Until you ran the UPDATE command, the change was not reflected in the .parquet files (i.e. the data files). And in the case of the Spark DataFrame, it only picks up the schema together with the data.
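In practice, to see the new column without writing any data you simply re-resolve the table, exactly as you found: create a fresh DeltaTable handle (or do a fresh read) after the ALTER TABLE. A minimal sketch, continuing the /tmp/delta-table example above:
from delta.tables import DeltaTable

# A handle created before the ALTER TABLE keeps serving the old schema;
# re-resolving the path picks up the new metadata
dt = DeltaTable.forPath(spark, "/tmp/delta-table")
dt.toDF().printSchema()   # now includes 'blah'

# A fresh read of the path sees the new column as well
spark.read.format("delta").load("/tmp/delta-table").printSchema()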