Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to use/control RxJava Observable.cache

I am trying to use the RxJava caching mechanism ( RxJava2 ) but i can't seem to catch how it works or how can i control the cached contents since there is the cache operator.

I want to verify the cached data with some conditions before emitting the new data.

for example

someObservable.
repeat().
filter { it.age < maxAge }.
map(it.name).
cache() 

How can i check and filter the cache value and emit it if its succeeds and if not then i will request a new value.

since the value changes periodically i need to verify if the cache is still valid before i can request a new one.

There is also ObservableCache<T> class but i can't find any resources of using it.

Any help would be much appreciated. Thanks.

like image 252
AK_92 Avatar asked Oct 27 '25 10:10

AK_92


1 Answers

This is not how replay/ cache works. Please read the #replay/ #cache documentation first.

replay

This operator returns a ConnectableObservable, which has some methods (#refCount/ #connect/ #autoConnect) for connecting to the source.

When #replay is applied without an overload, the source subscription is multicasted and all emitted values sind connection will be replayed. The source subscription is lazy and can connect to the source via #refCount/ #connect/ #autoConnect.

Returns a ConnectableObservable that shares a single subscription to the underlying ObservableSource that will replay all of its items and notifications to any future Observer.

Applying #relay without any connect-method (#refCount/ #connect/ #autoConnect) will not emit any values on subscription

A Connectable ObservableSource resembles an ordinary ObservableSource, except that it does not begin emitting items when it is subscribed to, but only when its connect method is called.

replay(1)#autoConnect(-1) / #refCount(1) / #connect

Applying replay(1) will cache the last value and will emit the cached value on each subscription. The #autoConnect will connect open an connection immediately and stay open until a terminal event (onComplete, onError) happens. #refCount is smiular, but will disconnect from the source, when all subscriber disappear. The #connect opreator can be used, when you need to wait, when alle subscriptions have been done to the observable, in order not to miss values.

usage

#replay(1) -- most of the it should be used at the end of the observable.

sourcObs.
  .filter()
  .map()
  .replay(bufferSize)
  .refCount(connectWhenXSubsciberSubscribed) 

caution

applying #replay without a buffer-limit or expiration date will lead to memory-leaks, when you observale is infinite

cache / cacheWithInitialCapacity

Operators are similar to #replay with autoConnect(1). The operators will cache every value and replay on each subsciption.

The operator subscribes only when the first downstream subscriber subscribes and maintains a single subscription towards this ObservableSource. In contrast, the operator family of replay() that return a ConnectableObservable require an explicit call to ConnectableObservable.connect(). Note: You sacrifice the ability to dispose the origin when you use the cache Observer so be careful not to use this Observer on ObservableSources that emit an infinite or very large number of items that will use up memory. A possible workaround is to apply takeUntil with a predicate or another source before (and perhaps after) the application of cache().

example

    @Test
    fun skfdsfkds() {
        val create = PublishSubject.create<Int>()

        val cacheWithInitialCapacity = create
            .cacheWithInitialCapacity(1)

        cacheWithInitialCapacity.subscribe()

        create.onNext(1)
        create.onNext(2)
        create.onNext(3)

        cacheWithInitialCapacity.test().assertValues(1, 2, 3)
        cacheWithInitialCapacity.test().assertValues(1, 2, 3)
    }

usage

Use cache operator, when you can not control the connect phase

This is useful when you want an ObservableSource to cache responses and you can't control the subscribe/dispose behavior of all the Observers.

caution

As with replay() the cache is unbounded and could lead to memory-leaks.

Note: The capacity hint is not an upper bound on cache size. For that, consider replay(int) in combination with ConnectableObservable.autoConnect() or similar.

further reading

https://blog.danlew.net/2018/09/25/connectable-observables-so-hot-right-now/

https://blog.danlew.net/2016/06/13/multicasting-in-rxjava/

like image 82
Hans Wurst Avatar answered Oct 30 '25 13:10

Hans Wurst



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!