refactor: ♻️ 通知公告、字典重构

This commit is contained in:
ray
2024-10-03 17:42:51 +08:00
parent 476f257bc1
commit 3db4b2a6e5
25 changed files with 931 additions and 669 deletions

View File

@@ -1,79 +1,38 @@
import { Client } from "@stomp/stompjs";
import { getToken } from "@/utils/auth";
const MAX_RECONNECT_ATTEMPTS = 3; // 最大重连尝试次数
const RECONNECT_DELAY_MS = 5000; // 重连延迟时间(毫秒)
const HEARTBEAT_INTERVAL_MS = 30000; // 心跳间隔时间(毫秒)
const MAX_RECONNECT_ATTEMPTS = 3;
const RECONNECT_DELAY_MS = 5000;
const HEARTBEAT_INTERVAL_MS = 30000;
class WebSocketManager {
private clients: Map<string, Client> = new Map(); // 保存所有 WebSocket 客户端
private reconnectAttempts: Map<string, number> = new Map(); // 记录各地址的重连次数
private client: Client | null = null;
private reconnectAttempts: number = 0;
private messageHandlers: Map<string, ((message: string) => void)[]> =
new Map(); // 保存订阅的消息回调
new Map();
constructor() {}
/**
* 获取已有的 WebSocket 客户端
*
* @param endpoint WebSocket 连接地址
* @returns WebSocket Client 实例或 undefined
*/
public getClient(endpoint: string): Client | undefined {
return this.clients.get(endpoint);
}
private getOrCreateClient(onError?: (error: any) => void): Client {
const endpoint = import.meta.env.VITE_APP_WS_ENDPOINT;
/**
* 获取 WebSocket 客户端,如果已存在则返回已有客户端,否则创建新的客户端
*
* @param endpoint WebSocket 连接地址
* @param onMessage 收到消息时的回调
* @param onError 出现错误时的回调
* @returns WebSocket Client 实例
*/
public getOrCreateClient(
endpoint: string,
onMessage: (message: string) => void,
onError?: (error: any) => void
): Client {
let client = this.getClient(endpoint);
if (client) {
// 如果该地址已有连接,直接添加消息回调
this.messageHandlers.get(endpoint)?.push(onMessage);
} else {
// 否则创建新客户端
client = this.createClient(endpoint, onMessage, onError);
this.clients.set(endpoint, client);
this.messageHandlers.set(endpoint, [onMessage]);
if (this.client) {
return this.client;
}
return client;
}
/**
* 创建 WebSocket 客户端
*
* @param endpoint WebSocket 连接地址
* @param onMessage 收到消息时的回调
* @param onError 出现错误时的回调
* @returns WebSocket Client 实例
* @private
*/
private createClient(
endpoint: string,
onMessage: (message: string) => void,
onError?: (error: any) => void
): Client {
const client = new Client({
brokerURL: endpoint, // 使用传入的 endpoint 动态设置连接地址
this.client = new Client({
brokerURL: endpoint,
connectHeaders: {
Authorization: getToken(),
},
heartbeatIncoming: HEARTBEAT_INTERVAL_MS,
heartbeatOutgoing: HEARTBEAT_INTERVAL_MS,
onConnect: () => {
console.log(`已连接到 ${endpoint}`);
client.subscribe(endpoint, (message) => {
onMessage(message.body); // 收到消息时调用回调
console.log(`已连接到 WebSocket 服务器: ${endpoint}`);
this.messageHandlers.forEach((handlers, topic) => {
handlers.forEach((handler) => {
this.subscribeToTopic(topic, handler);
});
});
},
onStompError: (frame) => {
@@ -84,66 +43,66 @@ class WebSocketManager {
if (onError) {
onError(frame);
}
this.handleReconnect(endpoint); // 出现错误时处理重连
this.handleReconnect();
},
onDisconnect: () => {
console.log(`已断开连接: ${endpoint}`);
this.handleReconnect(endpoint); // 断开时处理重连
this.handleReconnect();
},
});
client.activate();
return client;
this.client.activate();
return this.client;
}
/**
* 处理 WebSocket 重连
*
* @param endpoint WebSocket 连接地址
* @private
*/
private handleReconnect(endpoint: string) {
const attemptCount = this.reconnectAttempts.get(endpoint) || 0;
if (this.clients.has(endpoint)) {
const client = this.clients.get(endpoint);
if (client && client.connected) {
client.deactivate(); // 主动断开已有连接
}
public subscribeToTopic(
topic: string,
onMessage: (message: string) => void,
onError?: (error: any) => void
) {
if (!this.client || !this.client.connected) {
console.log("WebSocket 尚未连接,正在连接...");
this.getOrCreateClient(onError);
}
// 重连次数未达到最大次数时继续重连
if (attemptCount < MAX_RECONNECT_ATTEMPTS) {
this.reconnectAttempts.set(endpoint, attemptCount + 1);
if (this.messageHandlers.has(topic)) {
this.messageHandlers.get(topic)?.push(onMessage);
} else {
this.messageHandlers.set(topic, [onMessage]);
}
if (this.client?.connected) {
console.log(`正在订阅主题: ${topic}`);
this.client.subscribe(topic, (message) => {
const handlers = this.messageHandlers.get(topic);
handlers?.forEach((handler) => handler(message.body));
});
}
}
private handleReconnect() {
if (this.reconnectAttempts < MAX_RECONNECT_ATTEMPTS) {
this.reconnectAttempts++;
console.log(
`尝试重连 (${attemptCount + 1}/${MAX_RECONNECT_ATTEMPTS}): ${endpoint}`
`重连尝试 (${this.reconnectAttempts}/${MAX_RECONNECT_ATTEMPTS})`
);
setTimeout(() => {
const originalOnMessage = this.messageHandlers.get(endpoint) || [];
this.getOrCreateClient(
endpoint,
(message) => originalOnMessage.forEach((handler) => handler(message)),
() => {}
);
this.client?.deactivate();
this.client = null;
this.getOrCreateClient();
}, RECONNECT_DELAY_MS);
} else {
console.error(`达到最大重连次数: ${endpoint}`);
this.reconnectAttempts.delete(endpoint); // 超过最大重连次数后清除重连记录
console.error("达到最大重连次数,停止重连");
}
}
/**
* 断开所有 WebSocket 连接
*
* @param delay 延迟断开时间(毫秒),默认为 0
*/
public disconnectAll(delay: number = 0) {
this.clients.forEach((client, endpoint) => {
console.log(`断开 WebSocket 连接: ${endpoint}`);
setTimeout(() => client.deactivate(), delay); // 延迟断开连接
});
this.clients.clear();
this.reconnectAttempts.clear();
public disconnect() {
if (this.client) {
console.log("断开 WebSocket 连接");
this.client.deactivate();
this.client = null;
}
}
}