Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Akka.net - How to wait child actor to process all pending messages prior to stop

Tags:

c#

akka

akka.net

We have a clustered sharded actor named A and it has multiple child actors created with the child per entity pattern as shown below. When we Tell 100 messages from actor B to D and actor D takes say, 500 ms to process each message, at the same time, when we send the poison pill to actor A using Context.Parent.Tell (new Passivate (PoisonPill.Instance )); It immediately stops all child actors, including actor D, without processing pending messages.

    A
    |
    B    
   / \
  C   D

Is there a way to wait for actor D to process all the messages?

like image 223
csharpdev Avatar asked Sep 19 '25 23:09

csharpdev


2 Answers

https://stackoverflow.com/a/70286526/377476 is a good start; you will need a custom shutdown message. When a parent actor terminates, it's children are automatically killed via /system messages which supersede any unprocessed /user messages in their queue.

So what you need to do is ensure that all of their /user messages are processed prior to the parent terminating itself. There's a straightforward way to do this using the GracefulStop extension method in combination with your custom stop message:

public sealed class ActorA : ReceiveActor{
    private IActorRef _actorB;  
    
    private readonly ILoggingAdapter _log = Context.GetLogger();
    
    public ActorA(){
        Receive<StartWork>(w => {
            foreach(var i in Enumerable.Range(0, w.WorkCount)){
                _actorB.Tell(i);
            }
        });
        
        ReceiveAsync<MyStopMessage>(async _ => {
            _log.Info("Begin shutdown");
            
            // stop child actor B with the same custom message
            await _actorB.GracefulStop(TimeSpan.FromSeconds(10), _);
            
            // shut ourselves down after child is done
            Context.Stop(Self);
        });
    }
    
    protected override void PreStart(){
        _actorB = Context.ActorOf(Props.Create(() => new ActorB()), "b");
    }
}

public sealed class ActorB : ReceiveActor{
    private IActorRef _actorC;
    private IActorRef _actorD;
    
    private readonly ILoggingAdapter _log = Context.GetLogger();
    
    public ActorB(){
        Receive<int>(i => {
            _actorC.Tell(i);
            _actorD.Tell(i);
        });
        
        ReceiveAsync<MyStopMessage>(async _ => {
            
            _log.Info("Begin shutdown");
            
            // stop both actors in parallel
            var stopC = _actorC.GracefulStop(TimeSpan.FromSeconds(10));
            var stopD = _actorD.GracefulStop(TimeSpan.FromSeconds(10));
            
            // compose stop Tasks
            var bothStopped = Task.WhenAll(stopC, stopD);
            await bothStopped;
            
            // shut ourselves down immediately
            Context.Stop(Self);
        });
    }
    
    protected override void PreStart(){
        var workerProps = Props.Create(() => new WorkerActor());
        _actorC = Context.ActorOf(workerProps, "c");
        _actorD = Context.ActorOf(workerProps, "d");
    }
}

public sealed class WorkerActor : ReceiveActor {
    private readonly ILoggingAdapter _log = Context.GetLogger();
    
    public WorkerActor(){
        ReceiveAsync<int>(async i => {
            await Task.Delay(10);
            _log.Info("Received {0}", i);
        });
    }
}

I've created a runnable version of this sample here: https://dotnetfiddle.net/xiGyWM - you'll see that the MyStopMessages are received not long after the sample starts, but after C and D have been given work. All of that work completes before any actors terminate in this scenario.

like image 109
Aaronontheweb Avatar answered Sep 22 '25 14:09

Aaronontheweb


Instead of sending a PoisonPill - which is a system message and therefore is handled with a higher priority than traditional messages - you may define your own stop message, and let an actor handle it using Context.Stop(Self).

class MyShardedActor : ReceiveActor {
    public MyShardedActor() {
        Receive<MyStopMessage>(_ => Context.Stop(Self));
    }
}

You can register your custom message to be used with passivate calls triggered by the cluster on its own using ClusterSharding.Start method overload, which takes a handOffMessage parameter, that will be send within Passivate request instead of PoisonPill.

like image 29
Bartosz Sypytkowski Avatar answered Sep 22 '25 14:09

Bartosz Sypytkowski