I found various solutions for RxJS WebSocketSubject automatic reconnection, but nothing worked for me.
In my RxJS 6.5 / Angular Code there is a connectToWebsocket() function which is executed once on startup, containing this:
const openObserver: NextObserver<Event> = {
next: () => console.debug('[Websocket] Connected')
};
const closeObserver: NextObserver<CloseEvent> = {
next: () => console.debug('[Websocket] Disconnected')
};
// Connect to WebSocket Server
this.websocketConnection= new WebSocketSubject({
url: "ws://localhost:4000",
deserializer: e => e.data,
openObserver,
closeObserver
});
After creating the Subject, I subscribe to it:
this.websocketConnection.subscribe((msg) => {
this.data = msg;
},
(err) => {
console.log("Error on on connection", err);
},
() => {
console.log("Connection closed");
});
So now, if initially or while connected, the connection is closed by any circumstances. How can I reconnect to the websocket server? Do I need to write an own reconnection strategy or is there one already in RxJS?
Thanks in advance!
Here is working Angular Service for web sockets support. Features:
import { Injectable } from "@angular/core";
import { Subject, timer } from "rxjs";
import { webSocket, WebSocketSubject } from "rxjs/webSocket";
import { tap, retryWhen, delayWhen } from "rxjs/operators";
import { ApiConfigService } from "../../api-config.service";
import { AuthStorageService } from "../../auth/auth-storage.service";
@Injectable({
providedIn: 'root'
})
export class WebsocketService {
private RECONNECT_INTERVAL: number = 5000;
private socketsBaseUrl: string;
private socketClientId: string
private socket$: WebSocketSubject<any>;
private messages$: Subject<MessageEvent>;
constructor(apiConfigService: ApiConfigService, private authStorageService: AuthStorageService) {
this.socketsBaseUrl = apiConfigService.socketsBaseUrl;
}
public connect(jobId: string, cfg: { reconnect: boolean } = { reconnect: false }): Subject<MessageEvent> {
if (!this.socket$ || this.socket$.closed) {
this.socket$ = this.createNewWebSocket(jobId);
this.messages$ = <Subject<MessageEvent>>this.socket$.pipe(
tap({
error: error => console.log(error),
}),
retryWhen(errors =>
errors.pipe(
tap(val => console.log('[WebSocket] trying to reconnect', val)),
delayWhen(_ => timer(this.RECONNECT_INTERVAL))
)
)
);
}
return this.messages$;
}
private createNewWebSocket(jobId: string): WebSocketSubject<any> {
const clientId: number = Date.now();
this.socketClientId = `${clientId}-job${jobId}`
const authHeader: string = encodeURIComponent(this.authStorageService.getAuthorizationHeaderValue());
const socketsUrl: string = `${this.socketsBaseUrl}/${this.socketClientId}?authorization=${authHeader}`;
return webSocket({
url: socketsUrl,
openObserver: {
next: () => {
console.log('[WebSocket] connection established');
}
},
closeObserver: {
next: () => {
console.log('[WebSocket] connection closed');
this.socket$ = undefined;
this.connect(jobId, { reconnect: true });
}
},
});
}
public close(): void {
if (this.socket$ !== undefined) {
this.socket$.complete();
this.socket$ = undefined;
console.log(`[WebSocket] connection closed`);
}
}
}
this.connect(jobId, { reconnect: true }); - reconnect: true is the option you are looking for
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With