fix(go-live): unsubscribe from topics before closing if not done already (#16244)

This commit is contained in:
ltorje-8x8
2025-07-22 00:47:24 +03:00
committed by damencho
parent 6fab1a3346
commit 127cfeb28d
2 changed files with 75 additions and 8 deletions

View File

@@ -1,4 +1,4 @@
import { Client } from '@stomp/stompjs';
import { Client, StompSubscription } from '@stomp/stompjs';
import logger from './logger';
import { WebsocketClient } from './websocket-client';
@@ -10,6 +10,9 @@ import { WebsocketClient } from './websocket-client';
export class VisitorsListWebsocketClient extends WebsocketClient {
private static client: VisitorsListWebsocketClient;
private _topicSubscription: StompSubscription | undefined;
private _queueSubscription: StompSubscription | undefined;
/**
* Creates a new instance of the VisitorsListWebsocketClient.
*
@@ -87,7 +90,7 @@ export class VisitorsListWebsocketClient extends WebsocketClient {
const cachedDeltas: Array<{ n: string; r: string; s: string; }> = [];
// Subscribe first for deltas so we don't miss any while waiting for the initial list
this.stompClient.subscribe(topicEndpoint, deltaMessage => {
this._topicSubscription = this.stompClient.subscribe(topicEndpoint, deltaMessage => {
try {
const updates: Array<{ n: string; r: string; s: string; }> = JSON.parse(deltaMessage.body);
@@ -102,7 +105,7 @@ export class VisitorsListWebsocketClient extends WebsocketClient {
});
// Subscribe for the initial list after topic subscription is active
const queueSubscription = this.stompClient.subscribe(queueEndpoint, message => {
this._queueSubscription = this.stompClient.subscribe(queueEndpoint, message => {
try {
const visitors: Array<{ n: string; r: string; }> = JSON.parse(message.body);
@@ -110,7 +113,11 @@ export class VisitorsListWebsocketClient extends WebsocketClient {
initialReceived = true;
initialCallback(visitors);
queueSubscription.unsubscribe();
// Unsubscribe from queue after receiving initial list
if (this._queueSubscription) {
this._queueSubscription.unsubscribe();
this._queueSubscription = undefined;
}
if (cachedDeltas.length) {
deltaCallback(cachedDeltas);
@@ -124,4 +131,45 @@ export class VisitorsListWebsocketClient extends WebsocketClient {
this.stompClient.activate();
}
/**
* Unsubscribes from both topic and queue subscriptions.
*
* @returns {void}
*/
override unsubscribe(): void {
if (this._topicSubscription) {
this._topicSubscription.unsubscribe();
logger.debug('Unsubscribed from visitors list topic');
this._topicSubscription = undefined;
}
if (this._queueSubscription) {
this._queueSubscription.unsubscribe();
logger.debug('Unsubscribed from visitors list queue');
this._queueSubscription = undefined;
}
}
/**
* Disconnects the current stomp client instance and clears it.
* Unsubscribes from any active subscriptions first.
*
* @returns {Promise}
*/
override disconnect(): Promise<any> {
if (!this.stompClient) {
return Promise.resolve();
}
const url = this.stompClient.brokerURL;
// Unsubscribe first (synchronous), then disconnect
this.unsubscribe();
return this.stompClient.deactivate().then(() => {
logger.debug(`disconnected from: ${url}`);
this.stompClient = undefined;
});
}
}

View File

@@ -1,5 +1,5 @@
/* eslint-disable @typescript-eslint/naming-convention */
import { Client } from '@stomp/stompjs';
import { Client, StompSubscription } from '@stomp/stompjs';
import logger from './logger';
@@ -28,6 +28,8 @@ export class WebsocketClient {
private _connectCount = 0;
private _subscription: StompSubscription | undefined;
/**
* WebsocketClient getInstance.
*
@@ -100,7 +102,7 @@ export class WebsocketClient {
this._connectCount++;
connectCallback?.();
this.stompClient.subscribe(endpoint, message => {
this._subscription = this.stompClient.subscribe(endpoint, message => {
try {
callback(JSON.parse(message.body));
} catch (e) {
@@ -113,7 +115,21 @@ export class WebsocketClient {
}
/**
* Disconnects the current stomp client instance and clears it.
* Unsubscribes from the current subscription.
*
* @returns {void}
*/
unsubscribe(): void {
if (this._subscription) {
this._subscription.unsubscribe();
logger.debug('Unsubscribed from WebSocket topic');
this._subscription = undefined;
}
}
/**
* Disconnects the current stomp client instance and clears it.
* Unsubscribes from any active subscriptions first if available.
*
* @returns {Promise}
*/
@@ -124,8 +140,11 @@ export class WebsocketClient {
const url = this.stompClient.brokerURL;
// Unsubscribe first (synchronous), then disconnect
this.unsubscribe();
return this.stompClient.deactivate().then(() => {
logger.info(`disconnected from: ${url}`);
logger.debug(`disconnected from: ${url}`);
this.stompClient = undefined;
});
}