I want to read multiple big files using Akka Streams and process each line. Imagine that each line consists of an (identifier -> value) pair. If a new identifier is found, I want to save it and its value in the database; otherwise, if the identifier has already been seen while processing the stream of lines, I want to save only the value. For that, I think I need some kind of recursive stateful flow that keeps the identifiers already found in a Map. I imagine this flow would receive a pair of (newLine, contextWithIdentifiers).
I've just started looking into Akka Streams. I think I can manage the stateless processing myself, but I have no clue how to keep the contextWithIdentifiers. I'd appreciate any pointers in the right direction.
Unlike heavier “streaming data processing” frameworks, Akka Streams are neither “deployed” nor automatically distributed.
Actor Materializer Lifecycle. The Materializer is a component that is responsible for turning the stream blueprint into a running stream and emitting the “materialized value”.
Akka Streams has three major components: Source, Flow, and Sink. Any non-cyclical stream consists of at least two of them, a Source and a Sink, plus any number of Flow elements. Source and Sink can be regarded as special cases of Flow: a Source has only an output and a Sink has only an input.
A Flow is a set of stream processing steps that has one open input and one open output.
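As a rough illustration of these three components and of what the Materializer does, here is a minimal sketch. The names PipelineSketch, numbers, doubled, sum, blueprint and runOnce are made up for this example, and it assumes an Akka version in which ActorMaterializer is still available (it is deprecated from Akka 2.6 onwards), as in the rest of this thread.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, RunnableGraph, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical example object; none of these names come from Akka itself.
object PipelineSketch {
  // The three building blocks. These are only blueprints, nothing runs yet.
  val numbers: Source[Int, NotUsed] = Source(1 to 5)
  val doubled: Flow[Int, Int, NotUsed] = Flow[Int].map(_ * 2)
  val sum: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)

  // Source -> Flow -> Sink, keeping the Sink's materialized value (a Future[Int]).
  val blueprint: RunnableGraph[Future[Int]] =
    numbers.via(doubled).toMat(sum)(Keep.right)

  // Materializes the blueprint once and waits for the result.
  def runOnce(): Int = {
    implicit val system: ActorSystem = ActorSystem("pipeline-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try Await.result(blueprint.run(), 5.seconds)
    finally system.terminate()
  }
}
```

Calling PipelineSketch.runOnce() materializes the stream and returns the sum of the doubled numbers. Blocking with Await is only for the sake of a short demo; in application code the Future would normally be composed instead.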
Maybe something like statefulMapConcat can help you:
    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl.{Flow, Sink, Source}
    import scala.util.Random._
    import scala.concurrent.ExecutionContext.Implicits.global

    implicit val system = ActorSystem()
    implicit val materializer = ActorMaterializer()

    // encapsulating your input
    case class IdentValue(id: Int, value: String)

    // some randomly generated input with ids in the range 0 to 4
    val identValues = List.fill(20)(IdentValue(nextInt(5), "valueHere"))

    val stateFlow = Flow[IdentValue].statefulMapConcat { () =>
      // state with already processed ids, created once per materialization
      var ids = Set.empty[Int]
      identValue =>
        if (ids.contains(identValue.id)) {
          // id already seen: save only the value to the DB
          println(identValue.value)
          List(identValue)
        } else {
          // new id: save both id and value to the DB
          println(identValue)
          ids = ids + identValue.id
          List(identValue)
        }
    }

    Source(identValues)
      .via(stateFlow)
      .runWith(Sink.seq)
      // foreach replaces onSuccess, which is deprecated since Scala 2.12 and removed in 2.13
      .foreach(result => println(result))
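The (newLine, contextWithIdentifiers) idea from the question can also be written as a pure step function that takes the identifiers seen so far plus the next element and returns the updated set together with what should be persisted. The following is only a sketch to show the shape of that logic without any Akka dependency: IdentValue mirrors the case class above, while Action, SaveBoth, SaveValue, Dedup, step and run are hypothetical names introduced for illustration.

```scala
// Mirrors the case class used in the answer above.
final case class IdentValue(id: Int, value: String)

// Hypothetical description of what should be written to the database.
sealed trait Action
final case class SaveBoth(id: Int, value: String) extends Action
final case class SaveValue(value: String) extends Action

object Dedup {
  // One step: given the ids seen so far and the next element,
  // return the updated set of ids and the action to perform.
  def step(seen: Set[Int], iv: IdentValue): (Set[Int], Action) =
    if (seen.contains(iv.id)) (seen, SaveValue(iv.value))
    else (seen + iv.id, SaveBoth(iv.id, iv.value))

  // Threads the state through a whole in-memory collection with foldLeft.
  def run(input: Seq[IdentValue]): Seq[Action] =
    input.foldLeft((Set.empty[Int], Vector.empty[Action])) {
      case ((seen, out), iv) =>
        val (next, action) = step(seen, iv)
        (next, out :+ action)
    }._2
}
```

For the input IdentValue(1, "a"), IdentValue(2, "b"), IdentValue(1, "c"), Dedup.run yields SaveBoth(1, "a"), SaveBoth(2, "b"), SaveValue("c"). Inside statefulMapConcat the same step function could be used to update the local var and emit the resulting action, which keeps the decision logic testable on its own.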
A few years later, here is an implementation I wrote if you only need a 1-to-1 mapping (not 1-to-N):
    import akka.stream.stage.{GraphStage, GraphStageLogic}
    import akka.stream.{Attributes, FlowShape, Inlet, Outlet}

    object StatefulMap {
      def apply[T, O](converter: => T => O) = new StatefulMap[T, O](converter)
    }

    class StatefulMap[T, O](converter: => T => O) extends GraphStage[FlowShape[T, O]] {
      val in = Inlet[T]("StatefulMap.in")
      val out = Outlet[O]("StatefulMap.out")
      val shape = FlowShape.of(in, out)

      override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
        // the by-name converter is evaluated once per materialization,
        // so each materialized stage gets its own state
        val f = converter
        setHandler(in, () => push(out, f(grab(in))))
        setHandler(out, () => pull(in))
      }
    }
Test (and demo):
    behavior of "StatefulMap"

    class Counter extends (Any => Int) {
      var count = 0
      override def apply(x: Any): Int = {
        count += 1
        count
      }
    }

    it should "not share state among substreams" in {
      val result = await {
        Source(0 until 10)
          .groupBy(2, _ % 2)
          .via(StatefulMap(new Counter()))
          .fold(Seq.empty[Int])(_ :+ _)
          .mergeSubstreams
          .runWith(Sink.seq)
      }
      result.foreach(_ should be(1 to 5))
    }