I am trying to make a reactive pipeline using Java and project-reactor where the use-case is that the application generates flow status(INIT, PROCESSING, SAVED, DONE) at different levels. The status must be emitted asynchronously to a flux which is needed to be handled independently and separately from the main flow. I came across this link:
Spring WebFlux (Flux): how to publish dynamically
My sample flow is something like this:
public class StatusEmitterImpl implements StatusEmitter {
    private final FluxProcessor<String, String> processor;
    private final FluxSink<String> sink;
    public StatusEmitterImpl() {
        this.processor = DirectProcessor.<String>create().serialize();
        this.sink = processor.sink();
    }
    @Override
    public Flux<String> publisher() {
        return this.processor.map(x -> x);
    }
    public void publishStatus(String status) {
        sink.next(status);
    }
}
public class Try {
    public static void main(String[] args) {
    StatusEmitterImpl statusEmitter = new StatusEmitterImpl();
    Flux.fromIterable(Arrays.asList("INIT", "DONE")).subscribe(x -> 
        statusEmitter.publishStatus(x));
    statusEmitter.publisher().subscribe(x -> System.out.println(x));
    }
}
The problem is that nothing is getting printed on the console. I cannot understand what I am missing.
DirectProcessor passes values to its registered Subscribers directly, without caching the signals. If there is no Subscriber, then the value is "forgotten". If a Subscriber comes in late, then it will only receive signals emitted after it subscribed.
That's what is happening here: because fromIterable works on an in-memory collection, it has time to push all values to the DirectProcessor, which by that time doesn't have a registered Subscriber yet.
If you invert the last two lines you should see something.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With