I have an observable that continuously emits items, and I need to process each one (the processing function takes some time). While an item is being processed, if another item with the same value is emitted,
I can ignore it, since the same one is already in progress. But once the current item has been processed (and onNext has been called), a later request with the same value should be allowed.
I used the distinctUntilChanged operator, but as far as I can see it rejects an item that equals the last one, even if the last item has finished processing and onNext was called.
I have a sample that demonstrates the issue.
I have a class:
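class User {
    String id;
    String name;

    public User(String id, String name) {
        this.id = id;
        this.name = name;
    }

    // Two users are considered equal when their ids match.
    @Override
    public boolean equals(Object obj) {
        if (this == obj) return true;
        if (!(obj instanceof User)) return false;
        return id.equals(((User) obj).id);
    }

    // Keep hashCode consistent with equals.
    @Override
    public int hashCode() {
        return id.hashCode();
    }

    @Override
    public String toString() {
        return name;
    }
}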
And an observable (Subject)
Subject<User> mSubject = PublishSubject.create();
And my Observable chain is
mSubject
    .doOnNext(i -> Log.d(TAG, "emitted: " + i))
    .observeOn(Schedulers.io())
    .distinctUntilChanged()
    .map(user -> {
        Log.d(TAG, "processing " + user);
        Thread.sleep(5000); // processing takes 5 seconds
        return user;
    })
    .subscribe(user -> Log.d(TAG, "onNext: " + user.name));
And I emit values like this
for (int i = 0; i < 20; i++) {
    Thread.sleep(1000);
    mSubject.onNext(new User(String.valueOf(1), "User " + i)); // all `User`s have the same id
}
The result is
emitted: User 0
processing User 0
emitted: User 1
emitted: User 2
emitted: User 3
emitted: User 4
onNext: User 0
emitted: User 5
emitted: User 6
emitted: User 7
emitted: User 8
emitted: User 9
emitted: User 10
emitted: User 11
emitted: User 12
emitted: User 13
emitted: User 14
emitted: User 15
emitted: User 16
emitted: User 17
emitted: User 18
emitted: User 19
All the objects are equal here (the equals method checks id). As you can see, it took User 0 first and needs 5 seconds to process it; during this time I can ignore incoming items,
but after onNext: User 0 I should allow a request for the same user. distinctUntilChanged does not allow it, since it is holding the last value, which is equal to the same user. How can I do this?
Hope my question is clear.
You can achieve this with a Flowable and the right BackpressureStrategy. The problem is that you are not setting the buffer size when calling observeOn. You could try this (in Kotlin, though):
Observable.interval(100, TimeUnit.MILLISECONDS)
    .doOnNext { println("emitting $it") }
    .toFlowable(BackpressureStrategy.LATEST)
    .observeOn(Schedulers.io(), false, 1)
    .subscribeOn(Schedulers.io())
    .subscribe {
        println("consuming $it")
        Thread.sleep(500)
    }
The output would look like this:
emitting 0
consuming 0
emitting 1
emitting 2
emitting 3
emitting 4
emitting 5
consuming 5
emitting 6
emitting 7
emitting 8
emitting 9
emitting 10
consuming 10
emitting 11
emitting 12
emitting 13
emitting 14
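To picture why the consumer jumps from 0 to 5 to 10, it may help to think of LATEST with a buffer of 1 as a single slot that the producer keeps overwriting while the consumer is busy. The sketch below is only a plain-Java analogy of that idea, not RxJava's actual implementation; `LatestSlot` and `LatestSlotDemo` are hypothetical names made up for this illustration.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical single-slot holder that mimics the "keep only the latest" idea.
final class LatestSlot<T> {
    private final AtomicReference<T> slot = new AtomicReference<>();

    // Producer side: overwrite whatever value is still waiting.
    void offer(T value) {
        slot.set(value);
    }

    // Consumer side: take the waiting value (or null if none) and clear the slot.
    T poll() {
        return slot.getAndSet(null);
    }
}

final class LatestSlotDemo {
    public static void main(String[] args) {
        LatestSlot<Integer> slot = new LatestSlot<>();
        // The producer emits 1..4 while the consumer is still busy with item 0.
        for (int i = 1; i <= 4; i++) {
            slot.offer(i);
        }
        // The consumer becomes free and only sees the most recent value.
        System.out.println("consuming " + slot.poll()); // prints "consuming 4"
        System.out.println("consuming " + slot.poll()); // prints "consuming null"
    }
}
```

Items 1 to 3 are dropped because nobody asked for them before a newer value arrived, which is roughly what happens to the skipped emissions in the output above.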
When you call observeOn(Scheduler) without a buffer size, the default is 128 (the value returned by Flowable.bufferSize()).
You can try changing the buffer size passed to observeOn in the Kotlin sample above to, say, 3. You would get:
emitting 0
consuming 0
emitting 1
...
emitting 5
consuming 1
emitting 6
...
emitting 10
consuming 2
emitting 11
...
emitting 15
consuming 15
emitting 16
...
emitting 20
consuming 16
emitting 21
...
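Note that the backpressure approach drops items regardless of their value. If the goal is specifically to skip an item only while an equal one is still being processed, a different technique would be to track the ids that are currently in flight. The following is a minimal plain-Java sketch of that idea under stated assumptions; `InFlightGate` and `InFlightGateDemo` are hypothetical names, and wiring it into the chain from the question (for example `tryStart(user.id)` inside a `filter` placed before `observeOn`, and `finish(user.id)` inside a `doOnNext` after `map`) is an untested suggestion.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper that remembers which keys are currently being processed.
final class InFlightGate<K> {
    private final Set<K> inFlight = ConcurrentHashMap.newKeySet();

    // Returns true if the key was not in flight and is now marked as started.
    boolean tryStart(K key) {
        return inFlight.add(key);
    }

    // Marks the key as finished so a later equal request is accepted again.
    void finish(K key) {
        inFlight.remove(key);
    }
}

final class InFlightGateDemo {
    public static void main(String[] args) {
        InFlightGate<String> gate = new InFlightGate<>();
        System.out.println(gate.tryStart("1")); // prints "true": first request is accepted
        System.out.println(gate.tryStart("1")); // prints "false": same id still in progress
        gate.finish("1");                       // processing of id "1" is done
        System.out.println(gate.tryStart("1")); // prints "true": same id is allowed again
    }
}
```

A real chain would also need to call `finish` when processing fails, otherwise the id would stay blocked.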