
Scala Akka Stream: How to Pass Through a Seq

I'm trying to wrap some blocking calls in a Future. The return type is Seq[User], where User is a case class. The following won't compile; the compiler complains about the various overloaded versions of Source.apply. Any suggestions? I tried almost all the variations of Source.apply without any luck.

// All I want is Seq[User] => Future[Seq[User]]

def findByFirstName(firstName: String) = {
  val users: Seq[User] = userRepository.findByFirstName(firstName)

  val sink = Sink.fold[User, User](null)((_, elem) => elem)

  val src = Source(users) // doesn't compile

  src.runWith(sink)
}

Abhijit Sarkar asked Dec 19 '25 04:12


2 Answers

First of all, I assume that you are using version 1.0 of akka-http-experimental, since the API may have changed from previous releases.

The reason your code does not compile is that akka.stream.scaladsl.Source$.apply() requires a scala.collection.immutable.Seq, whereas the unqualified Seq in your code is scala.collection.Seq, which is not guaranteed to be immutable.

Therefore you have to convert the sequence to an immutable one using the to[T] method.
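
As a minimal sketch of that kind of conversion using only the standard library: the User case class and the sample data below are placeholders, and toList is used instead of to[T] only because its spelling does not vary between Scala versions. The resulting List is a scala.collection.immutable.Seq.

```scala
import scala.collection.immutable

case class User(name: String)

object ImmutableSeqSketch {
  // A general collection.Seq, backed here by a mutable buffer for illustration.
  val users: scala.collection.Seq[User] =
    scala.collection.mutable.ArrayBuffer(User("alice"), User("bob"))

  // toList copies the elements into a List, which is an immutable.Seq.
  val immutableUsers: immutable.Seq[User] = users.toList
}
```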

Document: akka.stream.scaladsl.Source

Additionally, as the documentation shows, Source$.apply() also accepts a () => Iterator[T], so you can pass () => users.iterator as the argument instead.

Since Sink.fold(...) materializes a Future holding the final accumulated value, you can pass an empty Seq() as the initial value, append each user to the sequence as it arrives, and get the full sequence as the result.
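
The accumulation follows the same pattern as foldLeft on ordinary collections. The sketch below uses only the standard library, not Sink.fold itself, to show how starting from an empty Seq and appending each element rebuilds the sequence in order. User and the sample data are placeholders.

```scala
case class User(name: String)

object FoldSketch {
  val users = List(User("alice"), User("bob"), User("charlie"))

  // Start with an empty Seq and append each element; the value returned
  // by the last step is the result of the whole fold.
  val collected: Seq[User] =
    users.foldLeft(Seq.empty[User])((seq, elem) => seq :+ elem)
}
```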

However, there may be a better solution, such as a Sink that collects every element into a Seq directly, but I could not find one.

The following code works.

import akka.actor._
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Source, Sink}
import scala.concurrent.ExecutionContext.Implicits.global

case class User(name: String)

object Main extends App {
    implicit val system = ActorSystem("MyActorSystem")
    implicit val materializer = ActorMaterializer()
    val users = Seq(User("alice"), User("bob"), User("charlie"))

    val sink = Sink.fold[Seq[User], User](Seq()) { (seq, elem) =>
        println(s"elem => ${elem} \t| seq => ${seq}")
        seq :+ elem
    }

    val src = Source(users.to[scala.collection.immutable.Seq])
    // val src = Source(() => users.iterator) // this also works

    val fut = src.runWith(sink) // Future[Seq[User]]
    fut.onSuccess {
        case x => println(s"result => ${x}")
    }
}

The output of the code above is

elem => User(alice)     | seq => List()
elem => User(bob)       | seq => List(User(alice))
elem => User(charlie)   | seq => List(User(alice), User(bob))
result => List(User(alice), User(bob), User(charlie))

ymonad answered Dec 20 '25 20:12


If you just need a Future[Seq[User]], don't use Akka Streams; use a plain Future:

import scala.concurrent._
import ExecutionContext.Implicits.global
val session = socialNetwork.createSessionFor("user", credentials)
val f: Future[List[Friend]] = Future {
  session.getFriends()
}
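
Applied to the question, a sketch might look like the following. UserRepository is a hypothetical stand-in for the asker's repository, and the global execution context is assumed only for brevity. Wrapping the call in blocking hints to the thread pool that the thread may be tied up for a while.

```scala
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

case class User(firstName: String)

// Hypothetical stand-in for the blocking repository in the question.
object UserRepository {
  private val all = Seq(User("alice"), User("bob"), User("alice"))

  def findByFirstName(firstName: String): Seq[User] =
    all.filter(_.firstName == firstName)
}

object FutureSketch {
  // Runs the blocking lookup on the execution context and returns immediately.
  def findByFirstName(firstName: String): Future[Seq[User]] =
    Future(blocking(UserRepository.findByFirstName(firstName)))

  // Await is used here only to inspect the result in a small demo.
  def demo(): Seq[User] = Await.result(findByFirstName("alice"), 5.seconds)
}
```

In real code the caller would normally map or flatMap over the returned Future rather than block on it with Await.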

al32 answered Dec 20 '25 22:12


