 

How many executor processes run for each worker node in Spark?

How many executors will be launched for each worker node in Spark? Can I know the math behind it?

For example, if I have 6 worker nodes and 1 master, and I submit a job through spark-submit, what is the maximum number of executors that will be launched for the job?

asked by AKC

1 Answer

To piggyback off of @LiMuBei's answer...

First, it is whatever you tell it to be:

--num-executors 4
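For concreteness, here is a sketch of what that looks like on the command line (the class name and JAR path are made up for illustration; `--num-executors` applies when running on YARN):

 # Static allocation: ask for 12 executors; the cluster manager spreads them
 # across the 6 workers as capacity allows (illustrative values only)
 spark-submit \
   --master yarn \
   --class com.example.MyApp \
   --num-executors 12 \
   --executor-cores 4 \
   --executor-memory 8g \
   /path/to/my-app.jar

Nothing ties the executor count to the number of workers; with enough cores and memory free, a single worker can host several executors.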

If you are using dynamic allocation, this is how Spark decides for you.

According to this document (http://jerryshao.me/architecture/2015/08/22/spark-dynamic-allocation-investigation/), this is how Spark calculates the maximum number of executors it needs from its pending and running task counts:

 private def maxNumExecutorsNeeded(): Int = {
   val numRunningOrPendingTasks = listener.totalPendingTasks + listener.totalRunningTasks
   // Ceiling division: one executor for every tasksPerExecutor tasks
   (numRunningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor
 }
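In other words, it is a ceiling division, where tasksPerExecutor is spark.executor.cores divided by spark.task.cpus. A quick worked example with my own numbers (not from the original answer):

 // Assuming spark.executor.cores=4 and spark.task.cpus=1 (hypothetical values)
 val tasksPerExecutor = 4 / 1
 val numRunningOrPendingTasks = 100
 val maxNeeded = (numRunningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor
 // (100 + 4 - 1) / 4 = 25, i.e. ceil(100 / 4.0)

So 100 backlogged tasks at 4 tasks per executor means Spark wants at most 25 executors.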

If the current executor target is higher than the number actually needed:

 // The target number exceeds the number we actually need, so stop adding new
 // executors and inform the cluster manager to cancel the extra pending requests
 val oldNumExecutorsTarget = numExecutorsTarget
 numExecutorsTarget = math.max(maxNeeded, minNumExecutors)
 numExecutorsToAdd = 1

 // Only notify the cluster manager if the target actually decreased
 if (numExecutorsTarget < oldNumExecutorsTarget) {
   client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
   logDebug(s"Lowering target number of executors to $numExecutorsTarget (previously " +
     s"$oldNumExecutorsTarget) because not all requested executors are actually needed")
 }
 // the block's value: the (non-positive) change in the target
 numExecutorsTarget - oldNumExecutorsTarget

If the current number of executors is more than the desired number, Spark notifies the cluster manager to cancel the pending requests, since they are not needed. Executors that have already been allocated are ramped down to a reasonable number later through a timeout mechanism.
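The bounds and the timeout mentioned above are all configurable. A minimal sketch of the relevant settings (the values are illustrative, not recommendations):

 import org.apache.spark.SparkConf

 val conf = new SparkConf()
   .set("spark.dynamicAllocation.enabled", "true")
   // Dynamic allocation traditionally also needs the external shuffle service
   .set("spark.shuffle.service.enabled", "true")
   .set("spark.dynamicAllocation.minExecutors", "2")   // minNumExecutors in the code above
   .set("spark.dynamicAllocation.maxExecutors", "24")  // maxNumExecutors in the code above
   // Idle executors are released after this timeout (default 60s)
   .set("spark.dynamicAllocation.executorIdleTimeout", "60s")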

If the current number of executors cannot satisfy the desired number:

 val oldNumExecutorsTarget = numExecutorsTarget

 // There's no point in wasting time ramping up to the number of executors we already have, so
 // make sure our target is at least as much as our current allocation:
 numExecutorsTarget = math.max(numExecutorsTarget, executorIds.size)

 // Boost our target with the number to add for this round:
 numExecutorsTarget += numExecutorsToAdd

 // Ensure that our target doesn't exceed what we need at the present moment:
 numExecutorsTarget = math.min(numExecutorsTarget, maxNumExecutorsNeeded)

 // Ensure that our target fits within configured bounds:
 numExecutorsTarget = math.max(math.min(numExecutorsTarget, maxNumExecutors), minNumExecutors)
 val delta = numExecutorsTarget - oldNumExecutorsTarget

 // If our target has not changed, do not send a message
 // to the cluster manager and reset our exponential growth
 if (delta == 0) {
   numExecutorsToAdd = 1
   return 0
 }
 val addRequestAcknowledged = testing ||
   client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
 if (addRequestAcknowledged) {
   val executorsString = "executor" + { if (delta > 1) "s" else "" }
   logInfo(s"Requesting $delta new $executorsString because tasks are backlogged" +
     s" (new desired total will be $numExecutorsTarget)")
   numExecutorsToAdd = if (delta == numExecutorsToAdd) {
     numExecutorsToAdd * 2
   } else {
     1
   }
   delta
 } else {
   logWarning(
     s"Unable to reach the cluster manager to request $numExecutorsTarget total executors!")
   0
 }
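Note the doubling of numExecutorsToAdd at the end: as long as the backlog persists and each request is fully granted, the requested delta grows exponentially (1, 2, 4, 8, ...), capped by maxNumExecutorsNeeded and the configured maxNumExecutors. A toy trace with made-up numbers:

 // Toy simulation of the ramp-up, assuming the backlog never clears
 var target = 2            // current target
 var toAdd = 1             // numExecutorsToAdd
 val maxExecutors = 24     // configured upper bound
 for (round <- 1 to 5) {
   target = math.min(target + toAdd, maxExecutors)
   toAdd *= 2              // delta matched numExecutorsToAdd, so it doubles
   println(s"round $round: target = $target")
 }
 // targets: 3, 5, 9, 17, then capped at 24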
answered by Kristian