Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

SparkSession does not pull down packages from repo in pytest suite

I have a pytest suite. In the pytest suite, I have a majority of tests that do not need additional packages pulled down from maven via "spark.jars.packages", and 2 tests that do.

My thought was for the 2 tests that do, I will:

  1. Stop the existing SparkSession used for the tests that do not require additional packages with spark.stop() using pytest tearDown().
  2. Create an new SparkSession that has "spark.jars.packages" and "spark.jars.repositories" specified to download the additional required packages using pytest setUp().

This is not working as expected, and the new SparkSession that has "spark.jars.packages" and "spark.jars.repositories" does not download the additional packages.

To my surprise, the pytest suite will only pass if I run the tests that require a SparkSession with "spark.jars.packages" and "spark.jars.repositories" first in the test suite. If I run it second, I get a package not found error, and the required packages were clearly not pulled down from their repo.

My initial assumption was that on the second run, pyspark was referencing the initial SparkSession without "spark.jars.packages" and "spark.jars.repositories". However, what I have confirmed is:

  1. At the point where the first SparkSession is stopped, and the second will be created, a SparkSession does not exist.
  2. At the point where the first SparkContext is stopped, and the second will be created, a SparkContext does not exist.
  3. When the second SparkSession is created, "spark.jars.packages" and "spark.jars.repositories" are correctly registered in the Spark Configuration.

What am I missing here? I dug through the pyspark source code to check why this is the case and could not figure it out. Thanks so much.

Example code showing the problem:

(I am inspecting the error to make the example runnable without the need to really access s3. If everything is setup correctly I expect AccessDeniedException which is fine. The problem I get raises ClassNotFoundException because spark.jars.packages do not get fetched)

from py4j.protocol import Py4JJavaError
from pyspark.sql import SparkSession


def spark_session_default():
    return SparkSession.builder.getOrCreate()


def spark_session_s3_config():
    spark_builder = (
        SparkSession.builder
        # install dependencies to work with AWS S3 eg. org.apache.hadoop.fs.s3a.S3AFileSystem class
        .config("spark.jars.packages", "org.apache.spark:spark-hadoop-cloud_2.12:3.3.0")
    )
    return spark_builder.getOrCreate()


def try_write_s3_file(spark: SparkSession):
    try:
        spark.createDataFrame(data=[(1, 2, 3)], schema=["a", "b", "c"]).write.csv("s3a://mybucket/dataset1")
    except Py4JJavaError as e:
        assert \
            e.java_exception.getClass().getName() == "java.nio.file.AccessDeniedException", \
            f"Expected AccessDeniedException but got {e.java_exception.getMessage()}"
        print("Successfully tried writing to S3! (got Access Denied and not ClassNotFoundException)")


# Comment out 2 lines below to make the example run fine
spark = spark_session_default()  
spark.stop() 
try_write_s3_file(spark_session_s3_config())
like image 286
mondal.alex Avatar asked Oct 29 '25 04:10

mondal.alex


1 Answers

I have found open issue in Spark's JIRA which mentions a solution (quite hacky) https://issues.apache.org/jira/browse/SPARK-38438

To properly close the Spark session you have to also stop the underlying JVM:

s.stop()
s._sc._gateway.shutdown()
s._sc._gateway.proc.stdin.close()
SparkContext._gateway = None
SparkContext._jvm = None

I tested it with the example code and it worked - the spark.jars.packages got installed if the previous session got stopped "properly".

like image 156
botchniaque Avatar answered Nov 01 '25 10:11

botchniaque