I am trying to flatten a JSON file so that I can load it into PostgreSQL, all within AWS Glue, using PySpark. A crawler crawls the JSON in S3 and produces a table, and I then run a Glue ETL script against that table.
Script so far:
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = glue_source_database, table_name = glue_source_table, transformation_ctx = "datasource0")
df0 = Relationalize.apply(frame = datasource0, staging_path = glue_temp_storage, name = dfc_root_table_name, transformation_ctx = "dfc")
df1 = df0.select(dfc_root_table_name)
df2 = df1.toDF()
df2 = df2.select(explode(col('`request.data`')).alias("request_data"))
<then I write df1 to a PostgreSQL database, which works fine>
Issues I face:
The 'Relationalize' transform works well except for the request.data field, which becomes a bigint, so 'explode' doesn't work on it.
Because of the structure of the data, explode cannot be applied without running 'Relationalize' on the JSON first. The specific error from explode is: "org.apache.spark.sql.AnalysisException: cannot resolve 'explode(request.data)' due to data type mismatch: input to function explode should be array or map type, not bigint"
If I convert the dynamic frame to a DataFrame first, I get this error instead: "py4j.protocol.Py4JJavaError: An error occurred while calling o72.jdbc. : java.lang.IllegalArgumentException: Can't get JDBC type for struct..."
I also tried uploading a classifier so that the data would be flattened during the crawl itself, but AWS confirmed this wouldn't work.
The JSON format of the original file, which I am trying to normalise, is as follows:
- field1
- field2
- {}
- field3
- {}
- field4
- field5
- []
- {}
- field6
- {}
- field7
- field8
- {}
- field9
- {}
- field10
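# Flatten nested df: explode arrays into rows, expand structs into columns
from pyspark.sql import functions as F

def flatten_df(nested_df):
    # Explode every array column; explode_outer keeps rows whose array is null
    array_cols = [c[0] for c in nested_df.dtypes if c[1][:5] == 'array']
    for col in array_cols:
        nested_df = nested_df.withColumn(col, F.explode_outer(nested_df[col]))
    # Stop once no struct columns remain
    nested_cols = [c[0] for c in nested_df.dtypes if c[1][:6] == 'struct']
    if len(nested_cols) == 0:
        return nested_df
    # Promote each struct field to a top-level column named parent_child
    flat_cols = [c[0] for c in nested_df.dtypes if c[1][:6] != 'struct']
    flat_df = nested_df.select(flat_cols +
                               [F.col(nc+'.'+c).alias(nc+'_'+c)
                                for nc in nested_cols
                                for c in nested_df.select(nc+'.*').columns])
    # Recurse: flattening may have exposed further arrays or structs
    return flatten_df(flat_df)

df = flatten_df(df)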
Nested field names are joined with an underscore in place of the dot (for example, a.b becomes a_b). Note that it uses explode_outer rather than explode, so that a row is kept, with a null value, when the array itself is null. explode_outer is available in PySpark from v2.3 onwards.
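To make the difference between the two functions concrete, here is a minimal plain-Python sketch of their behaviour on a list of dicts. The helper names `explode_rows` and `explode_outer_rows` and the sample data are hypothetical, written only for illustration; they are not Spark APIs and only mimic how Spark treats a null or empty array.

```python
def explode_rows(rows, key):
    """Mimic explode: rows whose array is None or empty are dropped."""
    out = []
    for row in rows:
        for item in row.get(key) or []:
            out.append({**row, key: item})
    return out


def explode_outer_rows(rows, key):
    """Mimic explode_outer: a None or empty array yields one row with None."""
    out = []
    for row in rows:
        items = row.get(key) or [None]
        for item in items:
            out.append({**row, key: item})
    return out


# Hypothetical sample data
rows = [
    {"id": 1, "tags": ["a", "b"]},
    {"id": 2, "tags": None},  # null array
    {"id": 3, "tags": []},    # empty array
]

inner = explode_rows(rows, "tags")
outer = explode_outer_rows(rows, "tags")

print([r["id"] for r in inner])  # ids 2 and 3 are dropped
print([r["id"] for r in outer])  # ids 2 and 3 survive with tags=None
```

With plain explode, the records with ids 2 and 3 would silently disappear from the flattened output, which is usually not what you want when loading into a database.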
Also remember that exploding an array duplicates the other column values across the new rows, so the row count increases, and flattening a struct increases the column count. In short, your original df will grow both horizontally and vertically, which may slow down later processing.
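As a rough way to gauge that growth before running the job, the sketch below counts how many rows a single nested record would turn into once every array has been exploded. It is plain Python, not Spark; `exploded_row_count` and the sample record are hypothetical, and it assumes explode_outer semantics (a null or empty array still yields one row) and no arrays nested directly inside arrays.

```python
def exploded_row_count(value):
    """Rows one record produces after all arrays are exploded (explode_outer style)."""
    if isinstance(value, dict):
        # Sibling fields multiply: each combination becomes a row
        count = 1
        for child in value.values():
            count *= exploded_row_count(child)
        return count
    if isinstance(value, list):
        if not value:
            return 1  # empty array still yields one row holding a null
        # Each element becomes its own row (or rows, if it nests further arrays)
        return sum(exploded_row_count(item) for item in value)
    return 1  # scalars and None


# Hypothetical record: 3 tags, and 2 items whose sizes arrays hold 2 and 0 values
record = {
    "id": 1,
    "tags": ["a", "b", "c"],
    "items": [
        {"sku": "x", "sizes": [1, 2]},
        {"sku": "y", "sizes": []},
    ],
}

print(exploded_row_count(record))  # 3 tags * (2 + 1) item rows = 9
```

Sibling arrays multiply each other, so a record with a few moderately sized arrays can expand into a large number of rows.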
My recommendation would therefore be to identify the feature-related data, store only that in PostgreSQL, and keep the original JSON files in S3.
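One way to picture that approach is to pull out only the needed fields by dotted path and leave everything else behind. The sketch below is a plain-Python stand-in; `pick`, the field names, and the sample record are all hypothetical. In Spark, the equivalent would be a select on just the nested columns you need instead of flattening the whole frame.

```python
def pick(record, paths):
    """Return a flat dict holding only the requested dotted paths."""
    picked = {}
    for path in paths:
        value = record
        for part in path.split("."):
            # Walk down one level; a missing or non-dict level yields None
            value = value.get(part) if isinstance(value, dict) else None
        picked[path.replace(".", "_")] = value
    return picked


# Hypothetical record: only two of its fields are wanted in the database
record = {
    "id": 7,
    "request": {"user": {"name": "ann", "age": 30}, "data": [1, 2, 3]},
    "debug": {"trace": "..."},
}

row = pick(record, ["id", "request.user.name"])
print(row)
```

The resulting row contains only scalar columns, so there are no struct or array types left for the JDBC writer to reject.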