Akka and concurrent Actor execution

Tags: scala, akka

I have an actor (called Worker) that sends the same message to three other actors (called Filter1, Filter2 and Filter3).

Each of these filters takes a random amount of time to handle the message. In the Worker actor, I use the ask pattern and wait for the futures to succeed:

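import akka.actor.{Actor, ActorLogging, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

class Worker2 extends Actor with ActorLogging {
  import context.dispatcher // implicit ExecutionContext for the for-comprehension below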

  val filter1 = context.actorOf(Props[Filter1], "filter1")
  val filter2 = context.actorOf(Props[Filter2], "filter2")
  val filter3 = context.actorOf(Props[Filter3], "filter3")

  implicit val timeout = Timeout(100.seconds)

  def receive = {
    case Work(t) =>

      val futureF3 = (filter3 ? Work(false)).mapTo[Response]
      val futureF2 = (filter2 ? Work(true)).mapTo[Response]
      val futureF1 = (filter1 ? Work(true)).mapTo[Response]

      val aggResult: Future[Boolean] =
        for {
          f3 <- futureF3
          f2 <- futureF2
          f1 <- futureF1
        } yield f1.reponse && f2.reponse && f3.reponse

      if (Await.result(aggResult, timeout.duration)) {
        log.info("Response: true")
        sender ! Response(true)
      } else {
        log.info("Response: false")
        sender ! Response(false)
      }
  }
}

If any of the Filter actors returns false, I don't need the other answers. For example, if the three Filter actors run in parallel and Filter1 responds false, the Work is resolved and I don't need the answers from Filter2 and Filter3.

In this code, I always have to wait for all three executions before deciding, which seems unnecessary. Is there a way to set up a short-circuit?

German asked Nov 18 '25 13:11


1 Answer

One solution to this problem is to use Future.find() (see its Scaladoc).

You could solve it like this:

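// Requires an implicit ExecutionContext in scope, e.g. import context.dispatcher
val failed = Future.find(List(futureF1, futureF2, futureF3)) { res => !res.reponse }
Await.result(failed, timeout.duration) match {
  case None    => // Success: no filter answered false
  case Some(_) => // Failed: at least one filter answered false
}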

Future.find() returns a future holding the first result that matches the predicate, wrapped in Some. If all futures have completed and none of the results matches the predicate, it holds None.
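To make the Some/None behaviour concrete, here is a minimal sketch that uses only the Scala standard library and no actors. The Response case class below is a stand-in for the message type from the question, and FindDemo and firstRejection are hypothetical names chosen for illustration.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Stand-in for the Response message from the question (assumed shape).
final case class Response(reponse: Boolean)

object FindDemo {
  // Holds Some(response) for a response whose flag is false,
  // or None if every filter answered true.
  def firstRejection(answers: List[Future[Response]]): Future[Option[Response]] =
    Future.find(answers)(r => !r.reponse)

  def main(args: Array[String]): Unit = {
    val allTrue  = List(Future(Response(true)), Future(Response(true)))
    val oneFalse = List(Future(Response(true)), Future(Response(false)))

    println(Await.result(firstRejection(allTrue), 5.seconds))  // None
    println(Await.result(firstRejection(oneFalse), 5.seconds)) // Some(Response(false))
  }
}
```

Await is used here only so the demo can print its results from main; inside an actor, blocking like this is best avoided.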

Edit:

A better solution is to avoid blocking altogether and use Akka's pipe pattern to send the result directly to the sender once a response is found. This way you're not blocking a thread inside this actor:

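import akka.pattern.pipe
// find, map and pipeTo need an implicit ExecutionContext, e.g. import context.dispatcher

val failed = Future.find(List(futureF1, futureF2, futureF3)) { res => !res.reponse }
val senderRef = sender // capture the sender now; it may change before the future completes
failed.map(res => Response(res.map(_.reponse).getOrElse(true))).pipeTo(senderRef)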

The getOrElse(true) part yields false when find returned a response that matched the predicate (a false answer), just like before; when no filter answered false, it yields true.
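Depending on the Scala version, Future.find may examine the futures in iteration order rather than completion order (worth checking against the Scaladoc for your version). If the first false answer must win no matter which filter produced it, one option is a small Promise-based helper. The following is only a sketch under that assumption, using the standard library alone; ShortCircuit and allTrue are hypothetical names, not part of Akka or Scala.

```scala
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success}

object ShortCircuit {
  // Completes with false as soon as any input yields false,
  // or with true once every input has yielded true.
  // A failed input fails the result unless it has already been completed.
  def allTrue(inputs: List[Future[Boolean]])(implicit ec: ExecutionContext): Future[Boolean] = {
    val result = Promise[Boolean]()

    inputs.foreach(_.onComplete {
      case Success(false) => result.trySuccess(false) // short-circuit on the first false
      case Success(true)  => ()                       // keep waiting for the others
      case Failure(e)     => result.tryFailure(e)
    })

    // Once every input has succeeded, publish the combined answer
    // (a no-op if the promise was already completed above).
    Future.sequence(inputs).foreach(all => result.trySuccess(all.forall(identity)))

    result.future
  }
}
```

Inside Worker2 this could be called as ShortCircuit.allTrue(List(futureF1, futureF2, futureF3).map(_.map(_.reponse))), with the resulting future mapped to a Response and piped to the captured sender as above.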

Bryan answered Nov 21 '25 04:11