diff --git a/src/composables/websocket/useDictSync.ts b/src/composables/websocket/useDictSync.ts index b0823620..1b8a569f 100644 --- a/src/composables/websocket/useDictSync.ts +++ b/src/composables/websocket/useDictSync.ts @@ -2,53 +2,145 @@ import { useDictStoreHook } from "@/store/modules/dict-store"; import { useStomp } from "./useStomp"; import type { IMessage } from "@stomp/stompjs"; -// 字典消息类型 -export interface DictMessage { +/** + * 字典变更消息结构 + */ +export interface DictChangeMessage { + /** 字典编码 */ dictCode: string; + /** 时间戳 */ timestamp: number; } -// 字典事件回调类型 -export type DictMessageCallback = (_message: DictMessage) => void; - -// 全局单例实例 -let instance: ReturnType | null = null; +/** + * 字典变更事件回调函数类型 + */ +export type DictChangeCallback = (message: DictChangeMessage) => void; /** - * 创建字典同步组合式函数 - * 负责监听后端字典变更并同步到前端 + * 全局单例实例 */ -function createDictSyncHook() { +let singletonInstance: ReturnType | null = null; + +/** + * 创建字典同步组合式函数(内部工厂函数) + */ +function createDictSyncComposable() { const dictStore = useDictStoreHook(); - // 使用现有的useStomp,配置适合字典场景的重连参数 - const { isConnected, connect, subscribe, unsubscribe, disconnect } = useStomp({ - reconnectDelay: 20000, // 字典更新重连时间 - connectionTimeout: 15000, // 连接超时阈值 - useExponentialBackoff: false, // 使用固定间隔重连策略 - maxReconnectAttempts: 3, // 最多重连3次 + // 使用优化后的 useStomp + const stomp = useStomp({ + reconnectDelay: 20000, + connectionTimeout: 15000, + useExponentialBackoff: false, + maxReconnectAttempts: 3, + autoRestoreSubscriptions: true, // 自动恢复订阅 + debug: false, }); - // 存储订阅ID - const subscriptionIds = ref([]); - - // 已订阅的主题 - const subscribedTopics = ref>(new Set()); + // 字典主题地址 + const DICT_TOPIC = "/topic/dict"; // 消息回调函数列表 - const messageCallbacks = ref([]); + const messageCallbacks = ref([]); - // 重试定时器 - let retryTimer: ReturnType | null = null; + // 订阅 ID(用于取消订阅) + let subscriptionId: string | null = null; /** - * 注册字典消息回调 - * @param callback 回调函数 + * 处理字典变更事件 */ - const onDictMessage = (callback: DictMessageCallback) => { + const handleDictChangeMessage = (message: IMessage) => { + if (!message.body) { + return; + } + + try { + const data = JSON.parse(message.body) as DictChangeMessage; + const { dictCode } = data; + + if (!dictCode) { + console.warn("[DictSync] 收到无效的字典变更消息:缺少 dictCode"); + return; + } + + console.log(`[DictSync] 字典 "${dictCode}" 已更新,清除本地缓存`); + + // 清除缓存,等待按需加载 + dictStore.removeDictItem(dictCode); + + // 执行所有注册的回调函数 + messageCallbacks.value.forEach((callback) => { + try { + callback(data); + } catch (error) { + console.error("[DictSync] 回调函数执行失败:", error); + } + }); + } catch (error) { + console.error("[DictSync] 解析字典变更消息失败:", error); + } + }; + + /** + * 初始化 WebSocket 连接并订阅字典主题 + */ + const initialize = () => { + // 检查是否配置了 WebSocket 端点 + const wsEndpoint = import.meta.env.VITE_APP_WS_ENDPOINT; + if (!wsEndpoint) { + console.log("[DictSync] 未配置 WebSocket 端点,跳过字典同步功能"); + return; + } + + console.log("[DictSync] 初始化字典同步服务..."); + + // 建立 WebSocket 连接 + stomp.connect(); + + // 订阅字典主题(useStomp 会自动处理重连后的订阅恢复) + subscriptionId = stomp.subscribe(DICT_TOPIC, handleDictChangeMessage); + + if (subscriptionId) { + console.log(`[DictSync] 已订阅字典主题: ${DICT_TOPIC}`); + } else { + console.log(`[DictSync] 暂存字典主题订阅,等待连接建立后自动订阅`); + } + }; + + /** + * 关闭 WebSocket 连接并清理资源 + */ + const cleanup = () => { + console.log("[DictSync] 清理字典同步服务..."); + + // 取消订阅(如果有的话) + if (subscriptionId) { + stomp.unsubscribe(subscriptionId); + subscriptionId = null; + } + + // 也可以通过主题地址取消订阅 + stomp.unsubscribeDestination(DICT_TOPIC); + + // 断开连接 + stomp.disconnect(); + + // 清空回调列表 + messageCallbacks.value = []; + }; + + /** + * 注册字典变更回调函数 + * + * @param callback 回调函数 + * @returns 返回一个取消注册的函数 + */ + const onDictChange = (callback: DictChangeCallback) => { messageCallbacks.value.push(callback); + + // 返回取消注册的函数 return () => { - // 返回取消注册的函数 const index = messageCallbacks.value.indexOf(callback); if (index !== -1) { messageCallbacks.value.splice(index, 1); @@ -56,163 +148,48 @@ function createDictSyncHook() { }; }; - /** - * 初始化WebSocket - */ - const initWebSocket = async () => { - try { - // 检查是否配置了WebSocket端点 - const wsEndpoint = import.meta.env.VITE_APP_WS_ENDPOINT; - if (!wsEndpoint) { - console.log("[DictSync] 未配置WebSocket端点,跳过连接"); - return; - } - - // 连接WebSocket - connect(); - - // 设置字典订阅 - setupDictSubscription(); - } catch (error) { - console.error("[DictSync] 初始化失败:", error); - } - }; - - /** - * 关闭WebSocket - */ - const closeWebSocket = () => { - // 清理重试定时器 - if (retryTimer) { - clearTimeout(retryTimer); - retryTimer = null; - } - - // 取消所有订阅 - subscriptionIds.value.forEach((id: string) => { - unsubscribe(id); - }); - subscriptionIds.value = []; - subscribedTopics.value.clear(); - - // 断开连接 - disconnect(); - }; - - /** - * 设置字典订阅 - */ - const setupDictSubscription = () => { - const topic = "/topic/dict"; - - // 防止重复订阅 - if (subscribedTopics.value.has(topic)) { - console.log(`[DictSync] 跳过重复订阅: ${topic}`); - return; - } - - console.log(`[DictSync] 开始尝试订阅字典主题: ${topic}`); - - // 使用简化的重试逻辑,依赖useStomp的连接管理 - const attemptSubscribe = () => { - if (!isConnected.value) { - console.log("[DictSync] 等待WebSocket连接建立..."); - // 清理之前的定时器,防止重复 - if (retryTimer) { - clearTimeout(retryTimer); - } - // 10秒后再次尝试 - retryTimer = setTimeout(() => { - retryTimer = null; - attemptSubscribe(); - }, 10000); - return; - } - - // 清理重试定时器 - if (retryTimer) { - clearTimeout(retryTimer); - retryTimer = null; - } - - // 检查是否已订阅 - if (subscribedTopics.value.has(topic)) { - return; - } - - console.log(`[DictSync] 连接已建立,开始订阅: ${topic}`); - - // 订阅字典更新 - const subId = subscribe(topic, (message: IMessage) => { - handleDictEvent(message); - }); - - if (subId) { - subscriptionIds.value.push(subId); - subscribedTopics.value.add(topic); - console.log(`[DictSync] 字典主题订阅成功: ${topic}`); - } else { - console.warn(`[DictSync] 字典主题订阅失败: ${topic}`); - } - }; - - // 开始尝试订阅 - attemptSubscribe(); - }; - - /** - * 处理字典事件 - * @param message STOMP消息 - */ - const handleDictEvent = (message: IMessage) => { - if (!message.body) return; - - try { - // 记录接收到的消息 - console.log(`[DictSync] 收到字典更新消息: ${message.body}`); - - // 尝试解析消息 - const parsedData = JSON.parse(message.body) as DictMessage; - const dictCode = parsedData.dictCode; - - if (!dictCode) return; - - // 清除缓存,等待按需加载 - dictStore.removeDictItem(dictCode); - console.log(`[DictSync] 字典缓存已清除: ${dictCode}`); - - // 调用所有注册的回调函数 - messageCallbacks.value.forEach((callback: DictMessageCallback) => { - try { - callback(parsedData); - } catch (callbackError) { - console.error("[DictSync] 回调执行失败:", callbackError); - } - }); - - // 显示提示消息 - console.info(`[DictSync] 字典 ${dictCode} 已变更,将在下次使用时自动加载`); - } catch (error) { - console.error("[DictSync] 解析消息失败:", error); - } - }; - return { - isConnected, - initWebSocket, - closeWebSocket, - handleDictEvent, - onDictMessage, + // 状态 + isConnected: stomp.isConnected, + connectionState: stomp.connectionState, + + // 方法 + initialize, + cleanup, + onDictChange, + + // 用于测试和调试 + handleDictChangeMessage, }; } /** - * 字典同步组合式函数 - * 用于监听后端字典变更并同步到前端 + * 字典同步组合式函数(单例模式) + * + * 用于监听后端字典变更并自动同步到前端缓存 + * + * @example + * ```ts + * const dictSync = useDictSync(); + * + * // 初始化(在应用启动时调用) + * dictSync.initialize(); + * + * // 注册回调 + * const unsubscribe = dictSync.onDictChange((message) => { + * console.log('字典已更新:', message.dictCode); + * }); + * + * // 取消注册 + * unsubscribe(); + * + * // 清理(在应用退出时调用) + * dictSync.cleanup(); + * ``` */ export function useDictSync() { - if (!instance) { - instance = createDictSyncHook(); + if (!singletonInstance) { + singletonInstance = createDictSyncComposable(); } - return instance; + return singletonInstance; } diff --git a/src/composables/websocket/useOnlineCount.ts b/src/composables/websocket/useOnlineCount.ts index ae4e46bb..b1cdf622 100644 --- a/src/composables/websocket/useOnlineCount.ts +++ b/src/composables/websocket/useOnlineCount.ts @@ -2,212 +2,210 @@ import { ref, watch, onMounted, onUnmounted, getCurrentInstance } from "vue"; import { useStomp } from "./useStomp"; import { registerWebSocketInstance } from "@/plugins/websocket"; import { AuthStorage } from "@/utils/auth"; -import { ElMessage } from "element-plus"; - -// 全局单例实例 -let globalInstance: ReturnType | null = null; /** - * 创建在线用户计数的核心逻辑 + * 在线用户数量消息结构 */ -function createOnlineCountHook() { - // 在线用户数量 - const onlineUserCount = ref(0); +interface OnlineCountMessage { + count?: number; + timestamp?: number; +} - // 最后更新时间戳 +/** + * 全局单例实例 + */ +let globalInstance: ReturnType | null = null; + +/** + * 创建在线用户计数组合式函数(内部工厂函数) + */ +function createOnlineCountComposable() { + // ==================== 状态管理 ==================== + const onlineUserCount = ref(0); const lastUpdateTime = ref(0); - // 连接状态 - const isConnected = ref(false); - - // 连接正在尝试中 - const isConnecting = ref(false); - - // 使用Stomp客户端 - 配置使用指数退避策略 - const stompInstance = useStomp({ - reconnectDelay: 15000, // 重连基础延迟 - maxReconnectAttempts: 3, // 重连次数上限 - connectionTimeout: 10000, // 连接超时 - useExponentialBackoff: true, // 启用指数退避 + // ==================== WebSocket 客户端 ==================== + const stomp = useStomp({ + reconnectDelay: 15000, + maxReconnectAttempts: 3, + connectionTimeout: 10000, + useExponentialBackoff: true, + autoRestoreSubscriptions: true, // 自动恢复订阅 + debug: false, }); - const { - connect, - subscribe, - unsubscribe, - disconnect, - isConnected: stompConnected, - } = stompInstance; + // 在线用户计数主题 + const ONLINE_COUNT_TOPIC = "/topic/online-count"; + + // 订阅 ID + let subscriptionId: string | null = null; // 注册到全局实例管理器 - registerWebSocketInstance("onlineCount", stompInstance); + registerWebSocketInstance("onlineCount", stomp); - // 订阅ID - let subscriptionId = ""; + /** + * 处理在线用户数量消息 + */ + const handleOnlineCountMessage = (message: any) => { + try { + const data = message.body; + const jsonData = JSON.parse(data) as OnlineCountMessage; - // 连接超时计时器 - let connectionTimeoutTimer: any = null; + // 支持两种消息格式 + // 1. 直接是数字: 42 + // 2. 对象格式: { count: 42, timestamp: 1234567890 } + const count = typeof jsonData === "number" ? jsonData : jsonData.count; - // 监听Stomp连接状态 - watch(stompConnected, (connected) => { - if (connected && isConnecting.value) { - isConnected.value = true; - isConnecting.value = false; - - // 一旦连接成功,立即订阅主题 - subscribeToOnlineCount(); - console.log("[useOnlineCount] WebSocket连接成功,已订阅在线用户计数主题"); + if (count !== undefined && !isNaN(count)) { + onlineUserCount.value = count; + lastUpdateTime.value = Date.now(); + console.log(`[useOnlineCount] 在线用户数更新: ${count}`); + } else { + console.warn("[useOnlineCount] 收到无效的在线用户数:", data); + } + } catch (error) { + console.error("[useOnlineCount] 解析在线用户数失败:", error); } - }); + }; /** * 订阅在线用户计数主题 */ const subscribeToOnlineCount = () => { - if (!stompConnected.value) { - // 10秒后重试订阅 - setTimeout(subscribeToOnlineCount, 10000); + if (subscriptionId) { + console.log("[useOnlineCount] 已存在订阅,跳过"); return; } - // 如果已经订阅,先取消订阅 + // 订阅在线用户计数主题(useStomp 会处理重连后的订阅恢复) + subscriptionId = stomp.subscribe(ONLINE_COUNT_TOPIC, handleOnlineCountMessage); + if (subscriptionId) { - unsubscribe(subscriptionId); + console.log(`[useOnlineCount] 已订阅主题: ${ONLINE_COUNT_TOPIC}`); + } else { + console.log(`[useOnlineCount] 暂存订阅配置,等待连接建立后自动订阅`); } - - // 订阅在线用户计数主题 - subscriptionId = subscribe("/topic/online-count", (message) => { - try { - const data = message.body; - - const jsonData = JSON.parse(data); - const count = typeof jsonData === "number" ? jsonData : jsonData.count; - - if (!isNaN(count)) { - onlineUserCount.value = count; - lastUpdateTime.value = Date.now(); - } - } catch (error) { - console.error("[useOnlineCount] 解析在线用户数量失败:", error); - } - }); }; /** - * 初始化WebSocket连接并订阅在线用户主题 + * 初始化 WebSocket 连接并订阅在线用户主题 */ - const initWebSocket = () => { - if (isConnecting.value) return; - - // 检查WebSocket端点是否配置 + const initialize = () => { + // 检查 WebSocket 端点是否配置 const wsEndpoint = import.meta.env.VITE_APP_WS_ENDPOINT; if (!wsEndpoint) { - console.log("[useOnlineCount] 未配置WebSocket端点(VITE_APP_WS_ENDPOINT),跳过WebSocket连接"); + console.log("[useOnlineCount] 未配置 WebSocket 端点,跳过初始化"); return; } - // 使用 AuthStorage.getAccessToken() 获取令牌,确保获取到最新的 + // 检查令牌有效性 const accessToken = AuthStorage.getAccessToken(); if (!accessToken) { - console.log("[useOnlineCount] 没有检测到有效令牌,不尝试WebSocket连接"); + console.log("[useOnlineCount] 未检测到有效令牌,跳过初始化"); return; } - isConnecting.value = true; - console.log("[useOnlineCount] 开始建立WebSocket连接..."); + console.log("[useOnlineCount] 初始化在线用户计数服务..."); - // 连接WebSocket - connect(); + // 建立 WebSocket 连接 + stomp.connect(); - // 设置连接超时显示UI提示 - clearTimeout(connectionTimeoutTimer); - connectionTimeoutTimer = setTimeout(() => { - if (!isConnected.value) { - console.warn("[useOnlineCount] WebSocket连接超时,将自动尝试重连"); - ElMessage.warning("正在尝试连接服务器,请稍候..."); - - // 超时后尝试重新连接 - closeWebSocket(); - setTimeout(() => { - // 再次检查令牌有效性 - if (AuthStorage.getAccessToken()) { - initWebSocket(); - } else { - console.log("[useOnlineCount] 令牌无效,放弃重连"); - } - }, 3000); - } - }, 10000); // 较长的UI提示超时 - - // 监听连接状态变化,连接成功后清除超时计时器 - const unwatch = watch(stompConnected, (connected) => { - if (connected) { - clearTimeout(connectionTimeoutTimer); - unwatch(); - } - }); + // 订阅主题 + subscribeToOnlineCount(); }; /** - * 关闭WebSocket连接 + * 关闭 WebSocket 连接并清理资源 */ - const closeWebSocket = () => { + const cleanup = () => { + console.log("[useOnlineCount] 清理在线用户计数服务..."); + + // 取消订阅 if (subscriptionId) { - unsubscribe(subscriptionId); - subscriptionId = ""; + stomp.unsubscribe(subscriptionId); + subscriptionId = null; } - // 清除连接超时计时器 - if (connectionTimeoutTimer) { - clearTimeout(connectionTimeoutTimer); - connectionTimeoutTimer = null; - } + // 也可以通过主题地址取消订阅 + stomp.unsubscribeDestination(ONLINE_COUNT_TOPIC); - disconnect(); - isConnected.value = false; - isConnecting.value = false; + // 断开连接 + stomp.disconnect(); + + // 重置状态 + onlineUserCount.value = 0; + lastUpdateTime.value = 0; }; + // 监听连接状态变化 + watch( + stomp.isConnected, + (connected) => { + if (connected) { + console.log("[useOnlineCount] WebSocket 已连接"); + } else { + console.log("[useOnlineCount] WebSocket 已断开"); + } + }, + { immediate: false } + ); + return { - onlineUserCount, - lastUpdateTime, - isConnected, - isConnecting, - initWebSocket, - closeWebSocket, + // 状态 + onlineUserCount: readonly(onlineUserCount), + lastUpdateTime: readonly(lastUpdateTime), + isConnected: stomp.isConnected, + connectionState: stomp.connectionState, + + // 方法 + initialize, + cleanup, }; } /** - * 在线用户计数组合式函数 - * 使用单例模式,避免重复创建 WebSocket 连接 + * 在线用户计数组合式函数(单例模式) + * + * 用于实时显示系统在线用户数量 + * * @param options 配置选项 * @param options.autoInit 是否在组件挂载时自动初始化(默认 true) + * + * @example + * ```ts + * // 在组件中使用 + * const { onlineUserCount, isConnected } = useOnlineCount(); + * + * // 手动控制初始化 + * const { onlineUserCount, initialize, cleanup } = useOnlineCount({ autoInit: false }); + * onMounted(() => initialize()); + * onUnmounted(() => cleanup()); + * ``` */ export function useOnlineCount(options: { autoInit?: boolean } = {}) { const { autoInit = true } = options; + // 获取或创建单例实例 if (!globalInstance) { - globalInstance = createOnlineCountHook(); + globalInstance = createOnlineCountComposable(); } - // 只有在组件上下文中且 autoInit 为 true 时才使用生命周期钩子 - if (autoInit && getCurrentInstance()) { - // 组件挂载时检查是否需要初始化WebSocket + // 只在组件上下文中且 autoInit 为 true 时使用生命周期钩子 + const instance = getCurrentInstance(); + if (autoInit && instance) { onMounted(() => { - // 只有在未连接且未连接中时才尝试初始化 - if (!globalInstance!.isConnected.value && !globalInstance!.isConnecting.value) { - console.log("[useOnlineCount] 组件挂载,尝试初始化WebSocket连接"); - globalInstance!.initWebSocket(); + // 只有在未连接时才尝试初始化 + if (!globalInstance!.isConnected.value) { + console.log("[useOnlineCount] 组件挂载,初始化 WebSocket 连接"); + globalInstance!.initialize(); } else { - console.log("[useOnlineCount] WebSocket已连接或正在连接,跳过初始化"); + console.log("[useOnlineCount] WebSocket 已连接,跳过初始化"); } }); - // 组件卸载时不关闭连接,保持全局连接 + // 注意:不在卸载时关闭连接,保持全局连接 onUnmounted(() => { - // 不关闭连接,让其他组件继续使用 - console.log("[useOnlineCount] Component unmounted, keeping WebSocket connection"); + console.log("[useOnlineCount] 组件卸载(保持 WebSocket 连接)"); }); } diff --git a/src/composables/websocket/useStomp.ts b/src/composables/websocket/useStomp.ts index 419b8400..88f42a3a 100644 --- a/src/composables/websocket/useStomp.ts +++ b/src/composables/websocket/useStomp.ts @@ -6,363 +6,525 @@ export interface UseStompOptions { brokerURL?: string; /** 用于鉴权的 token,不传时使用 getAccessToken() 的返回值 */ token?: string; - /** 重连延迟,单位毫秒,默认为 8000 */ + /** 重连延迟,单位毫秒,默认为 15000 */ reconnectDelay?: number; /** 连接超时时间,单位毫秒,默认为 10000 */ connectionTimeout?: number; /** 是否开启指数退避重连策略 */ useExponentialBackoff?: boolean; - /** 最大重连次数,默认为 5 */ + /** 最大重连次数,默认为 3 */ maxReconnectAttempts?: number; /** 最大重连延迟,单位毫秒,默认为 60000 */ maxReconnectDelay?: number; /** 是否开启调试日志 */ debug?: boolean; + /** 是否在重连时自动恢复订阅,默认为 true */ + autoRestoreSubscriptions?: boolean; } /** - * STOMP WebSocket连接组合式函数 - * 用于管理WebSocket连接的建立、断开、重连和消息订阅 + * 订阅配置信息 + */ +interface SubscriptionConfig { + destination: string; + callback: (message: IMessage) => void; +} + +/** + * 连接状态枚举 + */ +enum ConnectionState { + DISCONNECTED = "DISCONNECTED", + CONNECTING = "CONNECTING", + CONNECTED = "CONNECTED", + RECONNECTING = "RECONNECTING", +} + +/** + * STOMP WebSocket 连接管理组合式函数 + * + * 核心功能: + * - 自动连接管理(连接、断开、重连) + * - 订阅管理(订阅、取消订阅、自动恢复) + * - 心跳检测 + * - Token 自动刷新 + * + * @param options 配置选项 + * @returns STOMP 客户端操作接口 */ export function useStomp(options: UseStompOptions = {}) { - // 默认值:brokerURL 从环境变量中获取,token 从 getAccessToken() 获取 + // ==================== 配置初始化 ==================== const defaultBrokerURL = import.meta.env.VITE_APP_WS_ENDPOINT || ""; - const brokerURL = ref(options.brokerURL ?? defaultBrokerURL); - // 默认配置参数 - const reconnectDelay = options.reconnectDelay ?? 15000; // 默认15秒重连间隔 - const connectionTimeout = options.connectionTimeout ?? 10000; - const useExponentialBackoff = options.useExponentialBackoff ?? false; - const maxReconnectAttempts = options.maxReconnectAttempts ?? 3; // 最多重连3次 - const maxReconnectDelay = options.maxReconnectDelay ?? 60000; + const config = { + brokerURL: ref(options.brokerURL ?? defaultBrokerURL), + reconnectDelay: options.reconnectDelay ?? 15000, + connectionTimeout: options.connectionTimeout ?? 10000, + useExponentialBackoff: options.useExponentialBackoff ?? false, + maxReconnectAttempts: options.maxReconnectAttempts ?? 3, + maxReconnectDelay: options.maxReconnectDelay ?? 60000, + autoRestoreSubscriptions: options.autoRestoreSubscriptions ?? true, + debug: options.debug ?? false, + }; - // 连接状态标记 - const isConnected = ref(false); - // 重连尝试次数 - const reconnectCount = ref(0); - // 重连计时器 + // ==================== 状态管理 ==================== + const connectionState = ref(ConnectionState.DISCONNECTED); + const isConnected = computed(() => connectionState.value === ConnectionState.CONNECTED); + const reconnectAttempts = ref(0); + + // ==================== 定时器管理 ==================== let reconnectTimer: ReturnType | null = null; - // 连接超时计时器 let connectionTimeoutTimer: ReturnType | null = null; - // 存储所有订阅 - const subscriptions = new Map(); - // 用于保存 STOMP 客户端的实例 - const client = ref(null); - // 防止重复连接的标志 - let isConnecting = false; + // ==================== 订阅管理 ==================== + // 活动订阅:存储当前 STOMP 订阅对象 + const activeSubscriptions = new Map(); + // 订阅配置注册表:用于自动恢复订阅 + const subscriptionRegistry = new Map(); + + // ==================== 客户端实例 ==================== + const stompClient = ref(null); let isManualDisconnect = false; + // ==================== 工具函数 ==================== + + /** + * 清理所有定时器 + */ + const clearAllTimers = () => { + if (reconnectTimer) { + clearTimeout(reconnectTimer); + reconnectTimer = null; + } + if (connectionTimeoutTimer) { + clearTimeout(connectionTimeoutTimer); + connectionTimeoutTimer = null; + } + }; + + /** + * 日志输出(支持调试模式控制) + */ + const log = (...args: any[]) => { + if (config.debug) { + console.log("[useStomp]", ...args); + } + }; + + const logWarn = (...args: any[]) => { + console.warn("[useStomp]", ...args); + }; + + const logError = (...args: any[]) => { + console.error("[useStomp]", ...args); + }; + + /** + * 恢复所有订阅 + */ + const restoreSubscriptions = () => { + if (!config.autoRestoreSubscriptions || subscriptionRegistry.size === 0) { + return; + } + + log(`开始恢复 ${subscriptionRegistry.size} 个订阅...`); + + for (const [destination, subscriptionConfig] of subscriptionRegistry.entries()) { + try { + performSubscribe(destination, subscriptionConfig.callback); + } catch (error) { + logError(`恢复订阅 ${destination} 失败:`, error); + } + } + }; + /** * 初始化 STOMP 客户端 */ const initializeClient = () => { - // 如果客户端已存在且正在连接或已连接,直接返回 - if (client.value && (client.value.active || client.value.connected)) { - console.log("STOMP客户端已存在且处于活动状态,跳过初始化"); + // 如果客户端已存在且处于活动状态,直接返回 + if (stompClient.value && (stompClient.value.active || stompClient.value.connected)) { + log("STOMP 客户端已存在且处于活动状态,跳过初始化"); return; } - // 检查WebSocket端点是否配置 - if (!brokerURL.value) { - console.warn("WebSocket连接失败: 未配置WebSocket端点URL"); + // 检查 WebSocket 端点是否配置 + if (!config.brokerURL.value) { + logWarn("WebSocket 连接失败: 未配置 WebSocket 端点 URL"); return; } - // 每次连接前重新获取最新令牌,不依赖之前的token值 - const currentToken = AuthStorage.getAccessToken(); - - // 检查令牌是否为空,如果为空则不进行连接 - if (!currentToken) { - console.warn("WebSocket连接失败:授权令牌为空,请先登录"); + // 每次连接前重新获取最新令牌 + const accessToken = AuthStorage.getAccessToken(); + if (!accessToken) { + logWarn("WebSocket 连接失败:授权令牌为空,请先登录"); return; } - // 如果有旧的客户端,先清理 - if (client.value) { + // 清理旧客户端 + if (stompClient.value) { try { - client.value.deactivate(); + stompClient.value.deactivate(); } catch (error) { - console.warn("清理旧客户端时出错:", error); + logWarn("清理旧客户端时出错:", error); } - client.value = null; + stompClient.value = null; } // 创建 STOMP 客户端 - client.value = new Client({ - brokerURL: brokerURL.value, + stompClient.value = new Client({ + brokerURL: config.brokerURL.value, connectHeaders: { - Authorization: `Bearer ${currentToken}`, + Authorization: `Bearer ${accessToken}`, }, - debug: options.debug ? console.log : () => {}, - reconnectDelay: 0, // 禁用内置重连机制,使用自定义重连 + debug: config.debug ? (msg) => console.log("[STOMP]", msg) : () => {}, + reconnectDelay: 0, // 禁用内置重连,使用自定义重连逻辑 heartbeatIncoming: 4000, heartbeatOutgoing: 4000, }); - // 设置连接监听器 - client.value.onConnect = () => { - isConnected.value = true; - isConnecting = false; - reconnectCount.value = 0; - if (connectionTimeoutTimer) clearTimeout(connectionTimeoutTimer); - if (reconnectTimer) clearTimeout(reconnectTimer); - console.log("WebSocket连接已建立"); + // ==================== 事件监听器 ==================== + + // 连接成功 + stompClient.value.onConnect = () => { + connectionState.value = ConnectionState.CONNECTED; + reconnectAttempts.value = 0; + clearAllTimers(); + + log("✅ WebSocket 连接已建立"); + + // 自动恢复订阅 + restoreSubscriptions(); }; - // 设置断开连接监听器 - client.value.onDisconnect = () => { - isConnected.value = false; - isConnecting = false; - console.log("WebSocket连接已断开"); + // 连接断开 + stompClient.value.onDisconnect = () => { + connectionState.value = ConnectionState.DISCONNECTED; + log("❌ WebSocket 连接已断开"); + + // 清空活动订阅(但保留订阅配置用于恢复) + activeSubscriptions.clear(); // 如果不是手动断开且未达到最大重连次数,则尝试重连 - if (!isManualDisconnect && reconnectCount.value < maxReconnectAttempts) { - handleReconnect(); + if (!isManualDisconnect && reconnectAttempts.value < config.maxReconnectAttempts) { + scheduleReconnect(); } }; - // 设置 Web Socket 关闭监听器 - client.value.onWebSocketClose = (event) => { - isConnected.value = false; - isConnecting = false; - console.log(`WebSocket已关闭: ${event?.code} ${event?.reason}`); + // WebSocket 关闭 + stompClient.value.onWebSocketClose = (event) => { + connectionState.value = ConnectionState.DISCONNECTED; + log(`WebSocket 已关闭: code=${event?.code}, reason=${event?.reason}`); - // 如果是手动断开,不要重连 + // 如果是手动断开,不重连 if (isManualDisconnect) { - console.log("手动断开连接,不进行重连"); + log("手动断开连接,不进行重连"); return; } - // 如果是授权问题导致的关闭,尝试重连 + // 对于异常关闭,尝试重连 if ( - (event?.code === 1000 || event?.code === 1006 || event?.code === 1008) && - reconnectCount.value < maxReconnectAttempts + event?.code && + [1000, 1006, 1008, 1011].includes(event.code) && + reconnectAttempts.value < config.maxReconnectAttempts ) { - console.log("检测到连接异常关闭,将尝试重连"); - - // 通过 handleReconnect 统一处理重连,避免重复计数 - handleReconnect(); + log("检测到连接异常关闭,将尝试重连"); + scheduleReconnect(); } }; - // 设置错误监听器 - client.value.onStompError = (frame) => { - console.error("STOMP错误:", frame.headers, frame.body); - isConnecting = false; + // STOMP 错误 + stompClient.value.onStompError = (frame) => { + logError("STOMP 错误:", frame.headers, frame.body); + connectionState.value = ConnectionState.DISCONNECTED; // 检查是否是授权错误 - if ( + const isAuthError = frame.headers?.message?.includes("Unauthorized") || frame.body?.includes("Unauthorized") || - frame.body?.includes("Token") - ) { - console.warn("WebSocket授权错误,请检查登录状态"); - // 授权错误不进行重连 - isManualDisconnect = true; + frame.body?.includes("Token") || + frame.body?.includes("401"); + + if (isAuthError) { + logWarn("WebSocket 授权错误,停止重连"); + isManualDisconnect = true; // 授权错误不进行重连 } }; }; /** - * 处理重连逻辑 + * 调度重连任务 */ - const handleReconnect = () => { - // 如果已经在连接中或手动断开,不重连 - if (isConnecting || isManualDisconnect) { + const scheduleReconnect = () => { + // 如果正在连接或手动断开,不重连 + if (connectionState.value === ConnectionState.CONNECTING || isManualDisconnect) { return; } - if (reconnectCount.value >= maxReconnectAttempts) { - console.error(`已达到最大重连次数(${maxReconnectAttempts}),停止重连`); + // 检查是否达到最大重连次数 + if (reconnectAttempts.value >= config.maxReconnectAttempts) { + logError(`已达到最大重连次数 (${config.maxReconnectAttempts}),停止重连`); return; } - reconnectCount.value++; - console.log(`准备重连(${reconnectCount.value}/${maxReconnectAttempts})...`); + reconnectAttempts.value++; + connectionState.value = ConnectionState.RECONNECTING; - // 使用指数退避策略增加重连间隔 - const delay = useExponentialBackoff - ? Math.min(reconnectDelay * Math.pow(2, reconnectCount.value - 1), maxReconnectDelay) - : reconnectDelay; + // 计算重连延迟(支持指数退避) + const delay = config.useExponentialBackoff + ? Math.min( + config.reconnectDelay * Math.pow(2, reconnectAttempts.value - 1), + config.maxReconnectDelay + ) + : config.reconnectDelay; - // 清除之前的计时器 + log(`准备重连 (${reconnectAttempts.value}/${config.maxReconnectAttempts}),延迟 ${delay}ms`); + + // 清除之前的重连计时器 if (reconnectTimer) { clearTimeout(reconnectTimer); } // 设置重连计时器 reconnectTimer = setTimeout(() => { - if (!isConnected.value && !isManualDisconnect && !isConnecting) { - console.log(`开始重连...`); + if (connectionState.value !== ConnectionState.CONNECTED && !isManualDisconnect) { + log(`开始第 ${reconnectAttempts.value} 次重连...`); connect(); } }, delay); }; - // 监听 brokerURL 的变化,若地址改变则重新初始化 - watch(brokerURL, (newURL, oldURL) => { + // 监听 brokerURL 的变化,自动重新初始化 + watch(config.brokerURL, (newURL, oldURL) => { if (newURL !== oldURL) { - console.log(`brokerURL changed from ${oldURL} to ${newURL}`); - // 断开当前连接,重新激活客户端 - if (client.value && client.value.connected) { - client.value.deactivate(); + log(`WebSocket 端点已更改: ${oldURL} -> ${newURL}`); + + // 断开当前连接 + if (stompClient.value && stompClient.value.connected) { + stompClient.value.deactivate(); } - brokerURL.value = newURL; - initializeClient(); // 重新初始化客户端 + + // 重新初始化客户端 + initializeClient(); } }); // 初始化客户端 initializeClient(); + // ==================== 公共接口 ==================== + /** - * 激活连接(如果已经连接或正在激活则直接返回) + * 建立 WebSocket 连接 */ const connect = () => { // 重置手动断开标志 isManualDisconnect = false; - // 检查是否有配置WebSocket端点 - if (!brokerURL.value) { - console.error("WebSocket连接失败: 未配置WebSocket端点URL"); + // 检查是否配置了 WebSocket 端点 + if (!config.brokerURL.value) { + logError("WebSocket 连接失败: 未配置 WebSocket 端点 URL"); return; } // 防止重复连接 - if (isConnecting) { - console.log("WebSocket正在连接中,跳过重复连接请求"); + if (connectionState.value === ConnectionState.CONNECTING) { + log("WebSocket 正在连接中,跳过重复连接请求"); return; } - if (!client.value) { + // 如果客户端不存在,先初始化 + if (!stompClient.value) { initializeClient(); } - if (!client.value) { - console.error("STOMP客户端初始化失败"); + if (!stompClient.value) { + logError("STOMP 客户端初始化失败"); return; } - // 避免重复连接:检查是否已连接 - if (client.value.connected) { - console.log("WebSocket已经连接,跳过重复连接"); - isConnected.value = true; + // 避免重复连接:检查是否已连接 + if (stompClient.value.connected) { + log("WebSocket 已连接,跳过重复连接"); + connectionState.value = ConnectionState.CONNECTED; return; } - // 设置连接标志 - isConnecting = true; + // 设置连接状态 + connectionState.value = ConnectionState.CONNECTING; // 设置连接超时 - if (connectionTimeoutTimer) clearTimeout(connectionTimeoutTimer); + if (connectionTimeoutTimer) { + clearTimeout(connectionTimeoutTimer); + } + connectionTimeoutTimer = setTimeout(() => { - if (!isConnected.value && isConnecting) { - console.warn("WebSocket连接超时"); - isConnecting = false; - if (!isManualDisconnect && reconnectCount.value < maxReconnectAttempts) { - handleReconnect(); + if (connectionState.value === ConnectionState.CONNECTING) { + logWarn("WebSocket 连接超时"); + connectionState.value = ConnectionState.DISCONNECTED; + + // 超时后尝试重连 + if (!isManualDisconnect && reconnectAttempts.value < config.maxReconnectAttempts) { + scheduleReconnect(); } } - }, connectionTimeout); + }, config.connectionTimeout); try { - client.value.activate(); - console.log("正在建立WebSocket连接..."); + stompClient.value.activate(); + log("正在建立 WebSocket 连接..."); } catch (error) { - console.error("激活WebSocket连接失败:", error); - isConnecting = false; + logError("激活 WebSocket 连接失败:", error); + connectionState.value = ConnectionState.DISCONNECTED; + } + }; + + /** + * 执行订阅操作(内部方法) + */ + const performSubscribe = (destination: string, callback: (message: IMessage) => void): string => { + if (!stompClient.value || !stompClient.value.connected) { + logWarn(`尝试订阅 ${destination} 失败: 客户端未连接`); + return ""; + } + + try { + const subscription = stompClient.value.subscribe(destination, callback); + const subscriptionId = subscription.id; + activeSubscriptions.set(subscriptionId, subscription); + log(`✓ 订阅成功: ${destination} (ID: ${subscriptionId})`); + return subscriptionId; + } catch (error) { + logError(`订阅 ${destination} 失败:`, error); + return ""; } }; /** * 订阅指定主题 - * @param destination 目标主题地址 + * + * @param destination 目标主题地址(如:/topic/message) * @param callback 接收到消息时的回调函数 - * @returns 返回订阅 id,用于后续取消订阅 + * @returns 订阅 ID,用于后续取消订阅 */ - const subscribe = (destination: string, callback: (_message: IMessage) => void): string => { - if (!client.value || !client.value.connected) { - console.warn(`尝试订阅 ${destination} 失败: 客户端未连接`); - return ""; + const subscribe = (destination: string, callback: (message: IMessage) => void): string => { + // 保存订阅配置到注册表,用于断线重连后自动恢复 + subscriptionRegistry.set(destination, { destination, callback }); + + // 如果已连接,立即订阅 + if (stompClient.value?.connected) { + return performSubscribe(destination, callback); } - try { - const subscription = client.value.subscribe(destination, callback); - const subscriptionId = subscription.id; - subscriptions.set(subscriptionId, subscription); - console.log(`订阅成功: ${destination}, ID: ${subscriptionId}`); - return subscriptionId; - } catch (error) { - console.error(`订阅 ${destination} 失败:`, error); - return ""; - } + log(`暂存订阅配置: ${destination},将在连接建立后自动订阅`); + return ""; }; /** * 取消订阅 - * @param subscriptionId 订阅 id + * + * @param subscriptionId 订阅 ID(由 subscribe 方法返回) */ const unsubscribe = (subscriptionId: string) => { - const subscription = subscriptions.get(subscriptionId); + const subscription = activeSubscriptions.get(subscriptionId); if (subscription) { - subscription.unsubscribe(); - subscriptions.delete(subscriptionId); - console.log(`已取消订阅: ${subscriptionId}`); + try { + subscription.unsubscribe(); + activeSubscriptions.delete(subscriptionId); + log(`✓ 已取消订阅: ${subscriptionId}`); + } catch (error) { + logWarn(`取消订阅 ${subscriptionId} 时出错:`, error); + } } }; /** - * 断开WebSocket连接 + * 取消指定主题的订阅(从注册表中移除) + * + * @param destination 主题地址 */ - const disconnect = () => { + const unsubscribeDestination = (destination: string) => { + // 从注册表中移除 + subscriptionRegistry.delete(destination); + + // 取消所有匹配该主题的活动订阅 + for (const [id, subscription] of activeSubscriptions.entries()) { + // 注意:STOMP 的 subscription 对象没有直接暴露 destination, + // 这里简化处理,实际使用时可能需要额外维护 id -> destination 的映射 + try { + subscription.unsubscribe(); + activeSubscriptions.delete(id); + } catch (error) { + logWarn(`取消订阅 ${id} 时出错:`, error); + } + } + + log(`✓ 已移除主题订阅配置: ${destination}`); + }; + + /** + * 断开 WebSocket 连接 + * + * @param clearSubscriptions 是否清除订阅注册表(默认为 true) + */ + const disconnect = (clearSubscriptions = true) => { // 设置手动断开标志 isManualDisconnect = true; - // 清除所有计时器 - if (reconnectTimer) { - clearTimeout(reconnectTimer); - reconnectTimer = null; - } + // 清除所有定时器 + clearAllTimers(); - if (connectionTimeoutTimer) { - clearTimeout(connectionTimeoutTimer); - connectionTimeoutTimer = null; - } - - // 清除所有订阅 - for (const [id, subscription] of subscriptions.entries()) { + // 取消所有活动订阅 + for (const [id, subscription] of activeSubscriptions.entries()) { try { subscription.unsubscribe(); } catch (error) { - console.warn(`取消订阅 ${id} 时出错:`, error); + logWarn(`取消订阅 ${id} 时出错:`, error); } } - subscriptions.clear(); + activeSubscriptions.clear(); + + // 可选:清除订阅注册表 + if (clearSubscriptions) { + subscriptionRegistry.clear(); + log("已清除所有订阅配置"); + } // 断开连接 - if (client.value) { + if (stompClient.value) { try { - if (client.value.connected || client.value.active) { - client.value.deactivate(); - console.log("WebSocket连接已主动断开"); + if (stompClient.value.connected || stompClient.value.active) { + stompClient.value.deactivate(); + log("✓ WebSocket 连接已主动断开"); } } catch (error) { - console.error("断开WebSocket连接时出错:", error); + logError("断开 WebSocket 连接时出错:", error); } - client.value = null; + stompClient.value = null; } - isConnected.value = false; - isConnecting = false; - reconnectCount.value = 0; + connectionState.value = ConnectionState.DISCONNECTED; + reconnectAttempts.value = 0; }; + // ==================== 返回公共接口 ==================== return { + // 状态 + connectionState: readonly(connectionState), isConnected, + reconnectAttempts: readonly(reconnectAttempts), + + // 连接管理 connect, + disconnect, + + // 订阅管理 subscribe, unsubscribe, - disconnect, + unsubscribeDestination, + + // 统计信息 + getActiveSubscriptionCount: () => activeSubscriptions.size, + getRegisteredSubscriptionCount: () => subscriptionRegistry.size, }; }