How to retry / reconnect to Websocket server with RxJS WebSocketSubject

I found various solutions for RxJS WebSocketSubject automatic reconnection, but nothing worked for me.

In my RxJS 6.5 / Angular code there is a connectToWebsocket() function, executed once on startup, that contains this:

    const openObserver: NextObserver<Event> = {
        next: () => console.debug('[Websocket] Connected')
    };
    const closeObserver: NextObserver<CloseEvent> = {
        next: () => console.debug('[Websocket] Disconnected')
    };

    // Connect to WebSocket Server
    this.websocketConnection = new WebSocketSubject({
        url: "ws://localhost:4000",
        deserializer: e => e.data,
        openObserver,
        closeObserver
    });

After creating the Subject, I subscribe to it:

    this.websocketConnection.subscribe((msg) => {
        this.data = msg;
    },
    (err) => {
        console.log("Error on connection", err);
    },
    () => {
        console.log("Connection closed");
    });

Now, if the connection is closed for any reason, either on the initial attempt or while connected, how can I reconnect to the WebSocket server? Do I need to write my own reconnection strategy, or is there one already in RxJS?

Thanks in advance!

Henri asked Dec 22 '25 17:12


1 Answer

Here is a working Angular service for WebSocket support. Features:

  • Reconnects after 5 seconds in case of an error
  • Authorization

    import { Injectable } from "@angular/core";
    import { Subject, timer } from "rxjs";
    import { webSocket, WebSocketSubject } from "rxjs/webSocket";
    import { tap, retryWhen, delayWhen } from "rxjs/operators";
    
    import { ApiConfigService } from "../../api-config.service";
    import { AuthStorageService } from "../../auth/auth-storage.service";
    
    @Injectable({
        providedIn: 'root'
    })
    export class WebsocketService {
      private RECONNECT_INTERVAL: number = 5000;
    
      private socketsBaseUrl: string;
      private socketClientId: string;
      private socket$: WebSocketSubject<any>;
      private messages$: Subject<MessageEvent>;
    
      constructor(apiConfigService: ApiConfigService, private authStorageService: AuthStorageService) {
        this.socketsBaseUrl = apiConfigService.socketsBaseUrl;
      }
    
      public connect(jobId: string, cfg: { reconnect: boolean } = { reconnect: false }): Subject<MessageEvent> {
        if (!this.socket$ || this.socket$.closed) {
          this.socket$ = this.createNewWebSocket(jobId);
          this.messages$ = <Subject<MessageEvent>>this.socket$.pipe(
            tap({
                error: error => console.log(error),
            }),
            retryWhen(errors =>
                errors.pipe(
                    tap(val => console.log('[WebSocket] trying to reconnect', val)),
                    delayWhen(_ => timer(this.RECONNECT_INTERVAL))
                )
            )
          );
        }
        return this.messages$;
      }
    
      private createNewWebSocket(jobId: string): WebSocketSubject<any> {
        const clientId: number = Date.now();
        this.socketClientId = `${clientId}-job${jobId}`;
        const authHeader: string = encodeURIComponent(this.authStorageService.getAuthorizationHeaderValue());
        const socketsUrl: string = `${this.socketsBaseUrl}/${this.socketClientId}?authorization=${authHeader}`;
        return webSocket({
          url: socketsUrl,
          openObserver: {
            next: () => {
              console.log('[WebSocket] connection established');
            }
          },
          closeObserver: {
            next: () => {
              console.log('[WebSocket] connection closed');
              this.socket$ = undefined;
              this.connect(jobId, { reconnect: true });
            }
          },
        });
      }
    
      public close(): void {
        if (this.socket$ !== undefined) {
          this.socket$.complete();
          this.socket$ = undefined;
          console.log('[WebSocket] connection closed');
        }
      }
    }

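Reconnection after an error comes from retryWhen: each error is delayed by RECONNECT_INTERVAL (5 seconds) and then triggers a resubscribe to the socket. In addition, closeObserver calls this.connect(jobId, { reconnect: true }); whenever the connection closes. Note that connect() never reads cfg.reconnect in the code above, so the flag itself has no effect as written.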
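
On the "do I need to write my own reconnection strategy" part of the question: the fixed-delay policy that retryWhen plus delayWhen express in the service above is small enough to sketch without RxJS. The following is only an illustration of that policy, not part of RxJS or of the service. The names connectWithRetry, Attempt, Schedule and timeoutSchedule are hypothetical, and the timer is passed in as a parameter so the logic can be exercised without real waiting.

```typescript
// One connection attempt. It must call onFail() when the connection errors
// or drops, and it simply never calls it while the connection stays healthy.
type Attempt = (onFail: () => void) => void;

// Timer abstraction (hypothetical) so the policy can run without real delays.
type Schedule = (fn: () => void, ms: number) => void;

// Default scheduler: a plain setTimeout.
const timeoutSchedule: Schedule = (fn, ms) => {
  setTimeout(fn, ms);
};

// Runs `attempt` immediately and, every time it reports a failure, schedules
// the next attempt after a fixed `intervalMs`. Like the retryWhen pipeline in
// the service, this sketch retries without an upper bound.
function connectWithRetry(
  attempt: Attempt,
  intervalMs: number,
  schedule: Schedule = timeoutSchedule
): void {
  const run = (): void => attempt(() => schedule(run, intervalMs));
  run();
}
```

To mirror the service, intervalMs would be 5000 and the attempt callback would open the socket and call onFail from its error handler. In an RxJS codebase the retryWhen version is usually the better fit, because the retry stays inside the observable pipeline.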

Aleksei Semidotskii answered Dec 24 '25 09:12


