
PySpark: How to write a DataFrame partitioned into year/month/day/hour sub-directories?

I have tab-delimited data (a CSV file) like this:

201911240130 a
201911250132 b
201911250143 c
201911250223 z
201911250224 d
...

I want to write it out into directories grouped by year, month, day, and hour:

hdfs://dest/2019/11/24/01/xxxx.csv
hdfs://dest/2019/11/25/01/xxxx.csv
hdfs://dest/2019/11/25/02/xxxx.csv

How can I write the output partitioned by yyyy/mm/dd/hh?

Anderson Choi asked Oct 14 '25 20:10



1 Answer

DataFrameWriter already has partitionBy, which does exactly what you need without building the directories yourself. There are also functions in pyspark.sql.functions, such as year, month, dayofmonth, and hour, that extract date parts from a timestamp.

Here is another solution you can consider.

As your CSV has no header, you can apply a custom header when you load it, which makes the columns easy to manipulate later:

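from pyspark.sql.types import StructType, StructField, StringType

# The file has no header row, so name the columns explicitly.
custom_header = "timestamp\tvalue"
schema = StructType()
col_names = custom_header.split("\t")
for c in col_names:
    # Read every column as a string; the timestamp is parsed in the next step.
    schema.add(StructField(c.strip(), StringType()))

# `spark` is an existing SparkSession.
df = spark.read.csv("hdfs://sample.csv", header=False, sep="\t", schema=schema)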

Now, create the columns year, month, day, hour from the column timestamp as follows:

from pyspark.sql.functions import col, to_timestamp, date_format

# Parse the string into a timestamp, then format each part as a zero-padded string.
df_final = df.withColumn("timestamp", to_timestamp(col("timestamp"), 'yyyyMMddHHmm')) \
           .withColumn("year", date_format(col("timestamp"), "yyyy")) \
           .withColumn("month", date_format(col("timestamp"), "MM")) \
           .withColumn("day", date_format(col("timestamp"), "dd")) \
           .withColumn("hour", date_format(col("timestamp"), "HH")) \
           .drop("timestamp")

df_final.show(truncate=False)

+-----+----+-----+---+----+
|value|year|month|day|hour|
+-----+----+-----+---+----+
|a    |2019|11   |24 |01  |
|b    |2019|11   |25 |01  |
|c    |2019|11   |25 |01  |
|z    |2019|11   |25 |02  |
|d    |2019|11   |25 |02  |
+-----+----+-----+---+----+
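
The pattern 'yyyyMMddHHmm' can be checked against the sample rows without starting Spark. The sketch below does the same split in plain Python, assuming that the strptime format '%Y%m%d%H%M' is equivalent to the Spark pattern for this fixed-width input. split_timestamp is a hypothetical helper name, not part of PySpark.

```python
from datetime import datetime


def split_timestamp(raw: str) -> tuple:
    """Return zero-padded (year, month, day, hour) strings for a yyyyMMddHHmm value."""
    # Assumption: '%Y%m%d%H%M' mirrors the Spark pattern 'yyyyMMddHHmm'.
    parsed = datetime.strptime(raw, "%Y%m%d%H%M")
    return (
        parsed.strftime("%Y"),
        parsed.strftime("%m"),
        parsed.strftime("%d"),
        parsed.strftime("%H"),
    )


# Print the parts for a few of the sample rows from the question.
for raw in ["201911240130", "201911250132", "201911250223"]:
    print(raw, split_timestamp(raw))
```

The parts are kept as strings on purpose: a zero-padded "01" keeps the directory names the same width, whereas an integer hour would lose the leading zero.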

Finally, write DF to destination path using partitionBy like this:

# One directory level is created per partition column, in the order given here.
df_final.write.partitionBy("year", "month", "day", "hour") \
    .mode("overwrite") \
    .option("header", "false").option("sep", "\t") \
    .csv("hdfs://dest/")

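Spark creates the partitions under /dest/ as Hive-style key=value directories, for example hdfs://dest/year=2019/month=11/day=24/hour=01/, rather than the bare 2019/11/24/01 layout shown in the question. The partition columns are left out of the CSV files themselves, so each file contains only the value column.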
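
If the bare 2019/11/24/01 layout from the question is required, the column=value directory names that partitionBy produces would have to be renamed after the write. The sketch below covers only the path mapping for such a rename, in plain Python. to_plain_path is a hypothetical helper, not part of Spark, and the actual move on HDFS is left out.

```python
def to_plain_path(partition_path: str) -> str:
    """Drop the 'column=' prefix from each Hive-style path segment.

    Hypothetical helper for illustration; it only rewrites the string
    and does not touch the file system.
    """
    segments = []
    for segment in partition_path.split("/"):
        # 'year=2019' -> '2019'; segments without '=' are kept unchanged.
        _, sep, value = segment.partition("=")
        segments.append(value if sep else segment)
    return "/".join(segments)


print(to_plain_path("year=2019/month=11/day=24/hour=01"))
```

Keeping the key=value names is usually the easier choice when Spark reads the data back, because it can then recover year, month, day, and hour as columns from the directory names.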

blackbishop answered Oct 17 '25 16:10
