Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

rxjs and websocket - Do I need a heartbeat?

I'm fairly new to ReactiveX and handling socket events. One of the things that I continuously stumble upon is, the necessity for a keepalive heartbeat functionality when dealing with websockets.

The reasons I picked up, while searching online, for having a keepalive may entail the following reasons:

  • We need to be able to detect if the connection to the server breaks down
  • We need to keep the connection open, which we can do with a heartbeat. (Isn't it always open until I manually close the connection?)

Currently, I'm in the process of implementing this with rxjs, and I am about to implement a heartbeat, but then I start to doubt if this actually is necessary?

When looking through the rxjs library, close events can be handled very easily with the observers that the rxjs socket subject provides, i.e.:

interface WebSocketSubjectConfig<T> {
    url: string
    protocol?: string | Array<string>
    resultSelector?: (e: MessageEvent) => T
    serializer?: (value: T) => WebSocketMessage
    deserializer?: (e: MessageEvent) => T
    openObserver?: NextObserver<Event>
    closeObserver?: NextObserver<CloseEvent>
    closingObserver?: NextObserver<void>
    WebSocketCtor?: {...}
    binaryType?: 'blob' | 'arraybuffer'
}

So in case I face a connection problem, I could just as easily create an Observable for the closeObserver which could listen for any close-event and subsequently trigger a reconnect loop.

I even tested this, with my local API, and I can see that the event is triggered every time I simulate an "error" in my API.

My question is therefore:

  • Am I missing something, does rxjs already handle this for me, and provides me with the closeObserver in return?
  • Why would I need to implement a ping-pong pattern if what I have already seems okay? (Are there any corners that I haven't looked into?)
like image 919
Jeppe Christensen Avatar asked Sep 17 '25 17:09

Jeppe Christensen


1 Answers

Isn't it always open until I manually close the connection?

Yes, that's right. Unless you either manually close it, or there is an error along the way, the connection will stay on. A heartbeat mechanism is not needed for that.

We need to be able to detect if the connection to the server breaks down

This is actually a good question, because it might not be immediately obvious. So let's look at the actual code of WebSocketSubject:

  /**
   * An Observer then watches when close events occur on the underlying webSocket
   */
  closeObserver?: NextObserver<CloseEvent>;

So this one is called always when a close events comes. And then their own example:

 * **closeObserver** allows us to set a custom error when an error raise up.
 * import { webSocket } from 'rxjs/webSocket';
 *
 * const wsSubject = webSocket({
 *     url: 'ws://localhost:8081',
 *     closeObserver: {
        next(closeEvent) {
            const customError = { code: 6666, reason: "Custom evil reason" }
            console.log(`code: ${customError.code}, reason: ${customError.reason}`);
        }
    }
 * });
 *
 * //output
 * // code: 6666, reason: Custom evil reason

Which shows how they use a closeObserver Obs for tunneling a customError. But so far the point is that by its own, closeObserver won't handle all closing scenarios/paths.

But if we look down in the source file, we'll see this:

    socket.onerror = (e: Event) => {
      this._resetState();
      observer.error(e);
    };

    socket.onclose = (e: CloseEvent) => {
      this._resetState();
      const { closeObserver } = this._config;
      if (closeObserver) {
        closeObserver.next(e);
      }
      if (e.wasClean) {
        observer.complete();
      } else {
        observer.error(e);
      }
    };

Where const observer = this._output; and this._output = new Subject<T>();. So as we can see, the actual 'core' Subject is being signaled with an error, if the close event wasn't a clean one.

Which means that, internally, RxJS WS does and exposes everything needed for catching any type of server disconnection, so that a custom heartbeat mechanism would be only redundant.

Btw, from my personal experience, most of the WS clients behave similarly, and only the Server implementation might actually benefit from a custom heartbeat system (nowadays maybe even there everything is handled by the libs/framework, I don't know).


Bonus

A possible implementation for a correct (and bit more complex) connect/reconnect flow using RxJS WS:

this.wsSubject = webSocket(wsUrl);
this.wsSubscription = this.wsSubject.pipe(
  retryWhen(errors =>
    errors.pipe(
      concatMap((error, i) =>
        iif(
          () => this.environment.webSockets.maxReconnectAttempts !== -1 &&
            i >= this.environment.webSockets.maxReconnectAttempts,
          throwError('WebSocket reconnecting retry limit exceeded!'),
          of(error).pipe(
            tap(() => {
              this.disconnected = true;
              this.log.warn('Trying to reconnect to WebSocket server...');
            }),
            delay(this.environment.webSockets.reconnectAttemptDelay)
          )
        )
      )
    )
  ),
  tap(() => {
    if (this.disconnected) {
      this.disconnected = false;
      this.log.info('Successfully re-connected to the WebSocket server.');
    }
  })
).subscribe(
  (data) => this.handleNotification(data),
  (err) => this.log.error(err),
  () => this.log.warn('Connection to the WebSocket server was closed!')
);
}
like image 152
Mihai Paraschivescu Avatar answered Sep 20 '25 07:09

Mihai Paraschivescu