My current setup is:
I'm using https://github.com/Azure/azure-event-hubs-spark/blob/master/docs/PySpark/structured-streaming-pyspark-jupyter.md as an example of how to read the data, but one thing is unclear:
What is the correct way to get each element of the stream and pass it through a Python function?
Thanks,
Ed
In the first step you define a DataFrame that reads the data as a stream from your Event Hub or IoT Hub:
from pyspark.sql.functions import *

df = spark \
    .readStream \
    .format("eventhubs") \
    .options(**ehConf) \
    .load()
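The ehConf dictionary used above is not shown in the linked example excerpt. A minimal sketch, assuming a placeholder connection string (recent versions of the azure-event-hubs-spark connector expect the connection string to be encrypted via EventHubsUtils.encrypt, as in the linked docs):

# Sketch of ehConf with a placeholder connection string; fill in
# your own namespace, entity path, and key before running.
connectionString = "Endpoint=sb://<namespace>.servicebus.windows.net/;EntityPath=<eventhub>;SharedAccessKeyName=<keyname>;SharedAccessKey=<key>"
ehConf = {
    "eventhubs.connectionString":
        sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString)
}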
The data is stored in binary form in the body attribute. To get at the elements of the body you have to define its structure:
from pyspark.sql.types import *

Schema = StructType([
    StructField("name", StringType(), True),
    StructField("dt", LongType(), True),
    StructField("main", StructType([
        StructField("temp", DoubleType()),
        StructField("pressure", DoubleType())
    ])),
    StructField("coord", StructType([
        StructField("lon", DoubleType()),
        StructField("lat", DoubleType())
    ]))
])
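For reference, here is a hypothetical message body this schema would parse (the field values are invented for illustration), together with a quick sanity check of the schema on a static DataFrame before wiring it into the stream:

from pyspark.sql.functions import from_json

# Invented payload matching the schema above.
sample_body = '{"name": "Berlin", "dt": 1485789600, "main": {"temp": 281.5, "pressure": 1016.0}, "coord": {"lon": 13.4, "lat": 52.5}}'

# Parse the sample with the schema and show the flattened columns.
spark.createDataFrame([(sample_body,)], ["json"]) \
    .select(from_json("json", Schema).alias("data")) \
    .select("data.*") \
    .show()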
and apply the schema to the body cast as a string:
from pyspark.sql.functions import *

rawData = df \
    .selectExpr("cast(Body as string) as json") \
    .select(from_json("json", Schema).alias("data")) \
    .select("data.*")
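At this point rawData is a streaming DataFrame with one column per schema field. During development you can inspect the incoming micro-batches with a console sink; a sketch, assuming you run where console output is visible (spark-submit or a notebook driver log):

# Print each micro-batch of parsed rows to the console.
query = rawData.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()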
On the resulting DataFrame you can apply functions, e.g. call the custom function u_make_hash on the column 'name':
parsedData = rawData.select("name", u_make_hash(rawData["name"]).alias("namehash"))
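u_make_hash itself is not defined in the answer. This is the step that passes each element of the stream through a Python function: wrap the function as a UDF. A minimal sketch with a hypothetical hashing body:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import hashlib

# Any plain Python function can be wrapped as a UDF; Spark then
# applies it to the column value of every row in the stream.
@udf(returnType=StringType())
def u_make_hash(name):
    if name is None:
        return None
    return hashlib.sha256(name.encode("utf-8")).hexdigest()

Note that plain Python UDFs serialize each value between the JVM and a Python worker, so for high-throughput streams a built-in function (here, e.g. pyspark.sql.functions.sha2) is preferable when one exists.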