How do I conditionally buffer a Grouped Observable/Flux based on Emitted Events?

I am attempting to write a Reactive Stream based on the following information:

We have a stream of Entity Events, where each Event contains the ID of its Entity and a Type of either INTENT or COMMIT. It is assumed that a COMMIT with a given ID will always be preceded by one or more INTENTs with the same ID. When an INTENT is received, it should be grouped by its ID and a "buffer" for that group should be opened. The buffer should be "closed" when a COMMIT for the same group is received or a configured timeout has elapsed. The resulting buffers should be emitted.

Note that it is possible to receive multiple INTENTs before receiving a closing COMMIT. (Edit:) The bufferDuration should guarantee that any "opened" buffer is emitted once bufferDuration has elapsed since the INTENT that opened the buffer was received, with or without a COMMIT.
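To make the intended rule concrete, here is a minimal, library-free sketch of the semantics described above. It is not a reactive implementation: TimedEvent, BufferModel and the caller-supplied timestamps are hypothetical names introduced only for illustration, and the timeout is checked explicitly instead of by a scheduler.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for EntityEvent with an explicit arrival time.
final class TimedEvent {
    final String id;
    final String type;    // "INTENT" or "COMMIT"
    final long atMillis;  // arrival time, supplied by the caller

    TimedEvent(String id, String type, long atMillis) {
        this.id = id;
        this.type = type;
        this.atMillis = atMillis;
    }
}

// Hypothetical model of the buffering rule; not part of Reactor or RxJava.
final class BufferModel {
    private final long timeoutMillis;
    // Open buffers keyed by entity ID; element 0 is the INTENT that opened it.
    private final Map<String, List<TimedEvent>> open = new LinkedHashMap<>();

    BufferModel(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Feeds one event and returns every buffer that is closed as a result.
    List<List<TimedEvent>> accept(TimedEvent e) {
        List<List<TimedEvent>> closed = expire(e.atMillis);
        if (e.type.equals("INTENT")) {
            // The first INTENT opens the buffer, later INTENTs join it.
            open.computeIfAbsent(e.id, k -> new ArrayList<>()).add(e);
        } else if (open.containsKey(e.id)) {
            // A COMMIT closes the buffer for its ID and is included in it.
            List<TimedEvent> buffer = open.remove(e.id);
            buffer.add(e);
            closed.add(buffer);
        }
        // A COMMIT with no open buffer (e.g. already timed out) is dropped here.
        return closed;
    }

    // Closes every buffer whose opening INTENT is at least timeoutMillis old.
    List<List<TimedEvent>> expire(long nowMillis) {
        List<List<TimedEvent>> closed = new ArrayList<>();
        open.values().removeIf(buffer -> {
            boolean expired = nowMillis - buffer.get(0).atMillis >= timeoutMillis;
            if (expired) {
                closed.add(buffer);
            }
            return expired;
        });
        return closed;
    }
}
```

In this sketch the timeout is measured from the INTENT that opened the buffer, so later INTENTs for the same ID do not extend it. In a real stream, a timer would have to play the role of the explicit expire call.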

My latest attempt at this is the following:

public class EntityEventBufferFactory {
    private final Duration bufferDuration;

    public EntityEventBufferFactory(Duration bufferDuration) {
        this.bufferDuration = bufferDuration;
    }

    public Flux<List<EntityEvent>> createGroupBufferFlux(Flux<EntityEvent> eventFlux) {
        return eventFlux.groupBy(EntityEvent::getId)
            .map(groupedFlux -> createGroupBuffer(groupedFlux))
            .flatMap(Function.identity());
    }

    protected Flux<List<EntityEvent>> createGroupBuffer(Flux<EntityEvent> groupFlux) {
        return groupFlux.publish().buffer(groupFlux.filter(this::shouldOpenBufferOnEvent), createGroupBufferCloseSelector(groupFlux));
    }

    protected Function<EntityEvent, Publisher<EntityEvent>> createGroupBufferCloseSelector(Flux<EntityEvent> groupFlux) {
        return event -> Flux.firstEmitting(Flux.just(event).delay(bufferDuration), groupFlux.filter(this::shouldCloseBufferOnEvent).publish());
    }

    protected boolean shouldOpenBufferOnEvent(EntityEvent entityEvent) {
        return entityEvent.getEventType() == EventType.INTENT;
    }

    protected boolean shouldCloseBufferOnEvent(EntityEvent entityEvent) {
        return entityEvent.getEventType() == EventType.COMMIT;
    }
}

And here is the test I am attempting to get passing:

@Test
public void entityEventsCanBeBuffered() throws Exception {
    FluxProcessor<EntityEvent, EntityEvent> eventQueue = UnicastProcessor.create();

    Duration bufferDuration = Duration.ofMillis(250);

    Flux<List<EntityEvent>> bufferFlux = new EntityEventBufferFactory(bufferDuration).createGroupBufferFlux(eventQueue);

    List<List<EntityEvent>> buffers = new ArrayList<>();
    bufferFlux.subscribe(buffers::add);

    EntityEvent intent = new EntityEvent();
    intent.setId("SOME_ID");
    intent.setEventType(EventType.INTENT);

    EntityEvent commit = new EntityEvent();
    commit.setId(intent.getId());
    commit.setEventType(EventType.COMMIT);

    eventQueue.onNext(intent);
    eventQueue.onNext(commit);

    eventQueue.onNext(intent);
    eventQueue.onNext(commit);

    Thread.sleep(500);

    assertEquals(2, buffers.size());
    assertFalse(buffers.get(0).isEmpty());
    assertFalse(buffers.get(1).isEmpty());
}

With this test, I get two emitted buffers, but they are both empty. After some digging around, I had to add .publish() at certain points to avoid an exception from Reactor with the message "This processor allows only a single Subscriber". The answer to the question RxJava: "java.lang.IllegalStateException: Only one subscriber allowed!" is what led me to that approach.

I'm currently using Reactor, but I think this translates 1-to-1 to RxJava using Observable and methods of the same names.

Any thoughts?

Mr. E. Gas asked Jan 30 '26 14:01


1 Answer

I think this is the definitive use case for Rx groupBy. From the documentation:

Groups the items emitted by a Publisher according to a specified criterion, and emits these grouped items as GroupedFlowables. The emitted GroupedPublisher allows only a single Subscriber during its lifetime and if this Subscriber cancels before the source terminates, the next emission by the source having the same key will trigger a new GroupedPublisher emission.

In your case the criterion is the ID, and on each emitted GroupedPublisher you apply takeUntil so that the group completes once an event of type COMMIT arrives:

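source
    .groupBy(EntityEvent::getId)
    // toList() returns a Single in RxJava 2, hence flatMapSingle instead of flatMap
    .flatMapSingle(group -> group
        // close the group after a timeout, with or without a COMMIT
        .takeUntil(Flowable.timer(10, TimeUnit.SECONDS))
        // close the group on COMMIT; the COMMIT itself is still emitted
        .takeUntil(this::shouldCloseBufferOnEvent)
        .toList())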

Edit: added time condition.
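To see why this yields non-empty lists for the test sequence in the question, the following library-free sketch imitates what takeUntil with a predicate followed by toList does to the events of a single ID, ignoring the timeout. TakeUntilSketch and splitInclusive are hypothetical names for illustration and are not part of RxJava or Reactor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical helper that only imitates the inclusive behaviour of
// takeUntil(predicate) followed by toList() on an in-memory list.
final class TakeUntilSketch {
    // Splits the events of one group into lists. Each list is closed by the
    // first element that matches stop, and that element stays in the list.
    static <T> List<List<T>> splitInclusive(List<T> events, Predicate<T> stop) {
        List<List<T>> result = new ArrayList<>();
        List<T> current = new ArrayList<>();
        for (T event : events) {
            current.add(event);
            if (stop.test(event)) {
                result.add(current);
                // The next event with the same key starts a fresh group.
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) {
            // The source ended before a closing element arrived.
            result.add(current);
        }
        return result;
    }
}
```

Applied to the sequence INTENT, COMMIT, INTENT, COMMIT for one ID, this produces two lists that each hold an INTENT followed by its COMMIT, which is the shape the test in the question asserts.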

Tassos Bassoukos answered Feb 02 '26 04:02