I have tab-delimited data (CSV file) like below:
201911240130 a
201911250132 b
201911250143 c
201911250223 z
201911250224 d
...
I want to write it into directories grouped by year, month, day, and hour:
hdfs://dest/2019/11/24/01/xxxx.csv
hdfs://dest/2019/11/25/01/xxxx.csv
hdfs://dest/2019/11/25/02/xxxx.csv
How can I write it partitioned by yyyy/mm/dd/hh?
There is already partitionBy in DataFrameWriter, which does exactly what you need, and it's much simpler. There are also built-in functions to extract the date parts from a timestamp.
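A minimal sketch of that approach, assuming the two-column layout from the question and the same hdfs://sample.csv and hdfs://dest/ paths:
from pyspark.sql.functions import to_timestamp, year, month, dayofmonth, hour

# Assumed input path and column names ("ts", "value"); adjust to your data
df = spark.read.csv("hdfs://sample.csv", sep="\t").toDF("ts", "value")

(df.withColumn("ts", to_timestamp("ts", "yyyyMMddHHmm"))
   .withColumn("year", year("ts"))
   .withColumn("month", month("ts"))
   .withColumn("day", dayofmonth("ts"))
   .withColumn("hour", hour("ts"))
   .drop("ts")
   .write.partitionBy("year", "month", "day", "hour")
   .option("sep", "\t")
   .csv("hdfs://dest/"))
Note that partitionBy writes Hive-style directories such as year=2019/month=11/day=25/hour=2, and year()/month()/hour() return unpadded integers; the date_format approach in the answer below keeps zero-padded values like 01.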
Here is another solution you can consider.
Since your CSV does not have a header, you can apply a custom one when you load it; this makes it easy to manipulate the columns later:
from pyspark.sql.types import StructType, StructField, StringType

# Build a schema from the custom header so the columns get usable names
custom_header = "timestamp\tvalue"
schema = StructType()
col_names = custom_header.split("\t")
for c in col_names:
    schema.add(StructField(c.strip(), StringType()))

df = spark.read.csv("hdfs://sample.csv", header=False, sep="\t", schema=schema)
Now, create the columns year, month, day, and hour from the column timestamp as follows:
from pyspark.sql.functions import col, to_timestamp, date_format

# Parse the raw timestamp, derive the partition columns, then drop it
df_final = df.withColumn("timestamp", to_timestamp(col("timestamp"), "yyyyMMddHHmm")) \
    .withColumn("year", date_format(col("timestamp"), "yyyy")) \
    .withColumn("month", date_format(col("timestamp"), "MM")) \
    .withColumn("day", date_format(col("timestamp"), "dd")) \
    .withColumn("hour", date_format(col("timestamp"), "HH")) \
    .drop("timestamp")
df_final.show(truncate=False)
+-----+----+-----+---+----+
|value|year|month|day|hour|
+-----+----+-----+---+----+
|a |2019|11 |24 |01 |
|b |2019|11 |25 |01 |
|c |2019|11 |25 |01 |
|z |2019|11 |25 |02 |
|d |2019|11 |25 |02 |
+-----+----+-----+---+----+
Finally, write the DataFrame to the destination path using partitionBy like this:
df_final.write.partitionBy("year", "month", "day", "hour") \
    .mode("overwrite") \
    .option("header", "false").option("sep", "\t") \
    .csv("hdfs://dest/")
Partitions will be created under the /dest/ folder.
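As a quick sanity check, Spark's partition discovery can rebuild those columns when you read the base path back; a small sketch, assuming the same destination path:
df_back = spark.read.csv("hdfs://dest/", sep="\t")
# The data column gets a default name (_c0) since no header was written,
# while year, month, day and hour are recovered from the directory names
df_back.printSchema()
df_back.where("year = 2019 AND day = 25").show(truncate=False)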