Is it at all possible to get only the last message on a Kafka topic using Akka Streams Kafka? I'm creating a websocket which listens to a Kafka topic, but currently it retrieves all prior unread messages when I connect. This can add up to quite a lot of messages, so I'm only interested in the last message plus any future messages (or only future messages).
The source:
def source(): Flow[Any, String, NotUsed] = {
  val source = Consumer.plainSource(consumerSettings, Subscriptions.topics(MyTopic))
  // Ignore anything the client sends; emit each Kafka record's value as a String
  Flow.fromSinkAndSource[Any, String](Sink.ignore, source.map(_.value))
}
Consumer settings:
@Provides
def providesConsumerSettings(@Named("kafkaUrl") kafkaUrl: String): ConsumerSettings[String, String] = {
  val deserializer = new StringDeserializer()
  val config = configuration.getOptional[Configuration]("akka.kafka.consumer")
    .getOrElse(Configuration.empty)
  ConsumerSettings(config.underlying, deserializer, deserializer)
    .withBootstrapServers(kafkaUrl)
    .withGroupId(GroupId)
}
I've tried setting ConsumerSettings.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"), which should "automatically reset the offset to the latest offset", but it does not seem to have any effect.
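Worth noting: auto.offset.reset only applies when the consumer group has no committed offset (or the committed offset is out of range), so with a fixed GroupId the consumer simply resumes from the last committed position. A minimal sketch of a workaround, assuming a throwaway group id per connection is acceptable (the UUID suffix and method name are illustrative, not part of the original setup):

import java.util.UUID
import org.apache.kafka.clients.consumer.ConsumerConfig

// With a fresh group id there are no committed offsets, so "latest"
// takes effect and the consumer starts at the end of the topic.
def providesLatestOnlyConsumerSettings(@Named("kafkaUrl") kafkaUrl: String): ConsumerSettings[String, String] = {
  val deserializer = new StringDeserializer()
  val config = configuration.getOptional[Configuration]("akka.kafka.consumer")
    .getOrElse(Configuration.empty)
  ConsumerSettings(config.underlying, deserializer, deserializer)
    .withBootstrapServers(kafkaUrl)
    .withGroupId(s"$GroupId-${UUID.randomUUID()}")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
}

The trade-off is that such a group never resumes from committed offsets, so every reconnect starts at the live end of the topic.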
I was able to avoid getting any upstream data upon client connection using a method described very neatly by David van Geest here
It boils down to having a BroadcastHub on the Consumer:
val liveSource = Consumer.plainSource(consumerSettings, Subscriptions.topics(topic1, topic2))
  .map(kafkaObject => utils.WebSockets.kafkaWrapper(kafkaObject.topic(), kafkaObject.value()))
  .toMat(BroadcastHub.sink)(Keep.right)
  .run()
And connecting a permanent consumer to drain all the upstream data:
liveSource.to(Sink.ignore).run()
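This permanent drain keeps the hub from backpressuring the Kafka consumer when no WebSocket client is connected, and since a BroadcastHub consumer only receives elements emitted after it attaches, a freshly connected client sees live messages only.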
From there, this lets me have a WebSocket client subscribe to all data received by the consumer, as such:
def source(): Flow[Any, String, NotUsed] = { Flow.fromSinkAndSource(Sink.ignore, liveSource) }
Or filter based on the Kafka topic (or whatever else you want):
def kafkaSpecificSource(kafkaTopic: String): Flow[Any, String, NotUsed] = {
  Flow.fromSinkAndSource(Sink.ignore, liveSource.filter { x =>
    (Json.parse(x) \ "topic").asOpt[String].contains(kafkaTopic)
  })
}
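For reference, a hypothetical Play controller wiring for such a flow (the controller class is my own illustration and assumes liveSource is in scope; Play provides a String-based MessageFlowTransformer out of the box):

import akka.stream.scaladsl.{Flow, Sink}
import play.api.libs.json.Json
import play.api.mvc.{AbstractController, ControllerComponents, WebSocket}

class KafkaWsController(cc: ControllerComponents) extends AbstractController(cc) {
  // Each client only receives messages broadcast after it connects;
  // anything the client sends is discarded by Sink.ignore.
  def ws(kafkaTopic: String): WebSocket = WebSocket.accept[String, String] { _ =>
    Flow.fromSinkAndSource(Sink.ignore, liveSource.filter { x =>
      (Json.parse(x) \ "topic").asOpt[String].contains(kafkaTopic)
    })
  }
}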
This does not resolve the issue of pushing an arbitrary amount of historic data to the user on first connect, but I foresee us adding a simple database query for the historic data and letting the WebSocket connection focus solely on the live-streamed data.
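When that historic query lands, prepending the backlog to the live stream is straightforward with concat. A minimal sketch, assuming Akka 2.6's Source.future and a hypothetical fetchHistory() database call:

import scala.concurrent.Future
import akka.NotUsed
import akka.stream.scaladsl.{Flow, Sink, Source}

// Placeholder for the future database query
def fetchHistory(): Future[List[String]] = ???

def sourceWithHistory(): Flow[Any, String, NotUsed] = {
  val historic: Source[String, NotUsed] =
    Source.future(fetchHistory()).mapConcat(identity)
  // Clients first receive the stored backlog, then the live broadcast
  Flow.fromSinkAndSource(Sink.ignore, historic.concat(liveSource))
}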