Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Restart Reactive Messaging, e.g. after reconfigure

How can I restart or stop/resume the reactive messaging, e.g. after changing the interval time? This example is from the Quarkus guide: https://quarkus.io/guides/kafka-streams

@Outgoing("temperature-values")                             
public Flowable<KafkaRecord<Integer, String>> generate() {

    return Flowable.interval(500, TimeUnit.MILLISECONDS)    
            .onBackpressureDrop()
            .map(tick -> {
                WeatherStation station = stations.get(random.nextInt(stations.size()));
                double temperature = BigDecimal.valueOf(random.nextGaussian() * 15 + station.averageTemperature)
                        .setScale(1, RoundingMode.HALF_UP)
                        .doubleValue();

                LOG.infov("station: {0}, temperature: {1}", station.name, temperature);
                return KafkaRecord.of(station.id, Instant.now() + ";" + temperature);
            });
}
like image 482
Kevin Avatar asked Jan 21 '26 09:01

Kevin


1 Answers

You can try to replace Flowable with Subject, as an option, and use Flowable to feed values into Subject itself. Then, when you want to replace whatever you need, you'll drop current Flowable and create new, that will feed Subject

class YourClass {

    private Subject<KafkaRecord<Integer, String>> temperatureSubject = BehaviorSubject.create();
    private Disposable currentSubscription;

    public void setFlowable() {
        if(currentSubscription != null && !currentSubscription.isDisposed()) {
            currentSubscription.dispose();
        }
        currentSubscription = Flowable.interval(5, TimeUnit.SECONDS)
                .map(it -> {
                    WeatherStation station = stations.get(random.nextInt(stations.size()));
                    double temperature = BigDecimal.valueOf(random.nextGaussian() * 15 + station.averageTemperature)
                        .setScale(1, RoundingMode.HALF_UP)
                        .doubleValue();

                    LOG.infov("station: {0}, temperature: {1}", station.name, temperature);
                    return KafkaRecord.of(station.id, Instant.now() + ";" + temperature);
                }).subscribe(it -> {
                    temperatureSubject.onNext(it);
                });
    }

    @Outgoing("temperature-values")
    public Flowable<KafkaRecord<Integer, String>> generate() {
        return temperatureSubject.toFlowable(BackpressureStrategy.LATEST);
    }
}
like image 185
Dmytro Chaban Avatar answered Jan 23 '26 21:01

Dmytro Chaban



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!