I have a data set like this:
name time val
---- ----- ---
fred 04:00 111
greg 03:00 123
fred 01:00 411
fred 05:00 921
fred 11:00 157
greg 12:00 333
And csv files in some folder, one for each unique name from the data set:
fred.csv
greg.csv
The contents of fred.csv, for example, looks like this:
00:00 222
10:00 133
My goal is to efficiently merge the dataset into the CSVs in sorted time order, so that fred.csv, for example, ends up like this:
00:00 222
01:00 411
04:00 111
05:00 921
10:00 133
In reality, there are thousands of unique names, not just two. I use union and sort functions to add rows in order, but I have not been successful with partitionBy, foreach, or coalesce in getting the rows to their proper CSV files.
Import and declare necessary variables
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .master("local")
  .appName("Partition Sort Demo")
  .getOrCreate()
import spark.implicits._
Create dataframe from source file
val df = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("csv/file/location")
//df.show()
+----+-----+---+
|name| time|val|
+----+-----+---+
|fred|04:00|111|
|greg|03:00|123|
|fred|01:00|411|
|fred|05:00|921|
|fred|11:00|157|
|greg|12:00|333|
+----+-----+---+
Now repartition the dataframe by name, sort each partition by time, then save each partition to its own CSV:

//repartition so that all rows for a given name land in the same partition
val repartitionedDf = df.repartition($"name")

for {
  //fetch the distinct names in the dataframe to use as file names
  distinctName <- df.dropDuplicates("name").collect.map(_.getString(0))
} yield {
  import org.apache.spark.sql.functions.lit

  repartitionedDf.select("time", "val")
    .filter($"name" === lit(distinctName)) //filter df by name
    .coalesce(1)                           //collapse to a single partition
    .sortWithinPartitions($"time")         //sort by time
    .write.mode("overwrite")
    .csv("location/" + distinctName + ".csv") //save
}
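Since the goal is to merge the dataset with the per-name CSVs that already exist in the folder, their contents need to be unioned into df before the loop above. A minimal sketch, assuming a hypothetical existing/csvs folder of headerless files, Spark 2.3+ for the DDL-string schema, and comma-separated values (adjust the sep option if the existing files use a different delimiter):

import org.apache.spark.sql.functions.{input_file_name, regexp_extract}

//read every <name>.csv in the folder; files contain only time and val
val existing = spark.read
  .schema("time STRING, val INT")
  .csv("existing/csvs") //hypothetical path to the existing per-name files
  //recover the name from the file path, e.g. .../fred.csv -> fred
  .withColumn("name", regexp_extract(input_file_name(), "([^/]+)\\.csv$", 1))
  .select("name", "time", "val")

//union with the dataset; run the repartition/sort/save step on merged instead of df
val merged = df.select("name", "time", "val").union(existing)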
Each name now gets its own CSV output under the location directory, with rows in sorted time order.
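Two caveats worth noting: Spark writes each output as a directory (e.g. location/fred.csv/part-*.csv) rather than a single file, and the loop launches one Spark job per name, which gets slow with thousands of names. A sketch of a single-job alternative using partitionBy, assuming the merged dataframe from the sketch above; it writes one name=<value> directory per name (the name itself is encoded in the directory name, so the files keep only time and val), with rows inside each file coming out time-ordered:

merged.repartition($"name")               //group each name's rows into one partition
  .sortWithinPartitions($"name", $"time") //order rows so each name's block is time-sorted
  .write
  .partitionBy("name")                    //one name=<value> directory per name
  .mode("overwrite")
  .csv("location")

A rename or move step outside Spark would still be needed if the files must literally be called fred.csv and greg.csv.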
