Play Framework's iteratee library defines a method Enumerator.fromCallback which allows elements to be generated based on the results of a Future:
http://www.playframework.com/documentation/2.2.x/Enumerators
def fromCallback[E](
  retriever: () => Future[Option[E]],
  onComplete: () => Unit = () => (),
  onError: (String, Input[E]) => Unit = (_: String, _: Input[E]) => ()
): Enumerator[E]
You can see a nice example of it being used to deliver paginated results from a Web service here:
http://engineering.klout.com/2013/01/iteratees-in-big-data-at-klout/
def pagingEnumerator(url:String):Enumerator[JsValue]={
  var maybeNextUrl = Some(url) //Next url to fetch
  Enumerator.fromCallback[JsValue] ( retriever = {
    val maybeResponsePromise =
      maybeNextUrl map { nextUrl=>  
        WS.url(nextUrl).get.map { reponse =>
          val json = response.json
          maybeNextUrl = (json \ "next_url").asOpt[String]
          val code = response.status //Potential error handling here
          json
        }   
      }
    /* maybeResponsePromise will be an Option[Promise[JsValue]].
     * Need to 'flip' it, to make it a Promise[Option[JsValue]] to
     * conform to the fromCallback constraints */
    maybeResponsePromise match {
      case Some(responsePromise) => responsePromise map Some.apply
      case None => PlayPromise pure None
    }
  })
}
What is the equivalent scalaz-stream code for doing the same?  I'm pretty sure it can be done using Process.emit or Process.await or maybe Process.eval, but I'd love to see a worked-out example.  This might also require lifting the scala Future into a scalaz Task, for which there's an answer here:
Convert scala 2.10 future to scalaz.concurrent.Future // Task
If it makes things simpler, we can ignore the scala Future vs scalaz Task bit and assume we have a Task.
To get scalaz.concurrent.Task from scala.concurrent.Future you can use Task.async, when you've got task in your hand you can do it this way:
  import java.util.concurrent.atomic.AtomicInteger
  import scalaz.concurrent.Task
  import scalaz.stream.Process.End
  import scalaz.stream._
  val cnt = new AtomicInteger(0)
  val task: Task[String] = Task {
    if (cnt.incrementAndGet() <= 10) s"Task ${cnt.get}" else throw End
  }
  Process.repeatEval(task).runLog.run.foreach(println)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With