I have code that does blocking operation in while loop (downloads some data from a server). Client does not know how many items are going to be returned in each step. Loop breaks when N items are downloaded.
val n = 10
val list = ArrayList<T>()
while (list.size < n) {
val lastItemId = list.last()?.id ?: 0
val items = downloadItems(lastItemId)
list.addAll(items)
}
downloadItems performs blocking HTTP call and returns list. Now let's assume downloadItems changes and new return type is Observable<Item>. How could I change the code to use RxJava without performing something like blockingGet?
You could use repeatUntil to achieve this:
var totalItems = 0
var id = 0
Observable.fromCallable {
downloadItems(id)
}
.flatMap {
list ->
totalItems += list.size
id = list.last()?.id ?: 0
Observable.just(list)
}
.repeatUntil({totalItems > n})
.subscribe({result -> System.out.println(result) })
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With