
Unable to overwrite default value of "spark.sql.shuffle.partitions" with Spark Structured Streaming

I want to overwrite the spark.sql.shuffle.partitions parameter directly within the code:

import org.apache.spark.sql.SparkSession

val sparkSession = SparkSession
  .builder()
  .appName("SPARK")
  .getOrCreate()

sparkSession.conf.set("spark.sql.shuffle.partitions", 2)

But this setting does not take effect; the logs show the following warning message:

WARN  OffsetSeqMetadata:66 - Updating the value of conf 'spark.sql.shuffle.partitions' in current session from '2' to '200'.

However, passing the same parameter to spark-submit in a shell script works:

#!/bin/bash

/app/spark-2/bin/spark-submit \
--queue root.dev \
--master yarn \
--deploy-mode cluster \
--driver-memory 5G \
--executor-memory 4G \
--executor-cores 2 \
--num-executors 4 \
--conf spark.app.name=SPARK \
--conf spark.executor.memoryOverhead=2048 \
--conf spark.yarn.maxAppAttempts=1 \
--conf spark.sql.shuffle.partitions=2 \
--class com.dev.MainClass

Any ideas?

Mamaf asked Oct 28 '25 10:10


1 Answer

The checkpoint files of your Spark Structured Streaming job store some of the SparkSession configuration.

For example, in the "offsets" folder, the file for the latest batch could look like this:

v1
{"batchWatermarkMs":0,"batchTimestampMs":1619782960476,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
4

Among other settings, it stores the value of spark.sql.shuffle.partitions, which in my example is the default value of 200.
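To check which value a given checkpoint will force on the session, you can read the newest file in the offsets folder. The following is a minimal sketch, assuming a checkpoint on a local filesystem with the layout shown above (a version line, a JSON metadata line, then the source offsets) and batch files named by their numeric batch id. The object `CheckpointInspector` and its methods are hypothetical, and a regex is used instead of a JSON library only to keep the example dependency-free.

```scala
import java.io.File
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}

// Hypothetical helper for inspecting a Structured Streaming checkpoint.
// Assumption: local filesystem, layout <checkpoint>/offsets/<batchId>.
object CheckpointInspector {

  // Captures the digits in "spark.sql.shuffle.partitions":"<digits>".
  private val ShufflePartitions =
    "\"spark\\.sql\\.shuffle\\.partitions\"\\s*:\\s*\"(\\d+)\"".r

  /** Newest offset file, i.e. the one with the highest numeric batch id. */
  def latestOffsetFile(checkpointDir: Path): Option[Path] =
    Option(checkpointDir.resolve("offsets").toFile.listFiles())
      .getOrElse(Array.empty[File])
      // Skip checksum and temp files such as ".10.crc".
      .filter(f => f.isFile && f.getName.nonEmpty && f.getName.forall(_.isDigit))
      .sortBy(_.getName.toLong) // numeric sort, so "10" comes after "2"
      .lastOption
      .map(_.toPath)

  /** Shuffle partition count recorded in the newest offset file, if any. */
  def storedShufflePartitions(checkpointDir: Path): Option[Int] =
    latestOffsetFile(checkpointDir).flatMap { file =>
      val content = new String(Files.readAllBytes(file), StandardCharsets.UTF_8)
      ShufflePartitions.findFirstMatchIn(content).map(_.group(1).toInt)
    }
}
```

Calling `CheckpointInspector.storedShufflePartitions(Paths.get("/path/to/checkpoint"))` before starting the query would tell you whether the checkpoint will override your `conf.set`.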

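If you look at the source code of OffsetSeqMetadata (the class that logs the warning above), you will see that this value replaces the session setting whenever it is present in the metadata of your checkpoint files. This is intentional: the state of stateful operators is partitioned by spark.sql.shuffle.partitions, so the number of partitions has to stay the same across restarts from the same checkpoint.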
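The override can be modelled with plain maps: for each relevant key, a value found in the checkpoint metadata wins over the session value, with a warning when the two differ, while a key missing from the checkpoint leaves the session value untouched. This is a simplified, hypothetical sketch of that behaviour, not Spark's actual `OffsetSeqMetadata` code; `ConfRestoreModel`, `restoreSessionConf` and `relevantKeys` are illustrative names, and Spark's extra handling of default values for older checkpoints is left out.

```scala
// Hypothetical, simplified model of how a restarted streaming query
// reconciles the session configuration with the checkpoint metadata.
object ConfRestoreModel {

  /** Returns the effective session conf plus the warnings that would be logged. */
  def restoreSessionConf(
      session: Map[String, String],
      checkpointConf: Map[String, String],
      relevantKeys: Set[String]): (Map[String, String], List[String]) =
    relevantKeys.toList.sorted.foldLeft((session, List.empty[String])) {
      case ((conf, warnings), key) =>
        checkpointConf.get(key) match {
          case Some(stored) =>
            // The checkpointed value always wins; warn only if it differs.
            val warning = conf.get(key).filter(_ != stored).map { current =>
              s"Updating the value of conf '$key' in current session from '$current' to '$stored'."
            }
            (conf + (key -> stored), warnings ++ warning.toList)
          case None =>
            // Not recorded in the checkpoint: keep the session value.
            (conf, warnings)
        }
    }
}
```

With a session value of "2" and a checkpointed value of "200", the model returns "200" together with the same warning text as in the question; with an empty checkpoint (a fresh query) the session value of "2" is kept.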

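If you really have to change the number of partitions, either delete all your checkpoint files (the query then starts over without its previous progress and state) or manually change the value to 2 in the latest offset file. Keep in mind that the manual edit is risky if the query has stateful operators, since the existing state was partitioned with the old value.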
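The manual edit could be scripted roughly as follows. This is only a sketch under several assumptions: the query is stopped, the checkpoint has been backed up, it lives on a local filesystem, and the query is stateless. It also assumes that Hadoop's local filesystem keeps a `.<batchId>.crc` checksum next to each offset file, which would no longer match after the edit, so the sketch deletes it; verify this for your setup (HDFS, for example, does not use such sidecar files). `OffsetFilePatcher` and `patchLatest` are hypothetical names.

```scala
import java.io.File
import java.nio.charset.StandardCharsets
import java.nio.file.Files

// Hypothetical helper: rewrites spark.sql.shuffle.partitions in the newest
// offset file of a LOCAL checkpoint. Stop the query and back up first;
// not appropriate for queries with stateful operators.
object OffsetFilePatcher {

  // Matches the stored pair, e.g. "spark.sql.shuffle.partitions":"200".
  private val ShufflePartitions =
    "\"spark\\.sql\\.shuffle\\.partitions\"\\s*:\\s*\"\\d+\"".r

  /** Rewrites the value in the newest offset file; returns that file if it was patched. */
  def patchLatest(checkpointDir: File, newValue: Int): Option[File] = {
    val batchFiles = Option(new File(checkpointDir, "offsets").listFiles())
      .getOrElse(Array.empty[File])
      .filter(f => f.isFile && f.getName.nonEmpty && f.getName.forall(_.isDigit))

    batchFiles.sortBy(_.getName.toLong).lastOption.flatMap { latest =>
      val original = new String(Files.readAllBytes(latest.toPath), StandardCharsets.UTF_8)
      ShufflePartitions.findFirstIn(original).map { _ =>
        // The replacement contains no '$' or '\', so it is a safe replacement string.
        val replacement = "\"spark.sql.shuffle.partitions\":\"" + newValue + "\""
        val patched = ShufflePartitions.replaceFirstIn(original, replacement)
        Files.write(latest.toPath, patched.getBytes(StandardCharsets.UTF_8))
        // Assumption: the stale local checksum file must go, or reads may fail.
        new File(latest.getParentFile, "." + latest.getName + ".crc").delete()
        latest
      }
    }
  }
}
```

Only the newest offset file is touched, because that is the one whose metadata is read when the query restarts; older batch files keep their original value.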

Michael Heil answered Oct 30 '25 00:10