I'm new to RxJS and was wondering if anyone could help me.
I want to create a synchronous stream of responses (preferably with the corresponding requests) from a stream of requests (payload data).
I basically want the requests to be sent one by one, each waiting for the response from the last one.
I tried this, but it sends everything at once (jsbin):
var requestStream, responseStream;
requestStream = Rx.Observable.from(['a','b','c','d','e']);
responseStream = requestStream.flatMap(
  sendRequest,
  (val, response)=>{ return {val, response}; }
);
responseStream.subscribe(
  item=>{
    console.log(item);
  },
  err => {
    console.error(err);
  },
  ()=>{
    console.log('Done');
  }
);
function sendRequest(val) {
  return new Promise((resolve,reject)=>{
    setTimeout(()=>{resolve('result for '+val);},1000);
  });
};
The following works, to an extent, but does not use a stream for the request data (jsbin).
var data, responseStream;
data = ['a','b','c','d','e'];
responseStream = Rx.Observable.create(observer=>{
  var sendNext = function(){
    var val = data.shift();
    if (!val) {
      observer.onCompleted();
      return;
    }
    sendRequest(val).then(response=>{
      observer.onNext({val, response});
      sendNext();
    });
  };
  sendNext();
});
responseStream.subscribe(
  item=>{
    console.log(item);
  },
  err => {
    console.error(err);
  },
  ()=>{
    console.log('Done');
  }
);
function sendRequest(val) {
  return new Promise((resolve,reject)=>{
    setTimeout(()=>{resolve('response for '+val);},Math.random() * 2500 + 500);
  });
};
Thank you!
EDIT:
Just to clarify, this is what I wanted to achieve:
"Send A, when you receive response for A, send B, when you receive response for B, send C, etc..."
Using concatMap and defer, as suggested by user3743222, seems to do it ( jsbin ):
responseStream = requestStream.concatMap(
  (val)=>{
    return Rx.Observable.defer(()=>{
      return sendRequest(val);
    });
  },
  (val, response)=>{ return {val, response}; }
);
Try replacing flatMap with concatMap in your first code sample and let me know if the resulting behaviour corresponds to what you are looking for.
responseStream = requestStream.concatMap(//I replaced `flatMap`
  sendRequest,
  (val, response)=>{ return {val, response}; }
);
Basically, concatMap has a signature similar to that of flatMap; the difference in behaviour is that it waits for the observable currently being flattened to complete before proceeding with the next one. So here:
- A requestStream value is pushed to the concatMap operator.
- The concatMap operator generates a sendRequest observable, and whatever values come out of that observable are passed, together with the original value, through the selector function (val, response); the resulting object is passed downstream.
- When that sendRequest observable completes, another requestStream value is processed.
Alternatively, maybe you want to use defer to defer the execution of sendRequest.
responseStream = requestStream.concatMap(//I replaced `flatMap`
  function(x){return Rx.Observable.defer(function(){return sendRequest(x);})},
  (val, response)=>{ return {val, response}; }
);
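Why defer can matter: a Promise starts its work as soon as it is created. If the operator calls sendRequest for every source value up front (which, as far as I can tell, is what happens with the RxJS 4 concatMap used here when the source emits synchronously), all requests go out at once and only the delivery of the responses is serialized. The sketch below illustrates that distinction with plain Promises and no RxJS at all. The names runEager, runDeferred and log are hypothetical, made up for this illustration, and sendRequest is a simplified stand-in for the one in the question.

```javascript
// Hypothetical event log, so the order of sends and responses is visible.
const log = [];

// Simplified stand-in for the question's sendRequest (fixed 20 ms delay).
function sendRequest(val) {
  log.push('send ' + val);
  return new Promise(resolve => {
    setTimeout(() => {
      log.push('recv ' + val);
      resolve('response for ' + val);
    }, 20);
  });
}

// Eager: every Promise is created up front, so every request is sent
// immediately; only the reading of the results happens one by one.
async function runEager(values) {
  const pending = values.map(val => sendRequest(val));
  const results = [];
  for (let i = 0; i < values.length; i++) {
    results.push({ val: values[i], response: await pending[i] });
  }
  return results;
}

// Deferred: each request is wrapped in a function (the role defer plays)
// and is only called once the previous response has arrived.
async function runDeferred(values) {
  const thunks = values.map(val => () => sendRequest(val));
  const results = [];
  for (let i = 0; i < values.length; i++) {
    results.push({ val: values[i], response: await thunks[i]() });
  }
  return results;
}
```

After runEager(['a','b']) resolves, log reads send a, send b, recv a, recv b. After runDeferred(['a','b']) on an empty log, it reads send a, recv a, send b, recv b, which is the "send A, wait for A, send B" order asked for.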