I am attempting to write a Reactive Stream based on the following information:
We have a stream of Entity Events where each Event contains the ID of its Entity and a Type of either INTENT or COMMIT. It is assumed that a COMMIT with a given ID will always be preceded by one-or-more INTENTs with the same ID. When an INTENT is received, it should be grouped by its ID and a "buffer" for that group should be opened. The buffer should be "closed" when a COMMIT for the same group is received or a configured timeout has lapsed. The resulting buffers should be emitted.
Note that it is possible to receive multiple INTENTs before receiving a closing COMMIT. (Edit:) The bufferDuration should guarantee that any "opened" buffer is emitted after bufferDuration time has lapsed since the INTENT that opened the buffer was received, with or without a COMMIT.
My latest attempt at this is the following:
public EntityEventBufferFactory {
private final Duration bufferDuration;
public EntityEventBufferFactory(Duration bufferDuration) {
this.bufferDuration = bufferDuration;
}
public Flux<List<EntityEvent>> createGroupBufferFlux(Flux<EntityEvent> eventFlux) {
return eventFlux.groupBy(EntityEvent::getId)
.map(groupedFlux -> createGroupBuffer(groupedFlux))
.flatMap(Function.identity());
}
protected Flux<List<EntityEvent>> createGroupBuffer(Flux<EntityEvent> groupFlux) {
return groupFlux.publish().buffer(groupFlux.filter(this::shouldOpenBufferOnEvent), createGroupBufferCloseSelector(groupFlux));
}
protected Function<EntityEvent, Publisher<EntityEvent>> createGroupBufferCloseSelector(Flux<EntityEvent> groupFlux) {
return event -> Flux.firstEmitting(Flux.just(event).delay(bufferDuration), groupFlux.filter(this::shouldCloseBufferOnEvent).publish());
}
protected boolean shouldOpenBufferOnEvent(EntityEvent entityEvent) {
return entityEvent.getEventType() == EventType.INTENT;
}
protected boolean shouldCloseBufferOnEvent(EntityEvent entityEvent) {
return entityEvent.getEventType() == EventType.COMMIT;
}
}
And here is the test I am attempting to get passing:
@Test
public void entityEventsCanBeBuffered() throws Exception {
FluxProcessor<EntityEvent, EntityEvent> eventQueue = UnicastProcessor.create();
Duration bufferDuration = Duration.ofMillis(250);
Flux<List<EntityEvent>> bufferFlux = new EntityEventBufferFactory(bufferDuration).createGroupBufferFlux(eventQueue);
bufferFactory.setBufferDuration(bufferDuration);
List<List<EntityEvent>> buffers = new ArrayList<>();
bufferFlux.subscribe(buffers::add);
EntityEvent intent = new EntityEvent();
intent.setId("SOME_ID");
intent.setEventType(EventType.INTENT);
EntityEvent commit = new EntityEvent();
commit.setId(intent.getId());
commit.setEventType(EventType.COMMIT);
eventQueue.onNext(intent);
eventQueue.onNext(commit);
eventQueue.onNext(intent);
eventQueue.onNext(commit);
Thread.sleep(500);
assertEquals(2, buffers.size());
assertFalse(buffers.get(0).isEmpty());
assertFalse(buffers.get(1).isEmpty());
}
With this test, I get two emitted buffers, but they are both empty. You'll note that after digging around, I had to add .publish() at certain points to not get an Exception from Reactor saying This processor allows only a single Subscriber. The answer to this question, RxJava: "java.lang.IllegalStateException: Only one subscriber allowed!", is what led me to that approach.
I'm currently using Reactor, but I think this translates 1-to-1 with RxJava using Observable and methods of the same names.
Any thoughts?
I think that is the definitive use case of Rx groupBy. From the documentation:
Groups the items emitted by a Publisher according to a specified criterion, and emits these grouped items as GroupedFlowables. The emitted GroupedPublisher allows only a single Subscriber during its lifetime and if this Subscriber cancels before the source terminates, the next emission by the source having the same key will trigger a new GroupedPublisher emission.
In your case, this criterion is the ID, and on each GroupedPublisher emitted you takeUntil the type is COMMIT:
source
.groupBy(EntityEvent::getId)
.flatMap(group ->
group
.takeUntil(Flowable.timer(10,TimeUnit.SECONDS))
.takeUntil(this::shouldCloseBufferOnEvent)
.toList())
Edit: added time condition.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With