I have the following code, which creates a custom Observable using the Observable.create(OnSubscribe) method:
import java.util.Scanner;
import java.util.Timer;
import java.util.TimerTask;

import rx.Observable;
import rx.Subscription;

public class Main {
    public static void main(String[] args) {
        Subscription subscription = Observable
            .create(subscriber -> {
                // Emits a string every second; nothing here stops the timer on unsubscribe.
                Timer timer = new Timer();
                TimerTask task = new TimerTask() {
                    @Override
                    public void run() {
                        subscriber.onNext("tick! tack!");
                    }
                };
                timer.scheduleAtFixedRate(task, 0L, 1000L);
            })
            .subscribe(System.out::println);
        new Scanner(System.in).nextLine();
        System.err.println("finishing");
        subscription.unsubscribe();
    }
}
The Observable emits a string every second using a timer. When the user presses Enter, the subscription is cancelled.
However, the timer keeps running after the unsubscribe. How can I cancel the timer? I guess there must be a hook somewhere, but I can't find it.
In .NET, the function passed to Create would return an IDisposable, which could be my own implementation that stops the timer. I am not sure how to map that to RxJava, as the function passed to create returns void.
A more declarative (and IMHO easier to read) solution would be to use the Observable.using method:
Observable<String> obs = Observable.using(
    // resource factory:
    () -> new Timer(),
    // observable factory:
    timer -> Observable.create(subscriber -> {
        TimerTask task = new TimerTask() {
            @Override
            public void run() {
                subscriber.onNext("tick! tack!");
            }
        };
        timer.scheduleAtFixedRate(task, 0L, 1000L);
    }),
    // dispose action:
    timer -> timer.cancel()
);
You declare how the dependent resource (the Timer) is created, how it's used to create an Observable, and how it's disposed of, and RxJava will take care of creating the timer on subscription and disposing of it on unsubscription.
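The dispose action above relies on Timer.cancel() actually stopping the scheduled task. As a rough, JDK-only sketch of that behaviour (no RxJava involved), the following counts ticks before and after cancel(). The class name TimerCancelDemo, the method ticksBeforeAndAfterCancel, the 20 ms period and the daemon flag are all assumptions made for this illustration and are not part of the original code.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: shows that Timer.cancel() stops further ticks.
public class TimerCancelDemo {

    // Returns {ticks seen up to shortly after cancel(), ticks seen later on}.
    public static int[] ticksBeforeAndAfterCancel() throws InterruptedException {
        AtomicInteger ticks = new AtomicInteger();
        // Daemon timer (an assumption for the demo) so the JVM can always exit.
        Timer timer = new Timer(true);
        timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                ticks.incrementAndGet();
            }
        }, 0L, 20L);

        Thread.sleep(200L);   // let the timer tick for a while
        timer.cancel();       // same call as the dispose action above
        Thread.sleep(50L);    // allow a run() that was already executing to finish
        int before = ticks.get();

        Thread.sleep(200L);   // if the timer were still alive, more ticks would arrive here
        int after = ticks.get() - before;
        return new int[] { before, after };
    }

    public static void main(String[] args) throws InterruptedException {
        int[] counts = ticksBeforeAndAfterCancel();
        System.out.println("ticks before cancel: " + counts[0]);
        System.out.println("ticks after cancel: " + counts[1]);
    }
}
```

Note that a Timer created with the no-argument constructor, as in the question, runs on a non-daemon thread, so until cancel() is called it keeps firing and can keep the JVM alive.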