
How can I use `firstValueFrom` with `WebSocketSubject` without closing the underlying web socket?

Tags: rxjs, websocket

I am using a WebSocketSubject, and I often want to block execution until a given event arrives, which is why I use firstValueFrom, like in the following code:

let websocket = new WebSocketSubject<any>(url);
let firstMessage = await firstValueFrom(websocket.pipe(filter(m => true)));

I only have one issue: firstValueFrom unsubscribes from its source when it resolves the promise, and on a WebSocketSubject, once no subscribers remain, that has the effect of closing the underlying WebSocket, which I want to keep open!

Currently, I have thought of a few possible ways out:

  • Writing an equivalent of firstValueFrom that does not unsubscribe.
    Counter argument: I would prefer not to reimplement a function that is nearly perfect, except for one small issue;
  • Using another Subject that subscribes to the WebSocketSubject, and calling firstValueFrom on that subject.
    Counter argument: In terms of usage, I see potential for confusion in having two Subject objects and having to know which one to use (e.g. use websocket.next for sending messages upstream, only use websocketProxy for receiving messages, and never mix up the two!);
  • Using multiplex to create temporary Observable objects that can then be closed by firstValueFrom without issue.
    Counter argument: As I am not actually multiplexing in this case, I would rather not use that method, whose signature and usage seem overkill for my use case.

In short, I suspect that I am missing something basic (e.g. an appropriate OperatorFunction) that would allow me to make it so that the unsubscribe call made by firstValueFrom does not result in the underlying web socket being closed.

Jean Hominal asked Jan 22 '26 07:01



1 Answer

Essentially, you want to always have at least one subscription so the socket connection stays open. I don't think firstValueFrom is the proper tool for the job. I think it's simpler to just create an explicit subscription.

If the intent is to keep it open for the lifetime of the app, just subscribe at app launch.

Since you want to filter out the first several emissions until some condition is met, you can use skipWhile:

import { skipWhile } from 'rxjs/operators';
import { WebSocketSubject } from 'rxjs/webSocket';

const websocket = new WebSocketSubject<any>(url);
const messages = websocket.pipe(skipWhile(m => m !== 'my special event'));

websocket.subscribe(); // keep socket open

// listen
messages.subscribe(m => console.log('message received:', m));

// send
websocket.next('hello server');
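
With that keep-alive subscription in place, firstValueFrom itself should also be safe to use, because its unsubscribe no longer removes the last subscriber. Below is a minimal sketch of that idea that runs without a server. Note that fakeSocket, incoming and events are hypothetical stand-ins, not part of the rxjs WebSocket API: a plain Observable piped through share() is assumed to mimic the "open on first subscriber, close when the last one leaves" behaviour.

```typescript
import { Observable, Subject, firstValueFrom } from 'rxjs';
import { filter, share } from 'rxjs/operators';

// Records when the stand-in connection is opened and closed.
const events: string[] = [];

// Hypothetical source of "server" messages for the stand-in socket.
const incoming = new Subject<string>();

// Stand-in for the socket (an assumption, not a real WebSocketSubject):
// it "opens" on the first subscriber and "closes" once the last one leaves.
const fakeSocket = new Observable<string>(subscriber => {
  events.push('open');
  const inner = incoming.subscribe(subscriber);
  return () => {
    events.push('close');
    inner.unsubscribe();
  };
}).pipe(share());

// Explicit long-lived subscription that keeps the connection open.
const keepAlive = fakeSocket.subscribe();

// firstValueFrom adds a second subscriber and removes it again
// as soon as a matching message arrives.
const firstReady = firstValueFrom(fakeSocket.pipe(filter(m => m === 'ready')));

incoming.next('noise');
incoming.next('ready');

// Snapshot taken after firstValueFrom has unsubscribed: no 'close' yet.
const eventsAfterFirstValue = [...events];

firstReady.then(m => console.log('first matching message:', m));

// Only dropping the keep-alive subscription closes the connection.
keepAlive.unsubscribe();
```

The trade-off is that the connection lifetime is now tied to keepAlive, so it has to be unsubscribed explicitly when the socket is no longer needed.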

It may be worth creating a light wrapper class around the rxjs websocket that handles keeping the connection open and filtering out the first few events:

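import { Observable } from 'rxjs';
import { skipWhile } from 'rxjs/operators';
import { WebSocketSubject } from 'rxjs/webSocket';

class MyWebsocket {
  private websocket: WebSocketSubject<any>;
  public messages: Observable<any>;

  constructor(url: string) {
    // Create the socket in the constructor so `url` is set before it is used
    this.websocket = new WebSocketSubject<any>(url);
    this.messages = this.websocket.pipe(skipWhile(m => m !== 'my special event'));
    this.websocket.subscribe(); // keep socket open
  }

  public sendMessage(message: any) {
    // WebSocketSubject sends through next(); it has no sendMessage() method
    this.websocket.next(message);
  }
}
const websocket = new MyWebsocket(url);

// listen
websocket.messages.subscribe(m => console.log('message received:', m));

// send
websocket.sendMessage('hello server');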
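
If you still want the "wait for one specific message" style from the question, a wrapper like this could also expose a promise-based helper. The following is only a sketch under assumptions: KeepAliveChannel, waitFor, send and close are hypothetical names, and a plain Subject is used as a loopback stand-in so the example runs without a server (a WebSocketSubject is also a Subject, so it should be usable in its place).

```typescript
import { Subject, Subscription, firstValueFrom } from 'rxjs';
import { filter } from 'rxjs/operators';

// Hypothetical wrapper: holds one long-lived subscription so that
// short-lived subscribers can come and go without closing the socket.
class KeepAliveChannel<T> {
  private readonly keepAlive: Subscription;

  constructor(private readonly socket: Subject<T>) {
    this.keepAlive = this.socket.subscribe();
  }

  // Resolves with the first message that satisfies the predicate.
  waitFor(predicate: (message: T) => boolean): Promise<T> {
    return firstValueFrom(this.socket.pipe(filter(predicate)));
  }

  // With a WebSocketSubject, next() sends the message to the server.
  send(message: T): void {
    this.socket.next(message);
  }

  // Drops the keep-alive subscription and completes the socket.
  close(): void {
    this.keepAlive.unsubscribe();
    this.socket.complete();
  }
}

// Loopback stand-in (assumption): a plain Subject echoes whatever is sent,
// which lets the example run without a real WebSocket server.
const loopback = new Subject<string>();
const channel = new KeepAliveChannel<string>(loopback);

const ready = channel.waitFor(m => m === 'ready');
channel.send('noise');
channel.send('ready');

ready.then(m => console.log('waitFor resolved with:', m));
```

This keeps a single object for both sending and receiving, which avoids the two-Subject confusion mentioned in the question.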
BizzyBob answered Jan 27 '26 01:01



