
pyspark: Converting string to struct

I have data as follows:

{
    "Id": "01d3050e",
    "Properties": "{\"choices\":null,\"object\":\"demo\",\"database\":\"pg\",\"timestamp\":\"1581534117303\"}",
    "LastUpdated": 1581530000000,
    "LastUpdatedBy": "System"
}

Using AWS Glue, I want to relationalize the "Properties" column, but since its datatype is string this can't be done. Converting it to a struct might make it possible, based on this blog post:

https://aws.amazon.com/blogs/big-data/simplify-querying-nested-json-with-the-aws-glue-relationalize-transform/

>>> df.show
<bound method DataFrame.show of DataFrame[Id: string, LastUpdated: bigint, LastUpdatedBy: string, Properties: string]>
>>> df.show()
+--------+-------------+-------------+--------------------+
|      Id|  LastUpdated|LastUpdatedBy|          Properties|
+--------+-------------+-------------+--------------------+
|01d3050e|1581530000000|       System|{"choices":null,"...|
+--------+-------------+-------------+--------------------+

How can I un-nest the "Properties" column and break it into "choices", "object", "database" and "timestamp" columns, using the relationalize transformer or a UDF in PySpark?

Anand asked May 14 '26 12:05


2 Answers

Use from_json since the column Properties is a JSON string.

If the schema is the same for all your records, you can convert the column to a struct type by defining the schema like this:

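from pyspark.sql.functions import col, from_json
# MapType is only needed for the map-based variant below.
from pyspark.sql.types import MapType, StringType, StructField, StructType

# Every key is read as a nullable string; a JSON null becomes a null field.
schema = StructType([StructField("choices", StringType(), True),
                    StructField("object", StringType(), True),
                    StructField("database", StringType(), True),
                    StructField("timestamp", StringType(), True)])

df.withColumn("Properties", from_json(col("Properties"), schema)).show(truncate=False)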

#+--------+-------------+-------------+---------------------------+
#|Id      |LastUpdated  |LastUpdatedBy|Properties                 |
#+--------+-------------+-------------+---------------------------+
#|01d3050e|1581530000000|System       |[, demo, pg, 1581534117303]|
#+--------+-------------+-------------+---------------------------+

However, if the schema can change from one row to another, I'd suggest converting it to a Map type instead:

df.withColumn("Properties", from_json(col("Properties"), MapType(StringType(), StringType()))).show(truncate=False)

#+--------+-------------+-------------+------------------------------------------------------------------------+
#|Id      |LastUpdated  |LastUpdatedBy|Properties                                                              |
#+--------+-------------+-------------+------------------------------------------------------------------------+
#|01d3050e|1581530000000|System       |[choices ->, object -> demo, database -> pg, timestamp -> 1581534117303]|
#+--------+-------------+-------------+------------------------------------------------------------------------+

You can then access individual elements of the map using element_at (available in Spark 2.4+).

blackbishop answered May 17 '26 07:05


Creating your dataframe:

from pyspark.sql import functions as F
# Assumes an active SparkSession named spark, as in the pyspark shell.
data = [["01d3050e", "{\"choices\":null,\"object\":\"demo\",\"database\":\"pg\",\"timestamp\":\"1581534117303\"}", 1581530000000, "System"]]
df = spark.createDataFrame(data, ['Id', 'Properties', 'LastUpdated', 'LastUpdatedBy'])
df.show(truncate=False)

+--------+----------------------------------------------------------------------------+-------------+-------------+
|Id      |Properties                                                                  |LastUpdated  |LastUpdatedBy|
+--------+----------------------------------------------------------------------------+-------------+-------------+
|01d3050e|{"choices":null,"object":"demo","database":"pg","timestamp":"1581534117303"}|1581530000000|System       |
+--------+----------------------------------------------------------------------------+-------------+-------------+

Use inbuilt regex, split, and element_at:

There is no need for a UDF; the built-in functions are adequate and well optimized for big data tasks.

# Strip braces and quotes, turn ':' into ',', then split into [key1, value1, key2, value2, ...].
# Caveats: this breaks if a key or value contains ',' or ':', and a JSON null becomes the string "null".
df.withColumn("Properties", F.split(F.regexp_replace(F.regexp_replace(F.regexp_replace("Properties", r'[{}]', ""), ':', ','), '"', ""), ','))\
.withColumn("choices", F.element_at("Properties", 2))\
.withColumn("object", F.element_at("Properties", 4))\
.withColumn("database", F.element_at("Properties", 6))\
.withColumn("timestamp", F.element_at("Properties", 8).cast('long')).drop("Properties").show()


+--------+-------------+-------------+-------+------+--------+-------------+
|      Id|  LastUpdated|LastUpdatedBy|choices|object|database|    timestamp|
+--------+-------------+-------------+-------+------+--------+-------------+
|01d3050e|1581530000000|       System|   null|  demo|      pg|1581534117303|
+--------+-------------+-------------+-------+------+--------+-------------+


root
 |-- Id: string (nullable = true)
 |-- LastUpdated: long (nullable = true)
 |-- LastUpdatedBy: string (nullable = true)
 |-- choices: string (nullable = true)
 |-- object: string (nullable = true)
 |-- database: string (nullable = true)
 |-- timestamp: long (nullable = true)

murtihash answered May 17 '26 06:05


