I am streaming meter reading records as JSON from kafka_2.11-0.10.0.1 into Spark 2.1. I switched to Structured Streaming, and although the Kafka consumer confirms incoming data, neither the console nor the writeStream output moves. I am testing using
pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0
My code:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
spark = SparkSession \
    .builder \
    .appName("interval") \
    .master("local[4]") \
    .getOrCreate()
schema = StructType().add("customer_id", StringType())
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "xx.xxx.xx.xxx:9092") \
    .option("subscribe", "test") \
    .option("startingOffsets", "earliest") \
    .load() \
    .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))
query = df.writeStream \
    .option("checkpointLocation", "/user/XX/checkpoint5") \
    .format("parquet") \
    .start("/user/XX/interval5")
It creates the checkpoint and data directories with a 388-byte parquet file. However, no streamed data is ever written.
$ hdfs dfs -ls interval5
drwxr-xr-x ... interval5/_spark_metadata
-rw-r--r-- ... interval5/part-00000-0b2eb00a-c361-4dfe-a24e-9589d150a911.snappy.parquet
-rw-r--r-- ... interval5/part-00000-e0cb12d1-9c29-4eb0-92a8-688f468a42ce.snappy.parquet
kafka-consumer confirms data is being shipped:
{"customer_id":"customer_736"}
{"customer_id":"customer_995"}
{"customer_id":"customer_1899"}
{"customer_id":"customer_35"}
kafka-consumer displays the streamed data.
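The Kafka source exposes each record's `value` as binary, which is why the query casts it to a string before calling `from_json`. The snippet below is a plain-Python approximation of that parsing step applied to records like the ones above. `parse_value` and `SCHEMA_FIELDS` are hypothetical names, and this is a sketch of the idea, not Spark's implementation. It can help confirm that the JSON on the topic matches the schema, since a mismatch typically produces rows with null parsed values rather than an error.

```python
import json
from typing import List, Optional

# Field names taken from the StructType in the question; a plain list is a
# toy stand-in for the Spark schema (assumption for illustration only).
SCHEMA_FIELDS: List[str] = ["customer_id"]


def parse_value(value: bytes) -> Optional[dict]:
    """Toy approximation of from_json(col("value").cast("string"), schema).

    Decodes the raw Kafka value bytes, parses them as JSON, and keeps only
    the schema's fields (a missing field becomes None). Input that cannot be
    decoded or is not a JSON object yields None instead of raising.
    """
    try:
        obj = json.loads(value.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None
    if not isinstance(obj, dict):
        return None
    return {name: obj.get(name) for name in SCHEMA_FIELDS}


# Values as they might arrive from the "test" topic, plus one malformed record.
for raw in [b'{"customer_id":"customer_736"}', b'{"customer_id":"customer_35"}', b"not json"]:
    print(parse_value(raw))
```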
I think I'm missing an essential step to dequeue and save the streamed rows; a day of trawling Stack Overflow has not helped. (Edited to remove the references to the console, as they are not relevant.)
With .option("startingOffsets", "latest"), you should only expect messages that were published after you've started the streaming query. So the expected course of action is to start the streaming query first and then publish messages.
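To make the difference between "earliest" and "latest" concrete, here is a toy, single-partition model in plain Python. It is not Spark or Kafka code; `first_offset` and `read_from` are hypothetical names, and list positions stand in for Kafka offsets. It shows how a freshly started query (one with no existing checkpoint) resolves its first offset and which messages it then sees.

```python
from typing import List, Tuple

# A toy, single-partition "topic": (offset, payload) pairs in publish order.
Log = List[Tuple[int, str]]


def first_offset(log: Log, starting_offsets: str) -> int:
    """Resolve where a *fresh* query (no checkpoint) starts reading.

    "earliest" -> the first offset still in the log.
    "latest"   -> the next offset to be written, so only messages
                  published after the query starts are read.
    """
    if starting_offsets == "earliest":
        return log[0][0] if log else 0
    if starting_offsets == "latest":
        return log[-1][0] + 1 if log else 0
    raise ValueError("expected 'earliest' or 'latest'")


def read_from(log: Log, start: int) -> List[str]:
    """Return payloads at or after the resolved start offset."""
    return [payload for offset, payload in log if offset >= start]


# Two messages exist before the query starts...
topic: Log = [(0, "customer_736"), (1, "customer_995")]
start_earliest = first_offset(topic, "earliest")
start_latest = first_offset(topic, "latest")

# ...and one more is published after it starts.
topic.append((2, "customer_1899"))

print(read_from(topic, start_earliest))  # ['customer_736', 'customer_995', 'customer_1899']
print(read_from(topic, start_latest))    # ['customer_1899']
```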
Nothing is written into the parquet files.
You will see nothing saved into parquet files, since you used .format("console"). You have to change it to parquet and restart the query.
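When restarting, keep in mind that `startingOffsets` only applies when a new query is started; a query restarted with an existing `checkpointLocation` resumes from the offsets recorded there, as the Spark Kafka integration guide notes. The toy model below (plain Python, hypothetical names, one partition, list indices standing in for offsets; each call models starting a query, reading what is available, and stopping) illustrates why changing the option alone may not re-read old messages, while pointing the restarted query at a fresh checkpoint directory does.

```python
from typing import Dict, List


def resolve_start(checkpoint: Dict[str, int], log: List[str],
                  starting_offsets: str) -> int:
    """Toy model: choose the first offset (list index) to read on start.

    A committed offset in the checkpoint always wins; startingOffsets is
    consulted only when the checkpoint is empty (a brand-new query).
    """
    if "offset" in checkpoint:
        return checkpoint["offset"]
    if starting_offsets == "earliest":
        return 0
    if starting_offsets == "latest":
        return len(log)
    raise ValueError("expected 'earliest' or 'latest'")


def run_query(checkpoint: Dict[str, int], log: List[str],
              starting_offsets: str) -> List[str]:
    """Read everything from the resolved start, then record progress."""
    start = resolve_start(checkpoint, log, starting_offsets)
    batch = log[start:]
    checkpoint["offset"] = len(log)  # stands in for what a checkpoint persists
    return batch


topic = ["customer_736", "customer_995"]
ckpt: Dict[str, int] = {}  # stands in for one checkpointLocation directory

print(run_query(ckpt, topic, "latest"))    # [] (new query, nothing published after start)
topic.append("customer_1899")
# Restart with the SAME checkpoint: resumes after the committed offset,
# so switching to "earliest" has no effect.
print(run_query(ckpt, topic, "earliest"))  # ['customer_1899']
# Restart with a FRESH checkpoint: "earliest" now applies.
print(run_query({}, topic, "earliest"))    # ['customer_736', 'customer_995', 'customer_1899']
```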