I want to use Rx Buffer functionality:
var source = new Subject<Price>();
var buffer = source
.Buffer(TimeSpan.FromSeconds(30), 5)
.Where(p => p.Any());
that means emit (publishing to subscribers) happens when buffer reaches size of 5 or 30 seconds have gone since the last emit.
But I need to be able to emit on demand - for example when I receive high priority sequence item. Then I want to add it to observable (source.OnNext()) and somehow force it to emit (that means returning all elements in the buffer and clearing it).
I know that I can add following code:
var flusher = new Subject<Price>();
var closing = flusher.Select(x => new List<Price> {x});
var query = buffer.Merge(closing).Subscribe(something);
and invoke flusher.OnNext(highPriorityItem) and I will have it emitted.
But in this case, I have two independent sequences with two different emits. I need one emit when buffer is full or specific item appears in sequence.
Force flush count-type Observable.Buffer c# and Force flush to Observable.Buffer c# don't seem to be suitable for me
I think decPL has the basic idea right here, but his solution isn't stable. Depending on the scheduler of the input observable you can get unpredictable results even if it's subscribed in the right order. That's because there are multiple independent subscriptions to input. You need to push this all through a .Publish(...) call to ensure only one subscription.
Also it need a way of cleaning up when the subscription is disposed. So it also needs to run through a .Create(...) call.
Here's how:
var input = new Subject<Price>();
IObservable<IList<Price>> query =
input
.Publish(i =>
Observable
.Create<IList<Price>>(o =>
{
var timeBuffer =
Observable
.Timer(TimeSpan.FromSeconds(10.0))
.Select(n => Unit.Default);
var flush =
i
.Where(p => p.IS_IMPORTANT)
.Select(n => Unit.Default);
var sizeBuffer =
i
.Buffer(5)
.Select(l => Unit.Default);
return
i
.Window(() => Observable.Merge(timeBuffer, sizeBuffer, flush))
.SelectMany(w => w.ToList())
.Subscribe(o);
}));
query.Subscribe(w => DO_SOMETHING_WITH_PRICES(w));
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With