refactor(websocket): optimize STOMP connection and subscription management

This commit is contained in:
Ray.Hao
2025-11-08 00:03:01 +08:00
parent 3ae40e8447
commit 8105432713
3 changed files with 633 additions and 496 deletions

View File

@@ -6,363 +6,525 @@ export interface UseStompOptions {
brokerURL?: string;
/** 用于鉴权的 token不传时使用 getAccessToken() 的返回值 */
token?: string;
/** 重连延迟,单位毫秒,默认为 8000 */
/** 重连延迟,单位毫秒,默认为 15000 */
reconnectDelay?: number;
/** 连接超时时间,单位毫秒,默认为 10000 */
connectionTimeout?: number;
/** 是否开启指数退避重连策略 */
useExponentialBackoff?: boolean;
/** 最大重连次数,默认为 5 */
/** 最大重连次数,默认为 3 */
maxReconnectAttempts?: number;
/** 最大重连延迟,单位毫秒,默认为 60000 */
maxReconnectDelay?: number;
/** 是否开启调试日志 */
debug?: boolean;
/** 是否在重连时自动恢复订阅,默认为 true */
autoRestoreSubscriptions?: boolean;
}
/**
* STOMP WebSocket连接组合式函数
* 用于管理WebSocket连接的建立、断开、重连和消息订阅
* 订阅配置信息
*/
interface SubscriptionConfig {
destination: string;
callback: (message: IMessage) => void;
}
/**
* 连接状态枚举
*/
enum ConnectionState {
DISCONNECTED = "DISCONNECTED",
CONNECTING = "CONNECTING",
CONNECTED = "CONNECTED",
RECONNECTING = "RECONNECTING",
}
/**
* STOMP WebSocket 连接管理组合式函数
*
* 核心功能:
* - 自动连接管理(连接、断开、重连)
* - 订阅管理(订阅、取消订阅、自动恢复)
* - 心跳检测
* - Token 自动刷新
*
* @param options 配置选项
* @returns STOMP 客户端操作接口
*/
export function useStomp(options: UseStompOptions = {}) {
// 默认值brokerURL 从环境变量中获取token 从 getAccessToken() 获取
// ==================== 配置初始化 ====================
const defaultBrokerURL = import.meta.env.VITE_APP_WS_ENDPOINT || "";
const brokerURL = ref(options.brokerURL ?? defaultBrokerURL);
// 默认配置参数
const reconnectDelay = options.reconnectDelay ?? 15000; // 默认15秒重连间隔
const connectionTimeout = options.connectionTimeout ?? 10000;
const useExponentialBackoff = options.useExponentialBackoff ?? false;
const maxReconnectAttempts = options.maxReconnectAttempts ?? 3; // 最多重连3次
const maxReconnectDelay = options.maxReconnectDelay ?? 60000;
const config = {
brokerURL: ref(options.brokerURL ?? defaultBrokerURL),
reconnectDelay: options.reconnectDelay ?? 15000,
connectionTimeout: options.connectionTimeout ?? 10000,
useExponentialBackoff: options.useExponentialBackoff ?? false,
maxReconnectAttempts: options.maxReconnectAttempts ?? 3,
maxReconnectDelay: options.maxReconnectDelay ?? 60000,
autoRestoreSubscriptions: options.autoRestoreSubscriptions ?? true,
debug: options.debug ?? false,
};
// 连接状态标记
const isConnected = ref(false);
// 重连尝试次数
const reconnectCount = ref(0);
// 重连计时器
// ==================== 状态管理 ====================
const connectionState = ref<ConnectionState>(ConnectionState.DISCONNECTED);
const isConnected = computed(() => connectionState.value === ConnectionState.CONNECTED);
const reconnectAttempts = ref(0);
// ==================== 定时器管理 ====================
let reconnectTimer: ReturnType<typeof setTimeout> | null = null;
// 连接超时计时器
let connectionTimeoutTimer: ReturnType<typeof setTimeout> | null = null;
// 存储所有订阅
const subscriptions = new Map<string, StompSubscription>();
// 用于保存 STOMP 客户端的实例
const client = ref<Client | null>(null);
// 防止重复连接的标志
let isConnecting = false;
// ==================== 订阅管理 ====================
// 活动订阅:存储当前 STOMP 订阅对象
const activeSubscriptions = new Map<string, StompSubscription>();
// 订阅配置注册表:用于自动恢复订阅
const subscriptionRegistry = new Map<string, SubscriptionConfig>();
// ==================== 客户端实例 ====================
const stompClient = ref<Client | null>(null);
let isManualDisconnect = false;
// ==================== 工具函数 ====================
/**
* 清理所有定时器
*/
const clearAllTimers = () => {
if (reconnectTimer) {
clearTimeout(reconnectTimer);
reconnectTimer = null;
}
if (connectionTimeoutTimer) {
clearTimeout(connectionTimeoutTimer);
connectionTimeoutTimer = null;
}
};
/**
* 日志输出(支持调试模式控制)
*/
const log = (...args: any[]) => {
if (config.debug) {
console.log("[useStomp]", ...args);
}
};
const logWarn = (...args: any[]) => {
console.warn("[useStomp]", ...args);
};
const logError = (...args: any[]) => {
console.error("[useStomp]", ...args);
};
/**
* 恢复所有订阅
*/
const restoreSubscriptions = () => {
if (!config.autoRestoreSubscriptions || subscriptionRegistry.size === 0) {
return;
}
log(`开始恢复 ${subscriptionRegistry.size} 个订阅...`);
for (const [destination, subscriptionConfig] of subscriptionRegistry.entries()) {
try {
performSubscribe(destination, subscriptionConfig.callback);
} catch (error) {
logError(`恢复订阅 ${destination} 失败:`, error);
}
}
};
/**
* 初始化 STOMP 客户端
*/
const initializeClient = () => {
// 如果客户端已存在且正在连接或已连接,直接返回
if (client.value && (client.value.active || client.value.connected)) {
console.log("STOMP客户端已存在且处于活动状态跳过初始化");
// 如果客户端已存在且处于活动状态,直接返回
if (stompClient.value && (stompClient.value.active || stompClient.value.connected)) {
log("STOMP 客户端已存在且处于活动状态,跳过初始化");
return;
}
// 检查WebSocket端点是否配置
if (!brokerURL.value) {
console.warn("WebSocket连接失败: 未配置WebSocket端点URL");
// 检查 WebSocket 端点是否配置
if (!config.brokerURL.value) {
logWarn("WebSocket 连接失败: 未配置 WebSocket 端点 URL");
return;
}
// 每次连接前重新获取最新令牌不依赖之前的token值
const currentToken = AuthStorage.getAccessToken();
// 检查令牌是否为空,如果为空则不进行连接
if (!currentToken) {
console.warn("WebSocket连接失败授权令牌为空请先登录");
// 每次连接前重新获取最新令牌
const accessToken = AuthStorage.getAccessToken();
if (!accessToken) {
logWarn("WebSocket 连接失败:授权令牌为空,请先登录");
return;
}
// 如果有旧的客户端,先清理
if (client.value) {
// 清理旧客户端
if (stompClient.value) {
try {
client.value.deactivate();
stompClient.value.deactivate();
} catch (error) {
console.warn("清理旧客户端时出错:", error);
logWarn("清理旧客户端时出错:", error);
}
client.value = null;
stompClient.value = null;
}
// 创建 STOMP 客户端
client.value = new Client({
brokerURL: brokerURL.value,
stompClient.value = new Client({
brokerURL: config.brokerURL.value,
connectHeaders: {
Authorization: `Bearer ${currentToken}`,
Authorization: `Bearer ${accessToken}`,
},
debug: options.debug ? console.log : () => {},
reconnectDelay: 0, // 禁用内置重连机制,使用自定义重连
debug: config.debug ? (msg) => console.log("[STOMP]", msg) : () => {},
reconnectDelay: 0, // 禁用内置重连,使用自定义重连逻辑
heartbeatIncoming: 4000,
heartbeatOutgoing: 4000,
});
// 设置连接监听器
client.value.onConnect = () => {
isConnected.value = true;
isConnecting = false;
reconnectCount.value = 0;
if (connectionTimeoutTimer) clearTimeout(connectionTimeoutTimer);
if (reconnectTimer) clearTimeout(reconnectTimer);
console.log("WebSocket连接已建立");
// ==================== 事件监听器 ====================
// 连接成功
stompClient.value.onConnect = () => {
connectionState.value = ConnectionState.CONNECTED;
reconnectAttempts.value = 0;
clearAllTimers();
log("✅ WebSocket 连接已建立");
// 自动恢复订阅
restoreSubscriptions();
};
// 设置断开连接监听器
client.value.onDisconnect = () => {
isConnected.value = false;
isConnecting = false;
console.log("WebSocket连接已断开");
// 连接断开
stompClient.value.onDisconnect = () => {
connectionState.value = ConnectionState.DISCONNECTED;
log("❌ WebSocket 连接已断开");
// 清空活动订阅(但保留订阅配置用于恢复)
activeSubscriptions.clear();
// 如果不是手动断开且未达到最大重连次数,则尝试重连
if (!isManualDisconnect && reconnectCount.value < maxReconnectAttempts) {
handleReconnect();
if (!isManualDisconnect && reconnectAttempts.value < config.maxReconnectAttempts) {
scheduleReconnect();
}
};
// 设置 Web Socket 关闭监听器
client.value.onWebSocketClose = (event) => {
isConnected.value = false;
isConnecting = false;
console.log(`WebSocket已关闭: ${event?.code} ${event?.reason}`);
// WebSocket 关闭
stompClient.value.onWebSocketClose = (event) => {
connectionState.value = ConnectionState.DISCONNECTED;
log(`WebSocket 已关闭: code=${event?.code}, reason=${event?.reason}`);
// 如果是手动断开,不重连
// 如果是手动断开,不重连
if (isManualDisconnect) {
console.log("手动断开连接,不进行重连");
log("手动断开连接,不进行重连");
return;
}
// 如果是授权问题导致的关闭,尝试重连
// 对于异常关闭,尝试重连
if (
(event?.code === 1000 || event?.code === 1006 || event?.code === 1008) &&
reconnectCount.value < maxReconnectAttempts
event?.code &&
[1000, 1006, 1008, 1011].includes(event.code) &&
reconnectAttempts.value < config.maxReconnectAttempts
) {
console.log("检测到连接异常关闭,将尝试重连");
// 通过 handleReconnect 统一处理重连,避免重复计数
handleReconnect();
log("检测到连接异常关闭,将尝试重连");
scheduleReconnect();
}
};
// 设置错误监听器
client.value.onStompError = (frame) => {
console.error("STOMP错误:", frame.headers, frame.body);
isConnecting = false;
// STOMP 错误
stompClient.value.onStompError = (frame) => {
logError("STOMP 错误:", frame.headers, frame.body);
connectionState.value = ConnectionState.DISCONNECTED;
// 检查是否是授权错误
if (
const isAuthError =
frame.headers?.message?.includes("Unauthorized") ||
frame.body?.includes("Unauthorized") ||
frame.body?.includes("Token")
) {
console.warn("WebSocket授权错误请检查登录状态");
// 授权错误不进行重连
isManualDisconnect = true;
frame.body?.includes("Token") ||
frame.body?.includes("401");
if (isAuthError) {
logWarn("WebSocket 授权错误,停止重连");
isManualDisconnect = true; // 授权错误不进行重连
}
};
};
/**
* 处理重连逻辑
* 调度重连任务
*/
const handleReconnect = () => {
// 如果已经在连接或手动断开,不重连
if (isConnecting || isManualDisconnect) {
const scheduleReconnect = () => {
// 如果在连接或手动断开,不重连
if (connectionState.value === ConnectionState.CONNECTING || isManualDisconnect) {
return;
}
if (reconnectCount.value >= maxReconnectAttempts) {
console.error(`已达到最大重连次数(${maxReconnectAttempts}),停止重连`);
// 检查是否达到最大重连次数
if (reconnectAttempts.value >= config.maxReconnectAttempts) {
logError(`已达到最大重连次数 (${config.maxReconnectAttempts}),停止重连`);
return;
}
reconnectCount.value++;
console.log(`准备重连(${reconnectCount.value}/${maxReconnectAttempts})...`);
reconnectAttempts.value++;
connectionState.value = ConnectionState.RECONNECTING;
// 使用指数退避策略增加重连间隔
const delay = useExponentialBackoff
? Math.min(reconnectDelay * Math.pow(2, reconnectCount.value - 1), maxReconnectDelay)
: reconnectDelay;
// 计算重连延迟(支持指数退避)
const delay = config.useExponentialBackoff
? Math.min(
config.reconnectDelay * Math.pow(2, reconnectAttempts.value - 1),
config.maxReconnectDelay
)
: config.reconnectDelay;
// 清除之前的计时器
log(`准备重连 (${reconnectAttempts.value}/${config.maxReconnectAttempts}),延迟 ${delay}ms`);
// 清除之前的重连计时器
if (reconnectTimer) {
clearTimeout(reconnectTimer);
}
// 设置重连计时器
reconnectTimer = setTimeout(() => {
if (!isConnected.value && !isManualDisconnect && !isConnecting) {
console.log(`开始重连...`);
if (connectionState.value !== ConnectionState.CONNECTED && !isManualDisconnect) {
log(`开始${reconnectAttempts.value}重连...`);
connect();
}
}, delay);
};
// 监听 brokerURL 的变化,若地址改变则重新初始化
watch(brokerURL, (newURL, oldURL) => {
// 监听 brokerURL 的变化,自动重新初始化
watch(config.brokerURL, (newURL, oldURL) => {
if (newURL !== oldURL) {
console.log(`brokerURL changed from ${oldURL} to ${newURL}`);
// 断开当前连接,重新激活客户端
if (client.value && client.value.connected) {
client.value.deactivate();
log(`WebSocket 端点已更改: ${oldURL} -> ${newURL}`);
// 断开当前连接
if (stompClient.value && stompClient.value.connected) {
stompClient.value.deactivate();
}
brokerURL.value = newURL;
initializeClient(); // 重新初始化客户端
// 重新初始化客户端
initializeClient();
}
});
// 初始化客户端
initializeClient();
// ==================== 公共接口 ====================
/**
* 激活连接(如果已经连接或正在激活则直接返回)
* 建立 WebSocket 连接
*/
const connect = () => {
// 重置手动断开标志
isManualDisconnect = false;
// 检查是否配置WebSocket端点
if (!brokerURL.value) {
console.error("WebSocket连接失败: 未配置WebSocket端点URL");
// 检查是否配置WebSocket 端点
if (!config.brokerURL.value) {
logError("WebSocket 连接失败: 未配置 WebSocket 端点 URL");
return;
}
// 防止重复连接
if (isConnecting) {
console.log("WebSocket正在连接中跳过重复连接请求");
if (connectionState.value === ConnectionState.CONNECTING) {
log("WebSocket 正在连接中,跳过重复连接请求");
return;
}
if (!client.value) {
// 如果客户端不存在,先初始化
if (!stompClient.value) {
initializeClient();
}
if (!client.value) {
console.error("STOMP客户端初始化失败");
if (!stompClient.value) {
logError("STOMP 客户端初始化失败");
return;
}
// 避免重复连接:检查是否已连接
if (client.value.connected) {
console.log("WebSocket已连接,跳过重复连接");
isConnected.value = true;
// 避免重复连接检查是否已连接
if (stompClient.value.connected) {
log("WebSocket 已连接跳过重复连接");
connectionState.value = ConnectionState.CONNECTED;
return;
}
// 设置连接标志
isConnecting = true;
// 设置连接状态
connectionState.value = ConnectionState.CONNECTING;
// 设置连接超时
if (connectionTimeoutTimer) clearTimeout(connectionTimeoutTimer);
if (connectionTimeoutTimer) {
clearTimeout(connectionTimeoutTimer);
}
connectionTimeoutTimer = setTimeout(() => {
if (!isConnected.value && isConnecting) {
console.warn("WebSocket连接超时");
isConnecting = false;
if (!isManualDisconnect && reconnectCount.value < maxReconnectAttempts) {
handleReconnect();
if (connectionState.value === ConnectionState.CONNECTING) {
logWarn("WebSocket 连接超时");
connectionState.value = ConnectionState.DISCONNECTED;
// 超时后尝试重连
if (!isManualDisconnect && reconnectAttempts.value < config.maxReconnectAttempts) {
scheduleReconnect();
}
}
}, connectionTimeout);
}, config.connectionTimeout);
try {
client.value.activate();
console.log("正在建立WebSocket连接...");
stompClient.value.activate();
log("正在建立 WebSocket 连接...");
} catch (error) {
console.error("激活WebSocket连接失败:", error);
isConnecting = false;
logError("激活 WebSocket 连接失败:", error);
connectionState.value = ConnectionState.DISCONNECTED;
}
};
/**
* 执行订阅操作(内部方法)
*/
const performSubscribe = (destination: string, callback: (message: IMessage) => void): string => {
if (!stompClient.value || !stompClient.value.connected) {
logWarn(`尝试订阅 ${destination} 失败: 客户端未连接`);
return "";
}
try {
const subscription = stompClient.value.subscribe(destination, callback);
const subscriptionId = subscription.id;
activeSubscriptions.set(subscriptionId, subscription);
log(`✓ 订阅成功: ${destination} (ID: ${subscriptionId})`);
return subscriptionId;
} catch (error) {
logError(`订阅 ${destination} 失败:`, error);
return "";
}
};
/**
* 订阅指定主题
* @param destination 目标主题地址
*
* @param destination 目标主题地址(如:/topic/message
* @param callback 接收到消息时的回调函数
* @returns 返回订阅 id,用于后续取消订阅
* @returns 订阅 ID,用于后续取消订阅
*/
const subscribe = (destination: string, callback: (_message: IMessage) => void): string => {
if (!client.value || !client.value.connected) {
console.warn(`尝试订阅 ${destination} 失败: 客户端未连接`);
return "";
const subscribe = (destination: string, callback: (message: IMessage) => void): string => {
// 保存订阅配置到注册表,用于断线重连后自动恢复
subscriptionRegistry.set(destination, { destination, callback });
// 如果已连接,立即订阅
if (stompClient.value?.connected) {
return performSubscribe(destination, callback);
}
try {
const subscription = client.value.subscribe(destination, callback);
const subscriptionId = subscription.id;
subscriptions.set(subscriptionId, subscription);
console.log(`订阅成功: ${destination}, ID: ${subscriptionId}`);
return subscriptionId;
} catch (error) {
console.error(`订阅 ${destination} 失败:`, error);
return "";
}
log(`暂存订阅配置: ${destination},将在连接建立后自动订阅`);
return "";
};
/**
* 取消订阅
* @param subscriptionId 订阅 id
*
* @param subscriptionId 订阅 ID由 subscribe 方法返回)
*/
const unsubscribe = (subscriptionId: string) => {
const subscription = subscriptions.get(subscriptionId);
const subscription = activeSubscriptions.get(subscriptionId);
if (subscription) {
subscription.unsubscribe();
subscriptions.delete(subscriptionId);
console.log(`已取消订阅: ${subscriptionId}`);
try {
subscription.unsubscribe();
activeSubscriptions.delete(subscriptionId);
log(`✓ 已取消订阅: ${subscriptionId}`);
} catch (error) {
logWarn(`取消订阅 ${subscriptionId} 时出错:`, error);
}
}
};
/**
* 断开WebSocket连接
* 取消指定主题的订阅(从注册表中移除)
*
* @param destination 主题地址
*/
const disconnect = () => {
const unsubscribeDestination = (destination: string) => {
// 从注册表中移除
subscriptionRegistry.delete(destination);
// 取消所有匹配该主题的活动订阅
for (const [id, subscription] of activeSubscriptions.entries()) {
// 注意STOMP 的 subscription 对象没有直接暴露 destination
// 这里简化处理,实际使用时可能需要额外维护 id -> destination 的映射
try {
subscription.unsubscribe();
activeSubscriptions.delete(id);
} catch (error) {
logWarn(`取消订阅 ${id} 时出错:`, error);
}
}
log(`✓ 已移除主题订阅配置: ${destination}`);
};
/**
* 断开 WebSocket 连接
*
* @param clearSubscriptions 是否清除订阅注册表(默认为 true
*/
const disconnect = (clearSubscriptions = true) => {
// 设置手动断开标志
isManualDisconnect = true;
// 清除所有时器
if (reconnectTimer) {
clearTimeout(reconnectTimer);
reconnectTimer = null;
}
// 清除所有时器
clearAllTimers();
if (connectionTimeoutTimer) {
clearTimeout(connectionTimeoutTimer);
connectionTimeoutTimer = null;
}
// 清除所有订阅
for (const [id, subscription] of subscriptions.entries()) {
// 取消所有活动订阅
for (const [id, subscription] of activeSubscriptions.entries()) {
try {
subscription.unsubscribe();
} catch (error) {
console.warn(`取消订阅 ${id} 时出错:`, error);
logWarn(`取消订阅 ${id} 时出错:`, error);
}
}
subscriptions.clear();
activeSubscriptions.clear();
// 可选:清除订阅注册表
if (clearSubscriptions) {
subscriptionRegistry.clear();
log("已清除所有订阅配置");
}
// 断开连接
if (client.value) {
if (stompClient.value) {
try {
if (client.value.connected || client.value.active) {
client.value.deactivate();
console.log("WebSocket连接已主动断开");
if (stompClient.value.connected || stompClient.value.active) {
stompClient.value.deactivate();
log("WebSocket 连接已主动断开");
}
} catch (error) {
console.error("断开WebSocket连接时出错:", error);
logError("断开 WebSocket 连接时出错:", error);
}
client.value = null;
stompClient.value = null;
}
isConnected.value = false;
isConnecting = false;
reconnectCount.value = 0;
connectionState.value = ConnectionState.DISCONNECTED;
reconnectAttempts.value = 0;
};
// ==================== 返回公共接口 ====================
return {
// 状态
connectionState: readonly(connectionState),
isConnected,
reconnectAttempts: readonly(reconnectAttempts),
// 连接管理
connect,
disconnect,
// 订阅管理
subscribe,
unsubscribe,
disconnect,
unsubscribeDestination,
// 统计信息
getActiveSubscriptionCount: () => activeSubscriptions.size,
getRegisteredSubscriptionCount: () => subscriptionRegistry.size,
};
}