
How to forward incoming data via REST to an SSE stream in Quarkus

In my setup, I want to forward certain status changes over an SSE (Server-Sent Events) channel. The status changes are triggered by calling a REST endpoint, so I need to forward each incoming status change to the SSE stream.

What is the best/simplest way to accomplish this in Quarkus?

One solution I can think of is to use an EventBus (https://quarkus.io/guides/reactive-messaging). The SSE endpoint would subscribe to the status changes and push them through the SSE channel, while the status change endpoint would publish the corresponding events.

Is this a viable solution? Are there other (simpler) solutions? Do I need to use the reactive stuff in any case to accomplish this?

Any help is much appreciated!

asked Oct 26 '25 11:10 by robbit


2 Answers

The easiest way is to use RxJava as the stream provider. First, add the RxJava dependency. It may already come in transitively through one of the reactive Quarkus extensions (such as Kafka), or you can declare it directly if you don't need any of those streaming libraries:

        <dependency>
            <groupId>io.reactivex.rxjava2</groupId>
            <artifactId>rxjava</artifactId>
            <version>2.2.19</version>
        </dependency>

Here's an example of how to send a random double value every second:

    @GET
    @Path("/stream")
    @Produces(MediaType.SERVER_SENT_EVENTS)
    @SseElementType("text/plain")
    public Publisher<Double> stream() {
        return Flowable.interval(1, TimeUnit.SECONDS).map(tick -> new Random().nextDouble());
    }

This creates a new Flowable that fires every second, and on each tick we generate the next random double. Look at the other ways of creating a Flowable, such as Flowable.fromFuture(), to adapt this to your own code logic.

P.S. The code above creates a new Flowable every time the endpoint is queried; I wrote it that way to save space. In your case I assume you'll have a single source of events that you can build once and reuse for every request to the endpoint.
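
To illustrate the "build once, reuse for every request" idea from the P.S., here is a minimal sketch that needs only the JDK. It uses java.util.concurrent.SubmissionPublisher as a stand-in for the RxJava source, and the names StatusHub, Collector, push and demo are hypothetical, not part of Quarkus or RxJava. Treat it as a picture of the pattern under those assumptions rather than the code you would ship:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

/** Hypothetical hub: one long-lived publisher shared by every stream subscriber. */
final class StatusHub {
    // Built once; each new client attaches to this same instance.
    private final SubmissionPublisher<String> publisher = new SubmissionPublisher<>();

    /** Would be called by the REST handler that receives a status change. */
    void push(String status) {
        publisher.submit(status);
    }

    /** Would be called once per client that opens the event stream. */
    void subscribe(Flow.Subscriber<String> subscriber) {
        publisher.subscribe(subscriber);
    }

    void close() {
        publisher.close();
    }

    /** Collects what it receives; stands in for one connected SSE client. */
    static final class Collector implements Flow.Subscriber<String> {
        final List<String> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);

        @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
        @Override public void onNext(String item) { received.add(item); }
        @Override public void onError(Throwable t) { done.countDown(); }
        @Override public void onComplete() { done.countDown(); }
    }

    /** Two clients subscribe, two statuses are pushed, and each client's list is returned. */
    static List<List<String>> demo() {
        StatusHub hub = new StatusHub();
        Collector a = new Collector();
        Collector b = new Collector();
        hub.subscribe(a);
        hub.subscribe(b);
        hub.push("CONNECTED");
        hub.push("ANALYZING");
        hub.close(); // completes both subscribers after pending items are delivered
        try {
            a.done.await(5, TimeUnit.SECONDS);
            b.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return List.of(List.copyOf(a.received), List.copyOf(b.received));
    }
}
```

Calling StatusHub.demo() should hand both collectors the same two statuses in order. The point is only that the publisher lives in one shared object instead of being created inside the endpoint method.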

answered Oct 28 '25 02:10 by Dmytro Chaban


Dmytro, thanks for pointing me in the right direction. I have opted for Mutiny together with Kotlin. My code now looks like this:

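    // Imports assume the javax.* namespace (older Quarkus releases); newer ones use jakarta.*.
    // The Logger import is a guess: the post does not say which Logger class was used.
    import io.reactivex.Flowable
    import io.reactivex.processors.PublishProcessor
    import io.smallrye.mutiny.Multi
    import org.jboss.logging.Logger
    import org.jboss.resteasy.annotations.SseElementType
    import javax.enterprise.context.ApplicationScoped
    import javax.enterprise.inject.Default
    import javax.inject.Inject
    import javax.ws.rs.Consumes
    import javax.ws.rs.GET
    import javax.ws.rs.POST
    import javax.ws.rs.Path
    import javax.ws.rs.Produces
    import javax.ws.rs.core.MediaType
    import javax.ws.rs.core.Response

    data class DeviceStatus(var status: Status = Status.OFFLINE) {
        enum class Status { OFFLINE, CONNECTED, ANALYZING, MAINTENANCE }
    }

    @ApplicationScoped
    class DeviceStatusService {
        // Single shared source of events. onNext calls must not overlap;
        // consider toSerialized() if POSTs can arrive concurrently.
        val deviceStatusProcessor: PublishProcessor<DeviceStatus> = PublishProcessor.create()
        val deviceStatusQueue: Flowable<DeviceStatus> = Flowable.fromPublisher(deviceStatusProcessor)

        // Called by the REST endpoint for each incoming status change.
        fun pushDeviceStatus(deviceStatus: DeviceStatus) {
            deviceStatusProcessor.onNext(deviceStatus)
        }

        // Each SSE client gets a Multi backed by the same shared source.
        fun getStream(): Multi<DeviceStatus> {
            return Multi.createFrom().publisher(deviceStatusQueue)
        }
    }

    @Path("/deviceStatus")
    class DeviceStatusResource {
        private val LOGGER: Logger = Logger.getLogger("DeviceStatusResource")

        @Inject
        @field: Default
        lateinit var deviceStatusService: DeviceStatusService

        @POST
        @Consumes(MediaType.APPLICATION_JSON)
        fun status(status: DeviceStatus): Response {
            LOGGER.info("POST /deviceStatus ${status.status}")
            deviceStatusService.pushDeviceStatus(status)
            return Response.ok().build()
        }

        @GET
        @Path("/eventStream")
        @Produces(MediaType.SERVER_SENT_EVENTS)
        @SseElementType(MediaType.APPLICATION_JSON)
        fun stream(): Multi<DeviceStatus> {
            return deviceStatusService.getStream()
        }
    }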

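As a minimal setup, the service could use deviceStatusProcessor directly as the publisher. I wrapped it in a Flowable expecting that to add buffering, though as far as I can tell Flowable.fromPublisher() on its own does not buffer; an explicit operator such as onBackpressureBuffer() would be needed for that. Comments on the implementation are welcome.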
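
To make the buffering question a bit more concrete, here is a rough sketch of what a bounded buffer between the producer and a slow client does. It needs only the JDK and uses SubmissionPublisher as a stand-in, so it says nothing about how RxJava or Mutiny behave internally. BoundedBufferDemo, StalledClient and run are hypothetical names made up for the illustration:

```java
import java.util.concurrent.Flow;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;

/** Hypothetical demo: a bounded buffer in front of a client that never asks for data. */
final class BoundedBufferDemo {

    /** Stands in for a stalled SSE client: it subscribes but never requests items. */
    static final class StalledClient implements Flow.Subscriber<String> {
        @Override public void onSubscribe(Flow.Subscription subscription) { /* no request() */ }
        @Override public void onNext(String item) { }
        @Override public void onError(Throwable error) { }
        @Override public void onComplete() { }
    }

    /** Offers {@code total} items and returns {accepted, dropped}. */
    static int[] run(int bufferSize, int total) {
        int accepted = 0;
        int dropped = 0;
        try (SubmissionPublisher<String> publisher =
                 new SubmissionPublisher<>(ForkJoinPool.commonPool(), bufferSize)) {
            publisher.subscribe(new StalledClient());
            for (int i = 0; i < total; i++) {
                // offer() does not block; a negative result means the item was dropped.
                // The handler returns false, so a dropped item is not retried.
                int result = publisher.offer("status-" + i, (subscriber, item) -> false);
                if (result < 0) {
                    dropped++;
                } else {
                    accepted++;
                }
            }
        }
        return new int[] {accepted, dropped};
    }
}
```

With a small buffer, for example BoundedBufferDemo.run(4, 100), only the first few offers fit and the rest are rejected instead of piling up in memory. Whether to drop, block, or buffer without bound for slow clients is a design choice that depends on how important each status update is.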

answered Oct 28 '25 02:10 by robbit