I'm fairly new to ReactiveX and to handling socket events. One thing I keep stumbling upon is the supposed necessity of a keepalive heartbeat when dealing with websockets.
The reasons for having a keepalive that I picked up while searching online include the following:
I'm currently implementing this with rxjs and was about to add a heartbeat, but then I started to doubt whether it is actually necessary.
When looking through the rxjs library, close events can be handled very easily with the observers that the rxjs socket subject provides, i.e.:
interface WebSocketSubjectConfig<T> {
  url: string
  protocol?: string | Array<string>
  resultSelector?: (e: MessageEvent) => T
  serializer?: (value: T) => WebSocketMessage
  deserializer?: (e: MessageEvent) => T
  openObserver?: NextObserver<Event>
  closeObserver?: NextObserver<CloseEvent>
  closingObserver?: NextObserver<void>
  WebSocketCtor?: {...}
  binaryType?: 'blob' | 'arraybuffer'
}
So if I run into a connection problem, I could just as easily pass an Observer as the closeObserver, which would listen for any close event and subsequently trigger a reconnect loop.
I even tested this with my local API, and I can see that the event is triggered every time I simulate an "error" in my API.
My question is therefore:
closeObserver in return?
Isn't it always open until I manually close the connection?
Yes, that's right. Unless you manually close it, or there is an error along the way, the connection will stay open. A heartbeat mechanism is not needed for that.
We need to be able to detect if the connection to the server breaks down
This is actually a good question, because it might not be immediately obvious. So let's look at the actual code of WebSocketSubject:
/**
 * An Observer then watches when close events occur on the underlying webSocket
 */
closeObserver?: NextObserver<CloseEvent>;
So this one is always called when a close event arrives. And then their own example:
 * **closeObserver** allows us to set a custom error when an error raise up.
 * import { webSocket } from 'rxjs/webSocket';
 *
 * const wsSubject = webSocket({
 *     url: 'ws://localhost:8081',
 *     closeObserver: {
 *         next(closeEvent) {
 *             const customError = { code: 6666, reason: "Custom evil reason" }
 *             console.log(`code: ${customError.code}, reason: ${customError.reason}`);
 *         }
 *     }
 * });
 *
 * //output
 * // code: 6666, reason: Custom evil reason
This shows how they use a closeObserver for tunneling a customError. But the point so far is that, on its own, closeObserver won't handle all closing scenarios/paths.
But if we look down in the source file, we'll see this:
socket.onerror = (e: Event) => {
  this._resetState();
  observer.error(e);
};
socket.onclose = (e: CloseEvent) => {
  this._resetState();
  const { closeObserver } = this._config;
  if (closeObserver) {
    closeObserver.next(e);
  }
  if (e.wasClean) {
    observer.complete();
  } else {
    observer.error(e);
  }
};
Where const observer = this._output; and this._output = new Subject<T>();. So as we can see, the actual 'core' Subject is signaled with an error if the close event wasn't a clean one.
This means that, internally, the RxJS WebSocketSubject does and exposes everything needed for catching any type of server disconnection, so a custom heartbeat mechanism would only be redundant.
Btw, from my personal experience, most WS clients behave similarly, and only the server implementation might actually benefit from a custom heartbeat system (nowadays maybe even there everything is handled by the libs/frameworks, I don't know).
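To make the onclose branching quoted above a bit more concrete, here is a small dependency-free sketch of it. The names CloseLike, OutputLike, CloseObserverLike, routeClose and simulate are made up for illustration and are not part of RxJS; the sketch only models the routing logic, it does not open a real socket.

```typescript
// Minimal stand-ins for the DOM/RxJS types, so the sketch has no dependencies.
// All names here are hypothetical and only model the quoted onclose handler.
interface CloseLike { wasClean: boolean; code: number; reason: string; }
interface OutputLike {
  complete(): void;
  error(err: unknown): void;
}
interface CloseObserverLike { next(e: CloseLike): void; }

// Same branching as the quoted socket.onclose: the optional closeObserver is
// always notified, then the output completes (clean close) or errors (unclean).
function routeClose(
  e: CloseLike,
  output: OutputLike,
  closeObserver?: CloseObserverLike
): void {
  if (closeObserver) {
    closeObserver.next(e);
  }
  if (e.wasClean) {
    output.complete();
  } else {
    output.error(e);
  }
}

// Records, in order, which callbacks fire for a given close event.
function simulate(e: CloseLike): string[] {
  const calls: string[] = [];
  routeClose(
    e,
    {
      complete: () => { calls.push("complete"); },
      error: () => { calls.push("error"); },
    },
    { next: () => { calls.push("closeObserver"); } }
  );
  return calls;
}

const cleanCalls = simulate({ wasClean: true, code: 1000, reason: "" });
const uncleanCalls = simulate({ wasClean: false, code: 1006, reason: "" });
console.log(cleanCalls);   // closeObserver first, then complete
console.log(uncleanCalls); // closeObserver first, then error
```

The takeaway is that a subscriber's error callback is the place to react to unclean closes, while closeObserver sees every close event regardless of how it happened.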
Bonus
A possible implementation of a correct (and a bit more complex) connect/reconnect flow using RxJS WS:
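import { iif, of, throwError } from 'rxjs';
import { concatMap, delay, retryWhen, tap } from 'rxjs/operators';
import { webSocket } from 'rxjs/webSocket';

// Inside a method of a class that provides environment, log, disconnected
// and handleNotification; wsUrl is the WebSocket server URL.
this.wsSubject = webSocket(wsUrl);
this.wsSubscription = this.wsSubject.pipe(
  // Any error (including an unclean close) lands here and triggers a retry.
  retryWhen(errors =>
    errors.pipe(
      concatMap((error, i) =>
        iif(
          // maxReconnectAttempts === -1 means "retry forever".
          () => this.environment.webSockets.maxReconnectAttempts !== -1 &&
            i >= this.environment.webSockets.maxReconnectAttempts,
          // Limit reached: give up and propagate an error to the subscriber.
          throwError('WebSocket reconnecting retry limit exceeded!'),
          // Otherwise wait a bit, then re-subscribe (i.e. reconnect).
          of(error).pipe(
            tap(() => {
              this.disconnected = true;
              this.log.warn('Trying to reconnect to WebSocket server...');
            }),
            delay(this.environment.webSockets.reconnectAttemptDelay)
          )
        )
      )
    )
  ),
  // The first message after a retry means the reconnect succeeded.
  tap(() => {
    if (this.disconnected) {
      this.disconnected = false;
      this.log.info('Successfully re-connected to the WebSocket server.');
    }
  })
).subscribe(
  (data) => this.handleNotification(data),
  (err) => this.log.error(err),
  () => this.log.warn('Connection to the WebSocket server was closed!')
);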
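If it helps, the retry-limit condition from the iif(...) call above can be pulled out into a plain function and checked on its own. This is only a sketch: decideRetry and RetryDecision are hypothetical names, and the parameters simply mirror the maxReconnectAttempts and reconnectAttemptDelay settings used in the snippet.

```typescript
// Hypothetical helper (not part of RxJS): decides what to do for the i-th
// error (0-based), using the same condition as the iif(...) call above.
type RetryDecision =
  | { retry: true; delayMs: number }
  | { retry: false; reason: string };

function decideRetry(
  attemptIndex: number,
  maxReconnectAttempts: number, // -1 is treated as "retry forever"
  reconnectAttemptDelay: number // milliseconds to wait before reconnecting
): RetryDecision {
  const limited = maxReconnectAttempts !== -1;
  if (limited && attemptIndex >= maxReconnectAttempts) {
    return { retry: false, reason: "WebSocket reconnecting retry limit exceeded!" };
  }
  return { retry: true, delayMs: reconnectAttemptDelay };
}

// With a limit of 3, errors 0..2 are retried and the error at index 3 gives up.
const third = decideRetry(2, 3, 5000);
const fourth = decideRetry(3, 3, 5000);
// With -1 there is no limit, no matter how many errors have occurred.
const unlimited = decideRetry(1000, -1, 5000);
console.log(third.retry, fourth.retry, unlimited.retry);
```

One thing to keep in mind: as far as I can tell, the index i in the snippet above counts every error seen by the retryWhen notifier over the lifetime of the subscription, so it does not reset after a successful reconnect.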