Does Spark do one or multiple passes through data when multiple withColumn functions are chained?
For example:
val dfnew = df.withColumn("newCol1", f1(col("a")))
  .withColumn("newCol2", f2(col("b")))
  .withColumn("newCol3", f3(col("c")))
where
df is my input DataFrame containing at least columns a, b, c
dfnew is the output DataFrame with three new columns newCol1, newCol2, newCol3
f1, f2, f3 are some user-defined functions or some Spark operations on columns, like cast, etc.
In my project I can have as many as 30 independent withColumn calls chained with foldLeft.
Important
I am assuming here that f2 does not depend on the result of f1, and f3 does not depend on the results of f1 or f2. The functions could be performed in any order. There is no shuffle in any function.
My observations
An additional withColumn does not increase execution time in a way that would suggest additional passes through the data.
I compared a single SQLTransformer whose select statement contains all the functions against multiple separate SQLTransformers, one per function, and the execution time was similar.
Questions
Will Spark make one or three passes through the data, once for each withColumn?
Does it depend on the type of functions f1, f2, f3? UDF vs generic Spark operations?
If the functions f1, f2, f3 are inside the same stage, does it mean they are in the same data pass?
Does the number of passes depend on shuffles within functions? If there is no shuffle?
If I chain the withColumn functions with foldLeft, will it change the number of passes?
I could do something similar with three SQLTransformers or just one SQLTransformer with all three transformations in the same select_statement. How many passes through the data would that do?
Basically it doesn't matter, the time of execution will be similar for 1 and 3 passes?
Will Spark make one or three passes through the data, once for each withColumn?
Spark will make one pass through the data. Why? Because Spark doesn't actually do anything when this code is reached; it just builds an execution plan that tells it what to do when an action (e.g. count, collect, write) is executed on dfnew. At that point it can compute all three functions at once for each record.
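The lazy behaviour described above can be observed directly. The following is a minimal sketch, assuming a local SparkSession and generated toy data; the cast, the multiplication and the small UDF are hypothetical stand-ins for f1, f2 and f3, not the asker's real functions. It is meant to be pasted into spark-shell or run as a Scala script with Spark on the classpath.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

// Assumption: a local session and generated toy data, for illustration only.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("withColumn-single-pass")
  .getOrCreate()

// Toy input with columns a, b, c derived from spark.range.
val df = spark.range(0, 5).select(
  col("id").cast("int").as("a"),
  (col("id") * 1.5).as("b"),
  col("id").cast("string").as("c")
)

// Hypothetical stand-in for f3: a simple Scala UDF.
val f3 = udf((s: String) => s + "!")

// Three chained withColumn calls; nothing is computed here.
val dfnew = df
  .withColumn("newCol1", col("a").cast("string")) // stand-in for f1
  .withColumn("newCol2", col("b") * 2)            // stand-in for f2
  .withColumn("newCol3", f3(col("c")))            // stand-in for f3

// Prints the logical and physical plans without running the job.
dfnew.explain(true)

// Only an action such as show() triggers the actual computation.
dfnew.show()
```

In the printed plans, the three projections would normally be expected to appear collapsed into a single projection over the source rather than as three separate reads, though the exact plan text varies between Spark versions.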
Does it depend on the type of functions f1, f2, f3? UDF vs generic Spark operations?
No.
If the functions f1, f2, f3 are inside the same stage, does it mean they are in the same data pass?
Yes.
Does the number of passes depend on shuffles within functions? If there is no shuffle?
Almost. First, as long as no caching or checkpointing is used, the number of passes over the data equals the number of actions executed on the resulting dfnew DataFrame. Then, each shuffle means each record is read, potentially sent between worker nodes, potentially written to disk, and then read again.
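To illustrate the point about actions and caching, here is a small sketch under the same assumptions as before (local session, made-up toy columns). It runs two actions on the same DataFrame and uses cache() so that the second action does not have to recompute everything from the source.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Assumption: a local session and generated toy data, for illustration only.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("actions-and-cache")
  .getOrCreate()

val df = spark.range(0, 5).select(
  col("id").cast("int").as("a"),
  (col("id") * 1.5).as("b")
)

val dfnew = df
  .withColumn("newCol1", col("a").cast("string"))
  .withColumn("newCol2", col("b") * 2)

// Without caching, each action below would re-run the plan from the source.
// cache() only marks the DataFrame; the first action materialises it.
dfnew.cache()

val rowCount = dfnew.count() // first action: computes and caches
val rows = dfnew.collect()   // second action: can be served from the cache

// Release the cached data when it is no longer needed.
dfnew.unpersist()
```

Whether caching pays off depends on how expensive the plan is compared with the cost of holding the result in memory, so this is something to measure rather than assume.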
If I chain the withColumn functions with foldLeft, will it change the number of passes?
No. It only changes how the above-mentioned plan is constructed; the resulting plan is exactly the same, so the computation remains the same.
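One way to check this yourself is to build the same columns both ways and compare the plans. The sketch below assumes a local session and toy data; the column expressions are hypothetical stand-ins for f1, f2 and f3. It uses queryExecution, a developer-facing API, to compare the optimized logical plans.

```scala
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.functions.{col, upper}

// Assumption: a local session and generated toy data, for illustration only.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("foldLeft-vs-chain")
  .getOrCreate()

val df = spark.range(0, 5).select(
  col("id").cast("int").as("a"),
  (col("id") * 1.5).as("b"),
  col("id").cast("string").as("c")
)

// Hypothetical stand-ins for f1, f2, f3, keyed by the new column name.
val newCols: Seq[(String, Column)] = Seq(
  "newCol1" -> col("a").cast("string"),
  "newCol2" -> (col("b") * 2),
  "newCol3" -> upper(col("c"))
)

// Explicit chaining.
val chained = df
  .withColumn("newCol1", col("a").cast("string"))
  .withColumn("newCol2", col("b") * 2)
  .withColumn("newCol3", upper(col("c")))

// The same columns added with foldLeft.
val folded = newCols.foldLeft(df) { case (acc, (name, expr)) =>
  acc.withColumn(name, expr)
}

// sameResult compares canonicalized plans, ignoring cosmetic
// differences such as expression IDs.
val samePlan = chained.queryExecution.optimizedPlan
  .sameResult(folded.queryExecution.optimizedPlan)
println(s"Equivalent optimized plans: $samePlan")
```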
I could do something similar with three SQLTransformers or just one SQLTransformer with all three transformations in the same select_statement. How many passes through data that would do?
Again, this won't make any difference, as the execution plan will remain the same.
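The same comparison can be made for SQLTransformer. This is a rough sketch, assuming spark-mllib is on the classpath, a local session, and toy columns a, b, c; the SQL expressions are illustrative stand-ins for the real transformations. It applies one transformer with all three expressions, then three transformers with one expression each, and prints both physical plans for comparison.

```scala
import org.apache.spark.ml.feature.SQLTransformer
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Assumption: a local session and generated toy data, for illustration only.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("sqltransformer-plans")
  .getOrCreate()

val df = spark.range(0, 5).select(
  col("id").cast("int").as("a"),
  (col("id") * 1.5).as("b"),
  col("id").cast("string").as("c")
)

// One transformer with all three expressions in a single statement.
val single = new SQLTransformer().setStatement(
  "SELECT *, CAST(a AS STRING) AS newCol1, b * 2 AS newCol2, " +
    "upper(c) AS newCol3 FROM __THIS__")

// Three transformers, one expression each, applied in sequence.
val steps = Seq(
  "SELECT *, CAST(a AS STRING) AS newCol1 FROM __THIS__",
  "SELECT *, b * 2 AS newCol2 FROM __THIS__",
  "SELECT *, upper(c) AS newCol3 FROM __THIS__"
).map(stmt => new SQLTransformer().setStatement(stmt))

val viaSingle = single.transform(df)
val viaThree = steps.foldLeft(df)((acc, t) => t.transform(acc))

// Print both physical plans to compare them by eye.
viaSingle.explain()
viaThree.explain()
```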
Basically it doesn't matter, the time of execution will be similar for 1 and 3 passes?
Not sure what this means, but it sounds incorrect: execution time is mostly a function of the number of shuffles and the number of actions (assuming the same data and the same cluster setup).