I'm trying to parallelize a machine learning prediction task via Spark. I've used Spark successfully many times on other tasks and have never had problems with parallelization.
In this particular task, my cluster has 4 workers. I'm calling mapPartitions on an RDD with 4 partitions. The map function loads a model from disk (a bootstrap script distributes all that is needed to do this; I've verified it exists on each slave machine) and performs prediction on data points in the RDD partition.
The code runs, but only utilizes one executor. The logs for the other executors say "Shutdown hook called". On different runs of the code, it uses different machines, but only one at a time.
How can I get Spark to use multiple machines at once?
I'm using PySpark on Amazon EMR via Zeppelin notebook. Code snippets are below.
%spark.pyspark
sc.addPyFile("/home/hadoop/MyClassifier.py")
sc.addPyFile("/home/hadoop/ModelLoader.py")
from ModelLoader import ModelLoader
from MyClassifier import MyClassifier
def load_models():
    models_path = '/home/hadoop/models'
    model_loader = ModelLoader(models_path)
    models = model_loader.load_models()
    return models
def process_file(file_contents, models):
    filename = file_contents[0]
    filetext = file_contents[1]
    pred = MyClassifier.predict(filetext, models)
    return (filename, pred)
def process_partition(file_list):
    models = load_models()
    for file_contents in file_list:
        pred = process_file(file_contents, models)
        yield pred
all_contents = sc.wholeTextFiles("s3://some-path", 4)
processed_pages = all_contents.mapPartitions(process_partition)
processedDF = processed_pages.toDF(["filename", "pred"])
processedDF.write.json("s3://some-other-path", mode='overwrite')
There are four tasks as expected, but they all run on the same executor!
I have the cluster running and can provide logs as available in Resource Manager. I just don't know yet where to look.
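The code above uses mapPartitions so that the models are loaded once per partition rather than once per record. The following plain-Python simulation (no Spark required) mirrors that pattern. load_models_stub, predict_stub, map_partitions, and the sample data are hypothetical stand-ins, since ModelLoader and MyClassifier are not shown in the question.

```python
# Plain-Python simulation of the mapPartitions pattern (no Spark needed).
# load_models_stub and predict_stub are hypothetical stand-ins for
# ModelLoader / MyClassifier, which are not shown in the question.

load_calls = 0


def load_models_stub():
    """Pretend to load models from disk; count how often this happens."""
    global load_calls
    load_calls += 1
    return {"threshold": 10}


def predict_stub(text, models):
    """Toy classifier: label a document by its length."""
    return "long" if len(text) > models["threshold"] else "short"


def process_partition(file_list):
    # Models are loaded once per partition, then reused for every record.
    models = load_models_stub()
    for filename, filetext in file_list:
        yield (filename, predict_stub(filetext, models))


def map_partitions(partitions, func):
    """Apply func to each partition (a list of records), like RDD.mapPartitions."""
    return [list(func(iter(part))) for part in partitions]


partitions = [
    [("a.txt", "short"), ("b.txt", "a much longer text")],
    [("c.txt", "tiny")],
    [],
    [("d.txt", "another long document")],
]
results = map_partitions(partitions, process_partition)
print(load_calls)  # 4: one load per partition, including the empty one
print(results)
```

With four partitions the models are loaded four times no matter how many files there are, so each task carries a fixed model-loading cost on top of its share of the records.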
Two points to mention here (not sure if they will solve your issue though):
wholeTextFiles uses WholeTextFileInputFormat, which extends CombineFileInputFormat. Because of CombineFileInputFormat, it tries to combine groups of small files into one partition. So if you set the number of partitions to 2, for example, you might get two partitions, but that is not guaranteed; it depends on the sizes of the files you are reading.
wholeTextFiles returns an RDD in which each record is an entire file, and a record (file) cannot be split, so each file always ends up in a single partition and therefore on a single worker. If you are reading only one file, the whole file ends up in one partition even though you set the partitioning to 4 in your example. The job still has as many partitions as you specified, but the actual work is effectively done serially.
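A simplified model of this packing makes the effect concrete. This sketch assumes all files sit on one node and are visited in the given order, and it ignores the node/rack locality logic of the real CombineFileInputFormat. simulate_combine_splits is a hypothetical helper, not a Spark API. It derives a maximum split size from the total input size divided by the requested minimum number of partitions, then packs whole files greedily.

```python
import math


def simulate_combine_splits(file_sizes, min_partitions):
    """Greedy, single-node approximation of how WholeTextFileInputFormat
    (via CombineFileInputFormat) packs whole files into splits.

    Simplifications (assumptions): all files live on one node, files are
    visited in the given order, and rack/node locality is ignored.
    """
    total = sum(file_sizes)
    # The maximum split size is derived from the requested partition count.
    max_split_size = math.ceil(total / max(min_partitions, 1))
    splits, current, current_size = [], [], 0
    for size in file_sizes:
        current.append(size)  # a file is never split across partitions
        current_size += size
        if current_size >= max_split_size:
            splits.append(current)
            current, current_size = [], 0
    if current:
        splits.append(current)  # leftover small files share one split
    return splits


print(simulate_combine_splits([500], 4))             # one file -> one split
print(simulate_combine_splits([100, 1, 1, 1], 4))    # skewed sizes -> 2 splits
print(simulate_combine_splits([10, 10, 10, 10], 4))  # equal sizes -> 4 splits
```

On a real RDD, all_contents.getNumPartitions() and all_contents.glom().map(len).collect() show the actual layout. If the partitions are uneven, all_contents.repartition(4) shuffles the records into four partitions, although a single file is still a single record and cannot be spread any further.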
Executors
The job might spin up only the default number of executors; you can check this in the YARN ResourceManager. In your case all the processing is done by one executor. If that executor has more than one core, it will still parallelize the job across its cores. On EMR you have to make the following change for an executor to get more than one core.
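As a rough rule, the number of tasks that can run at the same time is the number of executors times the cores per executor, divided by spark.task.cpus (default 1). The back-of-the-envelope sketch below uses a hypothetical task_waves helper and assumes equal-length tasks with every executor kept busy. It shows why four tasks on a single one-core executor run one after another.

```python
import math


def task_waves(num_tasks, num_executors, cores_per_executor, cpus_per_task=1):
    """Estimate how many sequential 'waves' are needed to run num_tasks.

    Assumes equal-length tasks and that every executor slot is used;
    cpus_per_task plays the role of spark.task.cpus.
    """
    slots = num_executors * (cores_per_executor // cpus_per_task)
    if slots == 0:
        raise ValueError("no task slots available")
    return math.ceil(num_tasks / slots)


print(task_waves(4, 1, 1))  # 4: the four partitions run one after another
print(task_waves(4, 1, 4))  # 1: one executor with 4 cores runs them together
print(task_waves(4, 4, 1))  # 1: four single-core executors
```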
What is specifically happening in your case is that the data is small, so all of it is read by one executor (i.e. on one node). Without the following property, that executor uses only a single core, so all the tasks are serialized.
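To set the property, open the Capacity Scheduler configuration file:
 sudo vi /etc/hadoop/conf/capacity-scheduler.xml
Set the following property as shown:
 <property>
   <name>yarn.scheduler.capacity.resource-calculator</name>
   <value>org.apache.hadoop.yarn.util.resource.DominantResourceCalculator</value>
 </property>
(When launching a new EMR cluster, the same key/value pair, "yarn.scheduler.capacity.resource-calculator": "org.apache.hadoop.yarn.util.resource.DominantResourceCalculator", can instead be supplied through the capacity-scheduler configuration classification.)
For this property to take effect, you have to restart the YARN ResourceManager. Stop it:
 sudo hadoop-yarn-resourcemanager stop
Then start it again:
 sudo hadoop-yarn-resourcemanager start
(The exact service command depends on the EMR release; releases that use systemd use sudo systemctl stop hadoop-yarn-resourcemanager and sudo systemctl start hadoop-yarn-resourcemanager.)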
After submitting your job, check both the YARN ResourceManager UI and the Spark UI.
In YARN you should now see more cores allocated to each executor.
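The effect of the resource calculator can be sketched as a simplified count of how many identical containers fit on one node. DefaultResourceCalculator (the Capacity Scheduler default) compares resources by memory alone, while DominantResourceCalculator also takes vcores into account. The containers_per_node helper and the node and executor sizes below are hypothetical illustrations, not YARN's actual scheduling code, which also involves queues, minimum allocations, and request normalization.

```python
def containers_per_node(node_mem_mb, node_vcores, container_mem_mb,
                        container_vcores, calculator="default"):
    """Simplified count of identical containers YARN could place on one node.

    'default' mimics DefaultResourceCalculator, which only looks at memory;
    'dominant' mimics DominantResourceCalculator, which also enforces vcores.
    """
    by_memory = node_mem_mb // container_mem_mb
    if calculator == "default":
        return by_memory
    if calculator == "dominant":
        by_vcores = node_vcores // container_vcores
        return min(by_memory, by_vcores)
    raise ValueError(f"unknown calculator: {calculator!r}")


# Hypothetical node: 12 GB of YARN memory and 4 vcores;
# each executor container asks for 2 GB and 4 vcores.
print(containers_per_node(12288, 4, 2048, 4, "default"))   # 6 (vcores ignored)
print(containers_per_node(12288, 4, 2048, 4, "dominant"))  # 1 (vcores are the limit)
```

Under the memory-only calculator, a container's vcore request is not taken into account. This is consistent with YARN showing only one core per executor until the DominantResourceCalculator is enabled.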