I am writing a service where people can paste in URLs of Spotify playlists and then export the playlist to a different service. For each track URL that is pasted in, a request needs to be made to the Spotify API.
This code:
    Rx.Observable.fromArray<ITrackIdentifier>( this._allTracks )
        .pluck<string>( "id" )
        .distinct()
        .flatMap(
            ( trackId ) => this.spotifyService.lookupTrack( trackId )
                .catch( ( error ) => this.handleError( error ) ) )
        .subscribe(
            ( result ) => this.handleTrackLookupResult( result ),
            ( error ) => this.handleError( error ),
            () => this.handleComplete()
        );
This works fine except when a large number of tracks are added. One of my sample playlists has over 500 tracks, so 500 calls are made immediately. The browser has to deal with them or return items from the cache, so it becomes slow and locks up, and Spotify returns loads of errors because I exceed the API call limit.
I want to have only, say, 10 calls running at the same time. merge with maxConcurrent set seems like the perfect solution, as discussed on Stack Overflow.
This would look like this:
    Rx.Observable.fromArray<ITrackIdentifier>( this._allTracks )
        .pluck<string>( "id" )
        .distinct()
        .map(
            ( trackId ) => this.spotifyService.lookupTrack( trackId )
                .catch( ( error ) => this.handleError( error ) ) )
        .merge(10)
        .subscribe(
            ( result ) => this.handleTrackLookupResult( result ),
            ( error ) => this.handleError( error ),
            () => this.handleComplete()
        );
But it just doesn't work. In the Chrome network debugger you can see all the calls being made at the same time, with most of them queueing for ages until they fail.
Why isn't this working? How else can I get round this issue?
Here is the Github checkin with the project at this stage:
The problem with your code using merge is that spotifyService.lookupTrack doesn't return an Observable but a Promise. Some of Observable's functions, like flatMap, will handle Promises as well, but the difference between an Observable and a Promise is that the Observable is lazy, while the Promise is not: calling lookupTrack inside map starts the request right away, before merge(10) gets a chance to limit anything. You can make a lazy observable from a promise factory function using Observable.defer, as user3743222 suggests. This little example is in JavaScript instead of TypeScript so it can be run here.
console.log = x => {var d = document,b=d.body,p=d.createElement('pre'); p.style.margin = "0"; p.appendChild(d.createTextNode(''+x)); b.appendChild(p); window.scrollTo(0, b.scrollHeight); };
function log_delay(timeout, value) {
    return new Promise(resolve => {
        console.log('Start: ' + value);
        setTimeout(() => {
            console.log('End: ' + value);
            resolve(value);
        }, timeout);
    });
}
Rx.Observable.range(0, 6)
    .map(x => Rx.Observable.defer(
        () => log_delay(1000, x)
            .catch(e => console.log('Inner catch'))
    ))
    .merge(2)
    .subscribe(
        s => console.log('Result: ' + s),
        s => console.log('Error: ' + s),
        s => console.log('Complete')
    );
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/4.0.7/rx.all.js"></script>
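To see the eager/lazy difference without Rx in the picture, here is a minimal dependency-free sketch. Everything in it is an assumption made for illustration: lookupTrack is a hypothetical stand-in for spotifyService.lookupTrack that resolves after a short timer, and runLimited is a hand-rolled helper, not an Rx operator. It only mimics what merge with maxConcurrent does once it is given lazy sources.

```javascript
// Hypothetical stand-in for spotifyService.lookupTrack. The Promise body runs
// as soon as the function is called, so the "request" starts immediately.
const started = [];
function lookupTrack(id) {
    return new Promise(resolve => {
        started.push(id);
        setTimeout(() => resolve('track ' + id), 10);
    });
}

// Hand-rolled limiter (not part of Rx): takes functions that return promises
// and keeps at most `limit` of them in flight. No error handling: a single
// rejection rejects the whole run.
function runLimited(factories, limit) {
    const results = new Array(factories.length);
    let next = 0, active = 0, maxActive = 0;
    function worker() {
        if (next >= factories.length) return Promise.resolve();
        const index = next++;
        active++;
        maxActive = Math.max(maxActive, active);
        return factories[index]().then(value => {
            active--;
            results[index] = value;
            return worker(); // pick up the next pending factory
        });
    }
    const workers = [];
    for (let i = 0; i < Math.min(limit, factories.length); i++) {
        workers.push(worker());
    }
    return Promise.all(workers).then(() => ({ results, maxActive }));
}

const ids = [0, 1, 2, 3, 4, 5];

// Eager: calling lookupTrack inside map starts every request at once.
const eager = ids.map(id => lookupTrack(id));
const startedEagerly = started.length; // 6

// Lazy: wrapping the call in a function starts nothing until it is invoked.
started.length = 0;
const lazy = ids.map(id => () => lookupTrack(id));
const startedLazily = started.length; // 0

// Only `limit` factories are invoked up front; the rest wait their turn.
const pending = runLimited(lazy, 2);
const startedAfterLimiter = started.length; // 2

pending.then(({ results, maxActive }) => {
    console.log('Max in flight: ' + maxActive);
    console.log('Results: ' + results.join(', '));
});
```

The array of functions plays the same role as the deferred observables in the snippet above: nothing starts until whatever is doing the limiting asks for it, which is why the limit can actually take effect.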