
How can I avoid passing the scheduler through my business logic, when writing tests for RXJS observables?

I am finding that the only way to make certain tests pass is to explicitly pass a scheduler to functions. For illustration, consider this function:

function doStuff( stream ){
    return stream.delay(100)
        .filter( x => x % 2 === 0 )
        .map( x => x * 2 )
        // range( start, count ): emit 3 values starting at x
        .flatMapLatest( x => Rx.Observable.range( x, 3 ) )
}

And a test:

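it('example test', () => {
    // onNext builds a recorded notification; in RxJS 4 it lives on Rx.ReactiveTest
    const onNext = Rx.ReactiveTest.onNext
    let scheduler = new Rx.TestScheduler()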
    let xs = scheduler.createHotObservable(
        onNext(210, 1),
        onNext(220, 2),
        onNext(230, 3)
    )

    let res = scheduler.startScheduler(
        () => doStuff( xs, scheduler ),
        {created:0, subscribed:200, disposed:1000})

    expect( res.messages ).toEqual( [
        onNext(321, 4),
        onNext(322, 5),
        onNext(323, 6)
    ] )
})

Which gives an error:

    Expected [  ] to equal [ ({ time: 321, value: OnNextNotification({ value: 4, kind: 'N' }), comparer: Function }), ({ time: 322, value: OnNextNotification({ value: 5, kind: 'N' }), comparer: Function }), ({ time: 323, value: OnNextNotification({ value: 6, kind: 'N' }), comparer: Function }) ].

This suggests that the delay is happening in real time instead of the simulated time of the TestScheduler.
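The empty result list can be illustrated with a minimal sketch of virtual time in plain JavaScript. This is a toy model, not RxJS's TestScheduler: the class `ToyVirtualScheduler` and its methods `schedule` and `advanceTo` are hypothetical names made up for illustration. Work queued on the virtual clock runs as soon as the clock is advanced, while work queued with a real `setTimeout` is still pending when the synchronous test body finishes.

```javascript
// Toy model of virtual time; all names here are hypothetical, not the RxJS API.
class ToyVirtualScheduler {
  constructor() {
    this.clock = 0;
    this.queue = [];
  }
  // Queue `action` to run `delay` virtual ticks from now.
  schedule(delay, action) {
    this.queue.push({ due: this.clock + delay, action });
  }
  // Run every queued action due at or before `time`, in due order.
  advanceTo(time) {
    for (;;) {
      this.queue.sort((a, b) => a.due - b.due);
      if (this.queue.length === 0 || this.queue[0].due > time) break;
      const next = this.queue.shift();
      this.clock = next.due;
      next.action();
    }
    this.clock = time;
  }
}

const virtualScheduler = new ToyVirtualScheduler();
const seen = [];

// Delay on the virtual clock: fires as soon as the clock is advanced.
virtualScheduler.schedule(100, () => seen.push('virtual@' + virtualScheduler.clock));

// Delay on the wall clock: needs 100 real milliseconds to pass.
const realTimer = setTimeout(() => seen.push('real'), 100);

virtualScheduler.advanceTo(1000); // synchronous, takes no real time
const seenAtEndOfTest = seen.slice(); // snapshot of what the "test" could observe
clearTimeout(realTimer); // cancel the real timer so the script exits promptly
```

In this sketch `seenAtEndOfTest` contains only the virtually scheduled entry. The wall-clock callback has had no chance to run, which mirrors a `delay(100)` that was never told about the test scheduler.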

If I pass the scheduler to every operator, then I can make it work:

function doStuff( stream, scheduler ){
   return stream.delay( 100, scheduler )
      .filter( x => x % 2 === 0 )
      .map( x => x * 2 )
      .flatMapLatest( x => Rx.Observable.range(x, 3, scheduler ) )
}

but it feels to me like I should be able to set the scheduler once, without threading it through my production code. Given that the original stream is created by the TestScheduler and then run via the same scheduler, I was expecting everything to be wired up automatically.

Peter Hall asked Nov 25 '25 02:11


1 Answer

The Rx design guidelines suggest considering passing a specific scheduler to concurrency-introducing operators. Single-threaded JavaScript has no concurrency, but time-based operators like delay() raise a similar concern.

This isn't as bad as I first thought: most operators do not take a scheduler argument, and of those that do, only a subset are time-based. It also highlights why you would pass a scheduler explicitly. In my example above, I passed the scheduler to every operator that supported it, but the results weren't exactly what I expected. I even tweaked my "expected" result to make the test pass:

expect( res.messages ).toEqual( [
    onNext(321, 4),
    onNext(322, 5),
    onNext(323, 6)
] )

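But really, I was expecting all of those timings to be 320: the only even input, 2, arrives at 220, is delayed by 100 to 320, and is mapped to 4, so range() should emit 4, 5 and 6 at that same instant.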

For delay(), I need to inherit the scheduler from the test, but for range() I want the default scheduler instead, because it will schedule the events immediately.

My final example code snippet would then look like this:

function doStuff( stream, scheduler ){
    return stream.delay( 100, scheduler )
        .filter( x => x % 2 === 0 )
        .map( x => x * 2 )
        .flatMapLatest( x => Rx.Observable.range(x, 3))
}
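The difference between 320 and 321, 322, 323 can be reproduced with a toy virtual clock. The sketch below assumes a test scheduler that moves any action scheduled for "now" to the next tick, and a range() that schedules one action per value. Both are assumptions chosen to match the timings reported above, and `ToyTestScheduler`, `rangeOn`, and `rangeImmediate` are hypothetical names, not RxJS functions.

```javascript
// Toy model; all names are hypothetical, not the RxJS API.
class ToyTestScheduler {
  constructor(startTime) {
    this.clock = startTime;
    this.queue = [];
  }
  // Assumption: an action due now or earlier is pushed to the next tick.
  scheduleAbsolute(due, action) {
    const when = due <= this.clock ? this.clock + 1 : due;
    this.queue.push({ when, action });
  }
  // Drain the queue in time order, moving the clock to each action.
  run() {
    while (this.queue.length > 0) {
      this.queue.sort((a, b) => a.when - b.when);
      const next = this.queue.shift();
      this.clock = next.when;
      next.action();
    }
  }
}

// range() on the test scheduler: each value is a separately scheduled action.
function rangeOn(scheduler, start, count, emit) {
  const step = (i) => {
    if (i >= count) return;
    scheduler.scheduleAbsolute(scheduler.clock, () => {
      emit(scheduler.clock, start + i);
      step(i + 1);
    });
  };
  step(0);
}

// range() without the test scheduler: values are emitted synchronously.
function rangeImmediate(scheduler, start, count, emit) {
  for (let i = 0; i < count; i++) emit(scheduler.clock, start + i);
}

// The delayed, mapped value 4 arrives at virtual time 320 in both cases.
const scheduled = [];
const onTest = new ToyTestScheduler(320);
rangeOn(onTest, 4, 3, (time, value) => scheduled.push([time, value]));
onTest.run();

const immediate = [];
const plain = new ToyTestScheduler(320);
rangeImmediate(plain, 4, 3, (time, value) => immediate.push([time, value]));
```

Under these assumptions, `scheduled` holds the values 4, 5, 6 at times 321, 322, 323, while `immediate` holds the same values all at time 320, which is the timing the final version of doStuff is after.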

When writing tests, you always want time-based operators to use the TestScheduler. Other operators will usually want the default scheduler, regardless of whether a test is running.

Peter Hall answered Nov 26 '25 16:11


