In my Spark application there is an object, ResourceFactory, which contains an Akka ActorSystem used to provide resource clients. When I run this Spark application, every worker node creates an ActorSystem. The problem is that when the Spark application finishes its work and shuts down, the ActorSystem stays alive on every worker node and prevents the whole application from terminating; it just hangs.
Is there a way to register a listener on the SparkContext so that, when the sc shuts down, the ActorSystem on every worker node gets notified to shut itself down?
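Something like this is what I have in mind on the driver side (just a rough sketch of the idea; SparkListener and onApplicationEnd are the real Spark scheduler APIs, but as far as I understand such a listener only fires on the driver, so it cannot reach the ActorSystems on the workers):

import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}

// sc is the SparkContext; the listener runs on the driver only.
sc.addSparkListener(new SparkListener {
  override def onApplicationEnd(end: SparkListenerApplicationEnd): Unit = {
    // This can shut down a driver-side system, but not the
    // "resource-akka-system" on each worker node.
    ResourceFactory.actorSystem.shutdown()   // terminate() on newer Akka versions
  }
})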
UPDATE:
Following is the simplified skeleton:
There is a ResourceFactory, which is an object that contains an actor system and also provides a fetchData method.
import akka.actor.ActorSystem

object ResourceFactory {
  val actorSystem = ActorSystem("resource-akka-system")
  def fetchData(): SomeData = ...
}
Then there is a user-defined RDD class; in its compute method it needs to fetch data from the ResourceFactory.
class MyRDD extends RDD[SomeClass] {
  override def compute(...) {
    ...
    ResourceFactory.fetchData()
    ...
    someIterator
  }
}
So on every node there will be one ActorSystem named "resource-akka-system", and the MyRDD instances distributed across those worker nodes can get data from the "resource-akka-system".
The problem is that when the SparkContext gets shut down, those "resource-akka-system"s are no longer needed, but I don't know how to notify the ResourceFactory to shut down the "resource-akka-system" when the SparkContext gets shut down. So now the "resource-akka-system" stays alive on each worker node and prevents the whole program from exiting.
UPDATE2:
With some more experiments, I find that in local mode the program hangs, but in yarn-cluster mode the program exits successfully. Maybe this is because YARN kills the threads on the worker nodes when the sc is shut down?
UPDATE3:
To check whether every node contains an ActorSystem, I changed the code as follows (this is the real skeleton, as I add another class definition):
object ResourceFactory {
  println("creating resource factory")
  val actorSystem = ActorSystem("resource-akka-system")
  def fetchData(): SomeData = ...
}
class MyRDD extends RDD[SomeClass] {
  println("creating my rdd")
  override def compute(...) {
    new RDDIterator(...)
  }
}
class RDDIterator(...) extends Iterator[SomeClass] {
  println("creating rdd iterator")
  ...
  lazy val reader = {
    ...
    ResourceFactory.fetchData()
    ...
  }
  ...
  override def next() = {
    ...
    reader.xx()
  }
}
After adding those printlns, I ran the code on Spark in yarn-cluster mode. I find that on the driver I have the following prints:
creating my rdd
creating resource factory
creating my rdd
...
While on some of the workers, I have the following prints:
creating rdd iterator
creating resource factory
And on some of the workers, nothing is printed (and none of those workers were assigned any tasks).
Based on the above, I think the object is initialized eagerly on the driver, since creating resource factory is printed on the driver even though nothing refers to it there, and initialized lazily on the workers, since creating resource factory is printed after creating rdd iterator, the ResourceFactory only being referenced by the first created RDDIterator.
And I find that in my use case the MyRDD instances are only created on the driver.
I am not very sure about the laziness of the object's initialization on the driver versus the workers; it is just my guess, and it may be that some other part of the program makes it look that way. But I think it is right that there is one actor system on each worker node when one is needed.
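For reference, here is a tiny standalone snippet (plain Scala, nothing to do with Spark; the names are made up) showing what I mean by lazy object initialization: the object's body only runs on first access within a given JVM, so each executor JVM that touches ResourceFactory would build its own ActorSystem:

object LazyDemo {
  println("LazyDemo initialized")   // runs once per JVM, on first access
  val value = 42
}

object Main extends App {
  println("before first access")
  println(LazyDemo.value)           // "LazyDemo initialized" is printed here
  println(LazyDemo.value)           // not printed again; already initialized
}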
You can background the spark-submit process like any other Linux process, by putting it into the background in the shell. In your case, spark-submit then runs the driver on YARN, so it is just babysitting a process that is already running asynchronously on another machine via YARN.
If you're sure that you're running standalone, then using System.exit() is fine, although it's frowned upon; however, Spark applications, despite looking like standalone apps, actually run as part of the Spark runtime on the cluster, and you might terminate more than you intended to.
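A sketch of the safer pattern (the names are illustrative): stop the context explicitly and let the main method return, rather than calling System.exit():

import org.apache.spark.{SparkConf, SparkContext}

object MyApp {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("my-app"))
    try {
      // ... run the job ...
    } finally {
      sc.stop()   // release cluster resources, then simply let main return
    }
  }
}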
In cluster mode, the Spark driver runs inside an application master process which is managed by YARN on the cluster, and the client can go away after initiating the application. In client mode, the driver runs in the client process, and the application master is only used for requesting resources from YARN.
I don't think that there is a way to tap into each Worker's lifecycle.
Also I have some questions regarding your implementation:
If you have an object that contains a val which is used from a function run on a worker, my understanding is that this val gets serialized and broadcast to the worker. Can you confirm that you have one ActorSystem running per worker?
An ActorSystem usually terminates immediately if you don't explicitly wait for its termination. Are you calling something like system.awaitTermination, or blocking on system.whenTerminated?
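For clarity, this is the kind of blocking wait I mean (this snippet assumes Akka 2.4+, where whenTerminated is a Future[Terminated]; on Akka 2.3 and earlier the equivalent call is system.awaitTermination()):

import akka.actor.ActorSystem
import scala.concurrent.Await
import scala.concurrent.duration.Duration

object WaitDemo extends App {
  val system = ActorSystem("resource-akka-system")

  // Blocks the calling thread until the actor system has fully terminated.
  Await.ready(system.whenTerminated, Duration.Inf)
}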
Anyway, there is another way you can shut down the actor systems on remote workers: have each worker know the address of an actor system running on the driver. A plain val holding that address gets serialized with the task closures (effectively broadcast to each worker) when the sc runs a job; in simple words, just have a val with that address.
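One way to wire that up (only a sketch, not tested: it assumes Akka remoting is enabled on both the driver-side and worker-side systems, and the system/actor names and driverBeaconPath are made up for illustration) is to have each worker's "resource-akka-system" watch a driver-side "beacon" actor and terminate itself when the beacon goes away:

import akka.actor.{Actor, ActorIdentity, ActorSystem, Identify, Props, Terminated}

// Worker side: resolve the driver's beacon actor and shut the local system
// down when the beacon terminates (remote death watch).
class DriverWatcher(driverBeaconPath: String) extends Actor {
  context.actorSelection(driverBeaconPath) ! Identify("driver-beacon")

  def receive = {
    case ActorIdentity("driver-beacon", Some(beacon)) =>
      context.watch(beacon)          // Terminated arrives when the beacon stops
    case ActorIdentity("driver-beacon", None) =>
      context.system.terminate()     // driver already gone (shutdown() on Akka 2.3)
    case Terminated(_) =>
      context.system.terminate()     // driver shut down, so stop this system too
  }
}

object ResourceFactory {
  val actorSystem = ActorSystem("resource-akka-system")

  // driverBeaconPath is a plain String such as
  // "akka.tcp://driver-system@<driver-host>:<port>/user/shutdown-beacon",
  // put in a val on the driver and captured by the task closures.
  def watchDriver(driverBeaconPath: String): Unit =
    actorSystem.actorOf(Props(new DriverWatcher(driverBeaconPath)), "driver-watcher")
}

On the driver you would create the "shutdown-beacon" actor in a driver-side actor system and stop it (or terminate that system) right before calling sc.stop(); every watching worker then receives Terminated and shuts down its own "resource-akka-system".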