I'm using a for comprehension to run two futures in parallel. I want to know which succeeded and which failed in all cases (both should run to completion with either a result or a failure status). Currently I am only able to retrieve a combined success result.
I took inspiration from the question "failure in Scala future's for comprehension", but it doesn't suffice: when a single future fails I don't get the success status of the other, and when both fail I don't get both failures.
case class TaggedException(context: String, throwable: Throwable) extends Exception(throwable.getMessage)
val f1 = Future {...}.recoverWith { case e: Throwable => Future.failed(new TaggedException("first one failed", e)) }
val f2 = Future {...}.recoverWith { case e: Throwable => Future.failed(new TaggedException("second one failed", e)) }
val combinedResult = for {
  r1 <- f1
  r2 <- f2
} yield (r1, r2)
combinedResult.onFailure {
  case e: TaggedException => ... // if both fail, I only get the failure of the first line in the for
  // when only one fails, I only learn about the failure, not the success of the other
}
I'm trying to avoid this mess:
var countCompleted = 0 ... or some other atomic way to count
f1 onComplete {
  case Success(value) => {
    ... countCompleted increment ...
    // handle success
    if both completed {
      // handle returning a status
    }
  }
  case Failure(error) => {
    ... countCompleted increment ...
    // handle failure
    if both completed {
      // handle returning a status
    }
  }
}
f2 onComplete {
  case Success(value) => {
    ... countCompleted increment ...
    // handle success
    if both completed {
      // handle returning a status
    }
  }
  case Failure(error) => {
    ... countCompleted increment ...
    // handle failure
    if both completed {
      // handle returning a status
    }
  }
}
Edit: another version. Is this a valid approach?
def toFutureTry[A](future: Future[A]): Future[Try[A]] =
  future.map(Success(_)).recover { case t: Throwable => Failure(t) }
val fa: Future[Try[Blah]] = toFutureTry(f1)
val fb: Future[Try[Foo]] = toFutureTry(f2)
val combinedRes = for {
  ra <- fa
  rb <- fb
} yield (ra, rb)
combinedRes.onComplete {
  case Success(successRes: (Try[Blah], Try[Foo])) => // each element is itself a Success or a Failure
  case Failure(f: Throwable) => // I think this case is never used, right?
}
You can combine transform and zip like this (the Try-based overload of transform requires Scala 2.12 or later):
val combinedResult: Future[(Try[T], Try[T])] =
  f1.transform(Success(_)).zip(f2.transform(Success(_)))
Then you can do:
combinedResult map {
  case (Success(v1), Success(v2)) => // both succeeded
  case (Success(v1), Failure(f2)) => // only the second failed
  case (Failure(f1), Success(v2)) => // only the first failed
  case (Failure(f1), Failure(f2)) => // both failed
}
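As a self-contained sketch of the transform-and-zip approach above, the following wraps it in a few helpers and prints which of the two futures succeeded. The names ZipSettledDemo, settle, both and describe are made up for illustration, the sample futures stand in for the real work, and Scala 2.12 or later is assumed.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

// Hypothetical demo object; none of these names come from the question.
object ZipSettledDemo {
  // Move the outcome of a future into its value, so the returned future
  // always completes successfully with either a Success or a Failure inside.
  def settle[A](f: Future[A]): Future[Try[A]] = f.transform(Success(_))

  // Wait for both futures, whatever their outcome, and pair up the results.
  def both[A, B](fa: Future[A], fb: Future[B]): Future[(Try[A], Try[B])] =
    settle(fa).zip(settle(fb))

  // Turn the four possible combinations into a readable status line.
  def describe[A, B](result: (Try[A], Try[B])): String = result match {
    case (Success(a), Success(b))   => s"both succeeded: $a, $b"
    case (Success(a), Failure(e))   => s"first succeeded ($a), second failed: ${e.getMessage}"
    case (Failure(e), Success(b))   => s"first failed: ${e.getMessage}, second succeeded ($b)"
    case (Failure(e1), Failure(e2)) => s"both failed: ${e1.getMessage}, ${e2.getMessage}"
  }

  def main(args: Array[String]): Unit = {
    val f1 = Future.failed[Int](new IllegalStateException("boom"))
    val f2 = Future(42)
    println(describe(Await.result(both(f1, f2), 5.seconds)))
  }
}
```

Because settle never yields a failed future, the zipped future only fails if something goes wrong outside the two computations themselves.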
Using flatMap on Future[A] (which is what a for comprehension desugars to) won't help, as it always short-circuits on the first failure in the chain, whereas you really want to accumulate errors.
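To see the short-circuiting concretely, here is a minimal sketch in which both futures fail, yet the for comprehension only surfaces the failure of its first generator. ShortCircuitDemo and combined are illustrative names, not part of the question's code.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

// Hypothetical demo object showing why flatMap alone loses information.
object ShortCircuitDemo {
  // Same shape as the for comprehension in the question.
  def combined(f1: Future[Int], f2: Future[Int]): Future[(Int, Int)] =
    for {
      r1 <- f1
      r2 <- f2
    } yield (r1, r2)

  def main(args: Array[String]): Unit = {
    val f1 = Future.failed[Int](new RuntimeException("first one failed"))
    val f2 = Future.failed[Int](new RuntimeException("second one failed"))

    // Await.ready returns the completed future, so its outcome can be inspected
    // without rethrowing the exception.
    val outcome: Try[(Int, Int)] = Await.ready(combined(f1, f2), 5.seconds).value.get
    outcome match {
      case Failure(e)    => println(s"failed with: ${e.getMessage}") // only f1's error is visible here
      case Success(pair) => println(s"succeeded with: $pair")
    }
  }
}
```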
A solution using Future.traverse, which works on arbitrarily many Future[A] instances:
val f1 = Future.failed[Int](new Exception("42")).recoverWith {
  case e: Throwable => Future.failed(TaggedException("first one failed", e))
}
val f2 = Future(42).recoverWith {
  case e: Throwable =>
    Future.failed(TaggedException("second one failed", e))
}
val res: Future[List[Either[Throwable, Int]]] =
  Future
    .traverse(List(f1, f2)) {
      eventualInt => eventualInt
        .map(i => Right(i))
        .recover { case err => Left(err) }
    }
res.onComplete {
  case Failure(exception) =>
    println(exception)
  case Success(value) =>
    value.foreach {
      case Right(int) => println(s"Received num: $int")
      case Left(err) => println(s"Oh no, err: $err")
    }
}
Await.result(res, Duration.Inf)
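If you would rather have the failures and the successes as two separate lists, one possible follow-up step is to split the List[Either[Throwable, A]] after the traverse. A minimal sketch, where SettleAllDemo and settleAll are hypothetical names and the sample futures are placeholders:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical helper built on the Future.traverse idea from this answer.
object SettleAllDemo {
  // Runs every future to completion and returns (all failures, all successes),
  // each list in the order of the input futures.
  def settleAll[A](futures: List[Future[A]]): Future[(List[Throwable], List[A])] =
    Future
      .traverse(futures) { f =>
        // Capture each outcome as an Either so that no single failure aborts the traversal.
        f.map[Either[Throwable, A]](Right(_)).recover { case err => Left(err) }
      }
      .map { results =>
        (results.collect { case Left(err) => err }, results.collect { case Right(a) => a })
      }

  def main(args: Array[String]): Unit = {
    val futures = List(
      Future.successful(1),
      Future.failed[Int](new RuntimeException("boom")),
      Future.successful(3)
    )
    val (errors, values) = Await.result(settleAll(futures), 5.seconds)
    println(s"failures: ${errors.map(_.getMessage)}")
    println(s"successes: $values")
  }
}
```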
We can also use a little help from cats with its Validated type:
import cats.data.Validated // the type itself, needed for the annotation on res
import cats.data.Validated.{Invalid, Valid}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
import cats.implicits._
import scala.util.{Failure, Success}
def main(args: Array[String]): Unit = {
  case class TaggedException(context: String, throwable: Throwable)
    extends Exception(throwable.getMessage)
  val f1 = Future.failed[Int](new Exception("42")).recoverWith {
    case e: Throwable => Future.failed(TaggedException("first one failed", e))
  }
  val f2 = Future(42).recoverWith {
    case e: Throwable => Future.failed(TaggedException("second one failed", e))
  }
  val res: Future[List[Validated[Throwable, Int]]] =
    List(f1, f2)
      .traverse(eventualInt => eventualInt
        .map(i => Valid(i))
        .recover { case err => Invalid(err) })
  res.onComplete {
    case Failure(exception) =>
      println(exception)
    case Success(value) =>
      value.foreach {
        case Valid(int) => println(s"Received num: $int")
        case Invalid(err) => println(s"Oh no, err: $err")
      }
  }
  Await.result(res, Duration.Inf)
}
Will yield:
Oh no, err: TaggedException$3: 42
Received num: 42