I am working on Spark SQL where I need to find the diff between two large CSVs.
The diff should give:
Inserted rows (new records), comparing only IDs
Changed rows (not including inserted ones), comparing all column values
Spark 2.4.4 + Java
I am using Databricks to read/write the CSVs.
Dataset<Row> insertedDf = newDf_temp.join(oldDf_temp,
        oldDf_temp.col(key).equalTo(newDf_temp.col(key)), "left_anti");
Long insertedCount = insertedDf.count();
logger.info("Inserted File Count == " + insertedCount);

Dataset<Row> deletedDf = oldDf_temp.join(newDf_temp,
        oldDf_temp.col(key).equalTo(newDf_temp.col(key)), "left_anti")
        .select(oldDf_temp.col(key));
Long deletedCount = deletedDf.count();
logger.info("deleted File Count == " + deletedCount);

// exceptAll gives new + changed records
Dataset<Row> changedDf = newDf_temp.exceptAll(oldDf_temp);
// Anti-joining against the inserted rows leaves only the changed records
Dataset<Row> changedDfTemp = changedDf.join(insertedDf,
        changedDf.col(key).equalTo(insertedDf.col(key)), "left_anti");
Long changedCount = changedDfTemp.count();
logger.info("Changed File Count == " + changedCount);
This works well for CSVs with up to 50 columns or so.
The code above fails even for a single-row CSV with 300+ columns, so I am sure this is not a file size problem.
If I have a CSV with 300+ columns, it fails with the exception:
Max iterations (100) reached for batch Resolution – Spark Error
If I set the property below in Spark, it works:
sparkConf.set("spark.sql.optimizer.maxIterations", "500");
But my question is: why do I have to set this?
Is there something wrong in what I am doing? Or is this behaviour expected for CSVs that have a large number of columns?
Can I optimize it in any way to handle CSVs with many columns?
The issue you are running into is related to how Spark takes the instructions you give it and transforms them into the actual work it is going to do. It first needs to understand your instructions by running the Analyzer, then it tries to improve them by running its Optimizer. The setting appears to apply to both.
Specifically, your code is failing during a step in the Analyzer. The Analyzer is responsible for figuring out, when you refer to things, what you are actually referring to: for example, mapping function names to implementations, or mapping column names across renames and different transforms. It does this in multiple passes, resolving additional references on each pass, then checking again to see if it can resolve more.
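If you want to see how much resolution work your query requires, you can print the extended plans once the query analyzes successfully (for example, after raising the iteration limit). This is just a way to inspect the plans, not a fix; changedDfTemp here is the DataFrame from your code above.
// Prints the parsed, analyzed, optimized, and physical plans.
// With 300+ columns the analyzed plan gets very large, which is a hint that
// plan resolution is what is hitting the iteration limit.
changedDfTemp.explain(true)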
I think what is happening in your case is that each pass probably resolves one column, but 100 passes aren't enough to resolve all of the columns. By increasing the limit you are giving it enough passes to get entirely through your plan. This is definitely a red flag for a potential performance issue, but if your code is working then you can probably just increase the value and not worry about it.
If it isn't working, then you will probably need to do something to reduce the number of columns used in your plan, such as combining all the columns into one encoded string column as the key. You might also benefit from checkpointing the data before doing the join so you can shorten your plan. A rough sketch of both ideas is below.
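Something along these lines, reusing the variable names from your code; the hash scheme and checkpoint directory are just examples, and this assumes both CSVs share the same column order:
import org.apache.spark.sql.functions._
// Collapse all non-key columns into one hashed comparison column so the join
// only has to carry two columns instead of 300+.
val nonKeyCols = newDf_temp.columns.filter(_ != key).map(col)
val newSlim = newDf_temp.select(col(key), sha2(concat_ws("||", nonKeyCols:_*), 256).as("new_hash"))
val oldSlim = oldDf_temp.select(col(key), sha2(concat_ws("||", nonKeyCols:_*), 256).as("old_hash"))
// Optionally checkpoint before joining to truncate the lineage and shorten the plan.
spark.sparkContext.setCheckpointDir("/tmp/checkpoints")   // example path
val newReady = newSlim.checkpoint()
val oldReady = oldSlim.checkpoint()
// Changed rows: same key, different hash.
val changed = newReady.join(oldReady, Seq(key), "inner")
  .filter(col("new_hash") =!= col("old_hash"))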
EDIT:
Also, I would refactor your code above so you can do it all with only one join. This should be a lot faster and might solve your other problem.
Each join leads to a shuffle (data being sent between compute nodes), which adds time to your job. Instead of computing adds, deletes and changes independently, you can do them all at once. Something like the code below. It's Scala pseudocode because I'm more familiar with that than the Java APIs.
import org.apache.spark.sql.functions._
import spark.implicits._   // needed for the $"..." column syntax; assumes a SparkSession named spark
var oldDf = ..
var newDf = ..
val changeCols = newDf.columns.filter(_ != "id").map(col)
// Make the columns you want to compare into a single struct column for easier comparison
newDf = newDf.select($"id", struct(changeCols:_*) as "compare_new")
oldDf = oldDf.select($"id", struct(changeCols:_*) as "compare_old")
// Outer join on ID
val combined = oldDf.join(newDf, Seq("id"), "outer")
// Figure out status of each based upon presence of old/new
// IF old side is missing, must be an ADD
// IF new side is missing, must be a DELETE
// IF both sides present but different, it's a CHANGE
// ELSE it's NOCHANGE
val status = when($"compare_old".isNull, lit("add")).
  when($"compare_new".isNull, lit("delete")).
  when($"compare_new" =!= $"compare_old", lit("change")).
  otherwise(lit("nochange"))
val labeled = combined.select($"id", status.as("status"))
At this point, we have every ID labeled ADD/DELETE/CHANGE/NOCHANGE, so we can just do a groupBy/count. This aggregation can be done almost entirely map side, so it will be a lot faster than a join.
labeled.groupBy("status").count.show
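If you need the rows themselves and not just the counts, you can keep the struct columns around and filter on the same status expression. Still pseudocode in the spirit of the sketch above:
// Keep everything from the outer join and tag each row with its status
val withStatus = combined.withColumn("status", status)
// Pull out each category as its own Dataset
val added   = withStatus.filter($"status" === "add")
val deleted = withStatus.filter($"status" === "delete")
val changed = withStatus.filter($"status" === "change")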