Can I get metadata of files read by Spark?

Let's suppose we have 2 files, file#1 created at 12:55 and file#2 created at 12:58. While reading these two files I want to add a new column "creation_time". Rows belonging to file#1 should have 12:55 in the "creation_time" column, and rows belonging to file#2 should have 12:58.

new_data = spark.read.option("header", "true").csv("s3://bucket7838-1/input")

I'm using the code snippet above to read the files in the "input" directory.

Abdul Haseeb asked Sep 12 '25 11:09



1 Answer

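Use the input_file_name() function to get the filename for each row, use the HDFS file API to get each file's timestamp, and finally join the two DataFrames on filename. Note that the HDFS API reports a modification time rather than a creation time; for files that are written once and never changed, the two are effectively the same.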

Example:

from pyspark.sql.functions import col, input_file_name, to_timestamp

# Reach the Hadoop classes through the py4j gateway of the active SparkContext
sc = spark.sparkContext
URI           = sc._gateway.jvm.java.net.URI
Path          = sc._gateway.jvm.org.apache.hadoop.fs.Path
FileSystem    = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
Configuration = sc._gateway.jvm.org.apache.hadoop.conf.Configuration

fs = FileSystem.get(URI("hdfs://<namenode_address>:8020"), Configuration())

# One FileStatus object per entry in the directory
status = fs.listStatus(Path('<hdfs_directory>'))

# getModificationTime() returns epoch milliseconds; divide by 1000 to get seconds
filestatus_df = spark.createDataFrame(
    [[str(i.getPath()), i.getModificationTime() / 1000] for i in status],
    ["filename", "modified_time"],
).withColumn("modified_time", to_timestamp(col("modified_time")))

# Tag every row with the full path of the file it was read from
input_df = spark.read.csv("<hdfs_directory>").withColumn("filename", input_file_name())

# Left join both dataframes on filename so each row gets its file's timestamp
df = input_df.join(filestatus_df, ['filename'], "left")
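
To try the same lookup-and-join pattern without a cluster, here is a minimal local sketch that uses only the Python standard library. It is not Spark code: the helper names file_times, read_rows and left_join_on_filename are made up for illustration, and it assumes a flat directory of CSV files that each have a header row.

```python
import csv
import os
from datetime import datetime, timezone


def file_times(directory):
    """Return {path: modification time in UTC} for each file in directory."""
    times = {}
    for entry in os.scandir(directory):
        if entry.is_file():
            # st_mtime is already in seconds. Hadoop's getModificationTime()
            # is in milliseconds, hence the division by 1000 in the Spark code.
            mtime = entry.stat().st_mtime
            times[entry.path] = datetime.fromtimestamp(mtime, tz=timezone.utc)
    return times


def read_rows(directory):
    """Read every CSV file, tagging each row with the file it came from."""
    rows = []
    paths = sorted(e.path for e in os.scandir(directory) if e.is_file())
    for path in paths:
        with open(path, newline="") as handle:
            for row in csv.DictReader(handle):
                row["filename"] = path  # plays the role of input_file_name()
                rows.append(row)
    return rows


def left_join_on_filename(rows, times):
    """Attach modified_time to each row; None when the file is unknown."""
    return [{**row, "modified_time": times.get(row["filename"])} for row in rows]
```

Calling left_join_on_filename(read_rows(d), file_times(d)) for some local directory d returns one dict per CSV row with "filename" and "modified_time" added. As with the left join in Spark, a row whose filename has no match keeps a null (None) timestamp, which is usually a sign that the two sides format the path differently.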
notNull answered Sep 15 '25 00:09
