I am trying to optimize a Spark job's performance using the bucketing technique. I read .parquet and .csv files and apply some transformations. Then I bucket the data and join the two DataFrames. Finally I write the joined DataFrame to parquet, but I get an almost empty file of ~500 B instead of ~500 MB.
val readParquet = spark.read.parquet(inputP)
readParquet
.write
.format("parquet")
.bucketBy(23, "column")
.sortBy("column")
.mode(SaveMode.Overwrite)
.saveAsTable("bucketedTable1")
val firstTableDF = spark.table("bucketedTable1")
val readCSV = spark.read.csv(inputCSV)
readCSV
.filter(..)
.orderBy(someColumn)
.write
.format("parquet")
.bucketBy(23, "column")
.sortBy("column")
.mode(SaveMode.Overwrite)
.saveAsTable("bucketedTable2")
val secondTableDF = spark.table("bucketedTable2")
val resultDF = secondTableDF
.join(firstTableDF, Seq("column"), "fullouter")
// ... further transformations elided ...
resultDF
.coalesce(1)
.write
.mode(SaveMode.Overwrite)
.parquet(output)
When I launch the Spark job from the command line over SSH, I get the correct result, a ~500 MB parquet file which I can see using Hive. If I run the same job through an Oozie workflow, I get an empty file (~500 bytes).
When I call .show() on my resultDF I can see the data, yet the parquet file is empty.
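One way to narrow this down (a sketch, not from the original job: it uses toy data, a local SparkSession, and a hypothetical /tmp path instead of the real inputs) is to compare the row count of the DataFrame before the write with the row count of the parquet files actually on disk:

```scala
import org.apache.spark.sql.SparkSession

// Diagnostic sketch: count rows before the write, then read the
// written files back and count again. A mismatch confirms the data
// is lost at write time, not during the join.
val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Toy stand-in for the joined resultDF of the original job.
val resultDF = Seq(("33601234567", "208012345678910", "LOL"))
  .toDF("col1", "col2", "col3")
val output = "/tmp/diagnostic_output" // hypothetical path

val before = resultDF.count() // rows the driver sees
resultDF.coalesce(1).write.mode("overwrite").parquet(output)
val after = spark.read.parquet(output).count() // rows actually on disk

println(s"before=$before after=$after")
```

In the failing Oozie run, `before` would be non-zero while the files under `output` are empty, which points at the table/location setup rather than the join logic.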
+-----------+---------------+----------+
|       col1|           col2|      col3|
+-----------+---------------+----------+
|33601234567|208012345678910|       LOL|
|33601234567|208012345678910|       LOL|
|33601234567|208012345678910|       LOL|
+-----------+---------------+----------+
There is no problem writing to parquet when I am not saving the data as a table. It occurs only with DataFrames created from tables.
Any suggestions ?
Thanks in advance for any thoughts!
I figured it out for my case: I just added the option .option("path", "/sources/tmp_files_path") to the write. Now I can use bucketing and I have data in my output files.
readParquet
.write
.option("path", "/sources/tmp_files_path")
.mode(SaveMode.Overwrite)
.bucketBy(23, "column")
.sortBy("column")
.saveAsTable("bucketedTable1")
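For context on why this works: with an explicit "path" option, saveAsTable creates an EXTERNAL table, so the data location no longer depends on the environment-specific warehouse directory (which can differ between an SSH session and an Oozie launcher). A self-contained sketch with toy data and a hypothetical /tmp path:

```scala
import org.apache.spark.sql.SparkSession

// Sketch: bucketed saveAsTable with an explicit path produces an
// EXTERNAL table whose files live under that path.
val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

Seq((1, "a"), (2, "b")).toDF("column", "value")
  .write
  .format("parquet")
  .option("path", "/tmp/bucketed_sketch") // hypothetical path
  .bucketBy(4, "column")
  .sortBy("column")
  .mode("overwrite")
  .saveAsTable("bucketedSketch")

// The catalog reports the table type, and reading the table back
// returns the rows that were written.
println(spark.catalog.getTable("bucketedSketch").tableType)
println(spark.table("bucketedSketch").count())
```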