Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

scala futures - how to get result or failure from both futures

Tags:

scala

future

I'm using for to run 2 futures in parallel. I want to know which succeeeded and which failed in all cases (all should run til completion with either a result or a failure status). Currently I am only able to retrieve a combined success result

I took insperation from here but it doesn't suffice since I don't get the success statuses when a single fails nor the failures of both in cases of both failing failure in Scala future's for comprehension

case class TaggedException(context:String, val throwable: Throwable) extends Exception(throwable.getMessage)

val f1 = Future {...}.recoverWith {case e:Throwable => Future.Failed(new TaggedException("first one failed", e))}
val f2 = Future {...}.recoverWith {case e: Throwable => Future.Failed(new TaggedException("second one failed", e))}

val combinedResult = for {
  r1 <- f1
  r2 <- f2
} yield (r1,r2)

combinedResult.onFailure {
case e : TaggedException => ... // if both fail I only get the first line in the for
// in case where single fails I only know fail status without the success of the second
}

I'm trying to avoid this mess:

var countCompleted = 0 ... or some other atomic way to count 
f1 onComplete {
  case Success(value) => {
    ... countCompleted increment ... 
    // handle success
    if both completed {
       // handle returning a status
    }
  }
  case Failure(error) => {
    ... countCompleted increment ... 
    // handle failure
    if both completed {
       // handle returning a status
    }
  }
}

f2 onComplete {
  case Success(value) => {
    ... countCompleted increment ... 
    // handle success
    if both completed {
       // handle returning a status
    }
  }
  case Failure(error) => {
    ... countCompleted increment ... 
    // handle failure
    if both completed {
       // handle returning a status
    }
  }
}

Edit : Another version - Is this a valid approach?

def toFutureTry[A](future: Future[A]):Future[Try[A]] = future.map(Success(_)).recover {case t: Throwable => Failure(t)}

    val fa: Future[Try[Blah]] = toFutureTry(f1)
    val fb: Future[Try[Foo]] = toFutureTry(f2)

    val combinedRes = for {
      ra <- fa
      rb <- fb
    } yield (ra,rb)

    combinedRes.onComplete {
      case Success(successRes: (Try[Blah], Try[Foo])) => // all of these cases are success or fails
      case Failure(f: Throwable) => // i think it is unused right?
    }
like image 204
Avner Barr Avatar asked Dec 06 '25 16:12

Avner Barr


2 Answers

You can combine transform and zip like this:

val combinedResult: Future[(Try[T], Try[T])] =
  f1.transform(Success(_)).zip(f2.transform(Success(_)))

Then you can do:

combinedResult map {
  case (Success(v1), Success(v2)) =>
  case (Success(v1), Failure(f2)) =>
  case (Failure(f1), Success(v2)) =>
  case (Failure(f1), Failure(f2)) =>
}
like image 86
Viktor Klang Avatar answered Dec 08 '25 11:12

Viktor Klang


Using flatMap on Future[A] won't help, as it will always short circuit on the first failure produced by one of them, where you really want to accumulate errors.

A solution using Future.traverse which will work on arbitrary many Future[A] instances:

val f1 = Future.failed[Int](new Exception("42")).recoverWith {
  case e: Throwable => Future.failed(TaggedException("first one failed", e))
}

val f2 = Future(42).recoverWith {
  case e: Throwable =>
    Future.failed(TaggedException("second one failed", e))
}

val res: Future[List[Either[Throwable, Int]]] = 
  Future
   .traverse(List(f1, f2)) {
      eventualInt => eventualInt
       .map(i => Right(i))
       .recover { case err => Left(err) }
   }

res.onComplete {
  case Failure(exception) =>
    println(exception)
  case Success(value) =>
    value.foreach {
      case Right(int) => println(s"Received num: $int")
      case Left(err) => println(s"Oh no, err: $err")
    }
}

Await.result(res, Duration.Inf)

We can also use a little help from cats with it's Validated type:

import cats.data.Validated.{Invalid, Valid}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
import cats.implicits._
import scala.util.{Failure, Success}

def main(args: Array[String]): Unit = {
  case class TaggedException(context: String, throwable: Throwable)
    extends Exception(throwable.getMessage)

  val f1 = Future.failed[Int](new Exception("42")).recoverWith {
    case e: Throwable => Future.failed(TaggedException("first one failed", e))
  }

  val f2 = Future(42).recoverWith {
    case e: Throwable => Future.failed(TaggedException("second one failed", e))
  }

  val res: Future[List[Validated[Throwable, Int]]] = 
    List(f1, f2)
     .traverse(eventualInt => eventualInt
                       .map(i => Valid(i))
                       .recover { case err => Invalid(err) })

  res.onComplete {
    case Failure(exception) =>
      println(exception)
    case Success(value) =>
      value.foreach {
        case Valid(int) => println(s"Received num: $int")
        case Invalid(err) => println(s"Oh no, err: $err")
      }
  }

  Await.result(res, Duration.Inf)
}

Will yield:

Oh no, err: TaggedException$3: 42
Received num: 42
like image 34
Yuval Itzchakov Avatar answered Dec 08 '25 10:12

Yuval Itzchakov



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!