 

Spark memory cache keeps increasing even with unpersist

I am iterating through 3 large files and performing a bunch of statistical calculations.

I have 55GB of usable memory per executor, 8 vCores, and up to 10 TASK nodes available, aside from 1 CORE node and 1 MASTER node.

The following is the pseudocode of my actual code:

#Load MyConfigMeta file - this is a small file and will be used a couple of times in the code
MyConfigMeta=spark.read.parquet("s3://path/MyConfigMeta.parquet")
MyConfigMeta=MyConfigMeta.persist(StorageLevel.MEMORY_AND_DISK)

#Very Large timeseries files
modules=["s3://path/file1.parquet",
         "s3://path/file2.parquet",
         "s3://path/file3.parquet"]

for file in modules:
    out_filename=1
    df1=spark.read.parquet(file)
    df1=df1.join(MyConfigMeta, on=["key"], how="inner")
    
    #Find out latest column values based on Timestamp
    lim_max=df1.groupBy('key')\
    .agg(f.max('TIME_STAMP').alias('TIME_STAMP'))
    temp=df1.select('TIME_STAMP','key','UL','LL')
    lim_max=lim_max.join(temp, on=['TIME_STAMP','key'], how="left")\
    .drop('TIME_STAMP')\
    .distinct()
    lim_max=lim_max.persist(StorageLevel.MEMORY_AND_DISK)
    
    df1=df1.drop('UL','LL')\
    .join(lim_max, on=['key'], how="left")\
    .withColumn('out_clip', when(col('RESULT').between(col('LL'),col('UL')), 0).otherwise(1))
    
    df1=df1.persist(StorageLevel.MEMORY_AND_DISK) # This is a very large dataframe and will later be used for simulation
    
    df2=df1.filter(col('out_clip')==0)\
    .groupBy('key')\
    .agg(f.round(expr('percentile(RESULT, 0.9999)'),4).alias('UPPER_PERCENTILE'),
         f.round(expr('percentile(RESULT, 0.0001)'),4).alias('LOWER_PERCENTILE'))\
    .withColumn('pcnt_clip', when(col('RESULT').between(col('LOWER_PERCENTILE'),col('UPPER_PERCENTILE')), 0).otherwise(1))\
    .filter(col('pcnt_clip')==0)
    
    #Perform a bunch of statistical calculations (mean, avg, kurtosis, skew)
    stats=df2.groupBy('key')\
    .agg(...)
    stats=stats.join(lim_max, on=['key'], how="left") #get back the columns from lim_max
    
    lim_max=lim_max.unpersist()
    
    #formulas to calculate the new limits
    stats=stats.withColumn('New_UL', ...)\
    .withColumn('New_LL', ...)\
    .join(MyConfigMeta, on=['key'], how="left")
    
    #Simulate data
    df_sim=df1.join(stats, on=['key'], how="inner")\
    .withColumn('newOOC', when ((col('RESULT')<col('New_LL')) | (col('RESULT')>col('New_UL')), 1).otherwise(0))
    
    df3=df_sim.groupBy('key')\
    .agg(f.sum('newOOC').alias('simulated result'))
    
    #Join back with stats to get statistical data and context data along with the simulated data
    df4=df3.join(stats, on=['key'], how="inner")
    
    #Write output file
    df4.write.mode('overwrite').parquet("s3://path/sim_" + str(out_filename) + ".parquet")
    
    df1=df1.unpersist()
    spark.catalog.clearCache()

My spark-submit configuration is 6 executor-cores and driver-cores, 41GB executor-memory, 41GB driver-memory, 14GB spark.executor.memoryOverhead, and 9 num-executors.
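For reference, those flags map roughly to the following Spark configuration properties (a sketch only, not my actual launch command; the application name is a placeholder and the values are the ones listed above):

from pyspark.sql import SparkSession

# Sketch: rough SparkSession equivalent of the spark-submit flags above.
# The property names are standard Spark configuration keys; the values are
# copied from the question. Note that driver memory/cores normally have to
# be set before the JVM starts (on the spark-submit command line or in
# spark-defaults.conf), so setting them here is illustrative only.
spark = (SparkSession.builder
         .appName("stat-simulation")  # hypothetical application name
         .config("spark.executor.instances", "9")
         .config("spark.executor.cores", "6")
         .config("spark.executor.memory", "41g")
         .config("spark.executor.memoryOverhead", "14g")
         .config("spark.driver.cores", "6")
         .config("spark.driver.memory", "41g")
         .getOrCreate())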

When I look at the memory chart in Ganglia, I notice that the first file completes fine, but the computation for the subsequent files fails because it keeps running into lost-node issues:

ExecutorLostFailure (executor 5 exited unrelated to the running tasks) Reason: Container marked as failed. Diagnostics: Container released on a lost node.

[first Ganglia memory chart]

I would have expected the cache memory to clear significantly, since I unpersisted the df1 dataframe and used spark.catalog.clearCache(). But the memory seems to keep increasing without being cleared. However, if I run the files individually it works fine.

[second Ganglia memory chart]

Here, a good chunk of memory got cleared only because 10 executors were dead and got blacklisted.

Is there a way to force a memory flush in Spark? Or is there another reason why I keep losing nodes?

asked Oct 28 '25 by thentangler


1 Answer

You can flush all persisted datasets in the SparkContext with the following function. It lists the persisted RDDs and invokes the unpersist method on each. This is particularly useful when DataFrames are created inside functions.

def unpersist_dataframes() -> None:
    # sc is the SparkContext; getPersistentRDDs() returns the RDDs that are
    # currently marked as persisted, keyed by their id
    for (id, rdd) in sc._jsc.getPersistentRDDs().items():
        rdd.unpersist()
        print("Unpersisted {} rdd".format(id))

To monitor the persisted dataframes, check the Storage tab in the Spark UI instead. Do not worry about the free memory shown in the Ganglia stats; it can actually be a sign that your resources are not fully utilised. Spark manages memory wisely.
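
If you prefer checking from code rather than the UI, PySpark DataFrames also expose their cache status directly (a small sketch; df stands in for any of the dataframes in the question, e.g. df1 or lim_max):

# df is a stand-in for any DataFrame from the question, e.g. df1 or lim_max
print(df.is_cached)     # True once persist()/cache() has been called
print(df.storageLevel)  # the StorageLevel it is currently persisted with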

Regarding the lost nodes, if you are using a managed service like Databricks, it will show the reason behind the termination of the nodes in the cluster's event log.

answered Oct 31 '25 by Emer


