spark structured streaming: not writing correctly

I am streaming meter-reading records as JSON from kafka_2.11-0.10.0.1 into Spark 2.1. I switched to Structured Streaming, and although a Kafka consumer confirms incoming data, neither the console nor writeStream produces any output. I am testing using

pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0

My code:

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
spark = SparkSession \
    .builder \
    .appName("interval") \
    .master("local[4]") \
    .getOrCreate()
schema = StructType().add("customer_id", StringType())
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "xx.xxx.xx.xxx:9092") \
    .option("subscribe", "test") \
    .option("startingOffsets", "earliest") \
    .load() \
    .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))

query = df.writeStream \
    .option("checkpointLocation", "/user/XX/checkpoint5") \
    .format("parquet") \
    .start("/user/XX/interval5")

It creates the checkpoint and data directories, with a 388-byte parquet file. However, no streamed data is ever written.

$ hdfs dfs -ls interval5
drwxr-xr-x   ... interval5/_spark_metadata
-rw-r--r--   ... interval5/part-00000-0b2eb00a-c361-4dfe-a24e-9589d150a911.snappy.parquet
-rw-r--r--   ... interval5/part-00000-e0cb12d1-9c29-4eb0-92a8-688f468a42ce.snappy.parquet

kafka-consumer confirms data is being shipped:

{"customer_id":"customer_736"}
{"customer_id":"customer_995"}
{"customer_id":"customer_1899"}
{"customer_id":"customer_35"}


I think I'm missing an essential step to dequeue and save the streamed rows; a day of trawling Stack Overflow has not helped. (Edited to remove the references to the console, as they are not relevant.)
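One quick sanity check (a sketch, not from the original post; it reuses the query handle defined above) is to inspect the running query's progress and see whether micro-batches are firing at all:

query.status        # e.g. whether the trigger is active or waiting for data
query.lastProgress  # metrics for the most recent micro-batch, such as numInputRows
                    # (None until the first batch completes)

If numInputRows stays at 0, the query is running but receiving no records from Kafka.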

asked by MarkTeehan

1 Answer

With .option("startingOffsets", "latest") you should only expect messages that were published after the streaming query started.

So the expected course of action is to start the streaming query first and then publish messages.
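For example, a minimal sketch (reusing the broker address and topic from the question) that replays everything already on the topic instead of waiting for new records:

df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "xx.xxx.xx.xxx:9092") \
    .option("subscribe", "test") \
    .option("startingOffsets", "earliest") \
    .load()

With "latest" (the default for streaming queries), records already sitting on the topic are skipped; "earliest" reads from the beginning of each partition.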

Nothing is written into the parquet files.

You will see nothing saved into the parquet files since you used .format("console"). You have to change the sink to parquet and restart the query.
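A minimal parquet-sink sketch (paths taken from the question; you may need a fresh checkpoint directory, since a checkpoint created for one sink cannot generally be reused with another):

query = df.writeStream \
    .format("parquet") \
    .option("path", "/user/XX/interval5") \
    .option("checkpointLocation", "/user/XX/checkpoint5") \
    .start()

query.awaitTermination()  # keeps a standalone script alive; unnecessary in the pyspark shell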

answered by Jacek Laskowski