I have a pytest suite. Most of the tests in the suite do not need additional packages pulled down from Maven via "spark.jars.packages", but 2 tests do.
My thought was for the 2 tests that do, I will:
- spark.stop() using pytest tearDown().
- "spark.jars.packages" and "spark.jars.repositories" specified to download the additional required packages using pytest setUp() (see the sketch below).

This is not working as expected: the new SparkSession that has "spark.jars.packages" and "spark.jars.repositories" does not download the additional packages.
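A minimal sketch of that intended setup, assuming a unittest-style test class run under pytest (the class name and the package coordinate here are only illustrative placeholders):

import unittest

from pyspark.sql import SparkSession


class TestWithExtraPackages(unittest.TestCase):
    def setUp(self):
        # Build a session that should pull the extra package(s) down from Maven.
        self.spark = (
            SparkSession.builder
            .config("spark.jars.packages", "org.apache.spark:spark-hadoop-cloud_2.12:3.3.0")
            .getOrCreate()
        )

    def tearDown(self):
        # Stop the session so the rest of the suite can build its own.
        self.spark.stop()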
To my surprise, the pytest suite will only pass if I run the tests that require a SparkSession with "spark.jars.packages" and "spark.jars.repositories" first in the test suite. If I run them second, I get a "package not found" error, and the required packages were clearly not pulled down from their repository.
My initial assumption was that on the second run, pyspark was referencing the initial SparkSession without "spark.jars.packages" and "spark.jars.repositories". However, what I have confirmed is:
- The first SparkSession is stopped before the second is created; at that point a SparkSession does not exist (see the check sketch below).
- The first SparkContext is stopped before the second is created; at that point a SparkContext does not exist.
- When the second SparkSession is created, "spark.jars.packages" and "spark.jars.repositories" are correctly registered in the Spark configuration.

What am I missing here? I dug through the PySpark source code to check why this is the case and could not figure it out. Thanks so much.
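For reference, a rough sketch of the kind of checks behind those observations (assuming PySpark 3.x; SparkContext._active_spark_context is an internal attribute):

from pyspark import SparkContext
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark.stop()

# After stop(), neither an active session nor an active context should remain.
assert SparkSession.getActiveSession() is None
assert SparkContext._active_spark_context is None

# The rebuilt session does report the extra config as registered.
spark2 = (
    SparkSession.builder
    .config("spark.jars.packages", "org.apache.spark:spark-hadoop-cloud_2.12:3.3.0")
    .getOrCreate()
)
assert spark2.conf.get("spark.jars.packages") == "org.apache.spark:spark-hadoop-cloud_2.12:3.3.0"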
Example code showing the problem:
(I am inspecting the error to make the example runnable without real S3 access. If everything is set up correctly, I expect an AccessDeniedException, which is fine. The problem is that I get a ClassNotFoundException instead, because the spark.jars.packages dependencies do not get fetched.)
from py4j.protocol import Py4JJavaError
from pyspark.sql import SparkSession


def spark_session_default():
    return SparkSession.builder.getOrCreate()


def spark_session_s3_config():
    spark_builder = (
        SparkSession.builder
        # install dependencies to work with AWS S3, e.g. the org.apache.hadoop.fs.s3a.S3AFileSystem class
        .config("spark.jars.packages", "org.apache.spark:spark-hadoop-cloud_2.12:3.3.0")
    )
    return spark_builder.getOrCreate()


def try_write_s3_file(spark: SparkSession):
    try:
        spark.createDataFrame(data=[(1, 2, 3)], schema=["a", "b", "c"]).write.csv("s3a://mybucket/dataset1")
    except Py4JJavaError as e:
        assert \
            e.java_exception.getClass().getName() == "java.nio.file.AccessDeniedException", \
            f"Expected AccessDeniedException but got {e.java_exception.getMessage()}"
        print("Successfully tried writing to S3! (got Access Denied and not ClassNotFoundException)")


# Comment out the 2 lines below to make the example run fine
spark = spark_session_default()
spark.stop()

try_write_s3_file(spark_session_s3_config())
I have found an open issue in Spark's JIRA which mentions a (quite hacky) solution: https://issues.apache.org/jira/browse/SPARK-38438
To properly close the Spark session you also have to stop the underlying JVM gateway:

from pyspark import SparkContext

s.stop()                           # s is the SparkSession being closed
s._sc._gateway.shutdown()          # shut down the Py4J gateway and its JVM process
s._sc._gateway.proc.stdin.close()
SparkContext._gateway = None       # clear the cached gateway/JVM so the next
SparkContext._jvm = None           # SparkContext launches a fresh JVM
I tested it with the example code and it worked: the spark.jars.packages dependencies got downloaded when the previous session was stopped "properly".
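For completeness, here is roughly how that workaround could be folded back into the example above; stop_spark_completely is just an illustrative helper name, and it relies on the same private attributes as the snippet from the JIRA comment:

from pyspark import SparkContext
from pyspark.sql import SparkSession


def stop_spark_completely(s: SparkSession):
    # Stop the session, then tear down the Py4J gateway so the JVM
    # (with its already-resolved classpath) is not reused by the next session.
    s.stop()
    s._sc._gateway.shutdown()
    s._sc._gateway.proc.stdin.close()
    SparkContext._gateway = None
    SparkContext._jvm = None


# In the example above, replace the plain spark.stop() with the helper:
spark = spark_session_default()
stop_spark_completely(spark)
try_write_s3_file(spark_session_s3_config())

In a pytest suite, the same teardown can go into the tearDown()/fixture finalizer of whichever test stops the session that was built without the extra packages.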