refactor: ♻️ stomp 连接管理器优化,添加指数退避重连策略,调整 WebSocket 服务的目录结构

This commit is contained in:
Ray.Hao
2025-04-25 18:39:26 +08:00
parent 152789001a
commit f3576d51f2
11 changed files with 287 additions and 202 deletions

View File

@@ -94,7 +94,7 @@ const noticeList = ref<NoticePageVO[]>([]);
const noticeDialogVisible = ref(false);
const noticeDetail = ref<NoticeDetailVO | null>(null);
import { useStomp } from "@/hooks/useStomp";
import { useStomp } from "@/hooks/websocket/core/useStomp";
const { subscribe, unsubscribe, isConnected } = useStomp();
watch(

7
src/hooks/index.ts Normal file
View File

@@ -0,0 +1,7 @@
/**
* 全局Hooks入口文件
* 导出所有可用的Hooks
*/
// 导出WebSocket相关Hook
export * from "./websocket";

View File

@@ -1,176 +0,0 @@
import { ref, onMounted, onUnmounted, watch } from "vue";
import { useStomp } from "./useStomp";
import { ElMessage } from "element-plus";
export interface OnlineUserStats {
type: string; // 事件类型
count: number; // 当前在线用户数量
users?: any[]; // 用户列表(可选)
timestamp: number; // 时间戳
}
/**
* 在线用户WebSocket Hook
* 用于订阅后端推送的在线用户数量变化
*/
export function useWebSocketOnlineUsers() {
// 在线用户数量
const onlineUserCount = ref(0);
// 最后更新时间戳
const lastUpdateTime = ref(0);
// 连接状态
const isConnected = ref(false);
// 连接正在尝试中
const isConnecting = ref(false);
// 使用Stomp客户端
const { connect, subscribe, unsubscribe, disconnect, isConnected: stompConnected } = useStomp();
// 订阅ID
let subscriptionId = "";
// 重连次数
let reconnectCount = 0;
// 最大重连次数
const maxReconnectAttempts = 5;
// 重连计时器
let reconnectTimer: any = null;
// 监听Stomp连接状态
watch(stompConnected, (connected) => {
if (connected && isConnecting.value) {
isConnected.value = true;
isConnecting.value = false;
reconnectCount = 0;
// 一旦连接成功,立即订阅主题
subscribeToOnlineUsers();
console.log("WebSocket连接成功已订阅在线用户主题");
}
});
/**
* 订阅在线用户主题
*/
const subscribeToOnlineUsers = () => {
if (!stompConnected.value) return;
// 如果已经订阅,先取消订阅
if (subscriptionId) {
unsubscribe(subscriptionId);
}
// 订阅在线用户主题
subscriptionId = subscribe("/topic/online-users", (message) => {
try {
const data: OnlineUserStats = JSON.parse(message.body);
// 只有在消息类型为ONLINE_USERS_CHANGE时更新数据
if (data.type === "ONLINE_USERS_CHANGE") {
onlineUserCount.value = data.count || 0;
lastUpdateTime.value = data.timestamp || Date.now();
}
} catch (error) {
console.error("解析在线用户数据失败:", error);
}
});
};
/**
* 初始化WebSocket连接并订阅在线用户主题
*/
const initWebSocket = () => {
if (isConnecting.value) return;
isConnecting.value = true;
// 连接WebSocket
connect();
// 设置连接超时
const connectionTimeout = setTimeout(() => {
if (!isConnected.value) {
console.warn("WebSocket连接超时尝试重连");
attemptReconnect();
}
}, 5000);
// 监听连接状态变化,连接成功后清除超时计时器
const unwatch = watch(stompConnected, (connected) => {
if (connected) {
clearTimeout(connectionTimeout);
unwatch();
}
});
};
/**
* 尝试重新连接
*/
const attemptReconnect = () => {
if (reconnectCount >= maxReconnectAttempts) {
console.error(`已达到最大重连次数(${maxReconnectAttempts}),停止重连`);
isConnecting.value = false;
ElMessage.error("WebSocket连接失败请稍后刷新页面重试");
return;
}
reconnectCount++;
console.log(`尝试重连(${reconnectCount}/${maxReconnectAttempts})...`);
// 使用指数退避策略增加重连间隔
const delay = Math.min(1000 * Math.pow(2, reconnectCount), 30000);
// 清除之前的计时器
if (reconnectTimer) {
clearTimeout(reconnectTimer);
}
// 设置重连计时器
reconnectTimer = setTimeout(() => {
connect();
}, delay);
};
/**
* 关闭WebSocket连接
*/
const closeWebSocket = () => {
if (subscriptionId) {
unsubscribe(subscriptionId);
subscriptionId = "";
}
// 清除重连计时器
if (reconnectTimer) {
clearTimeout(reconnectTimer);
reconnectTimer = null;
}
disconnect();
isConnected.value = false;
isConnecting.value = false;
};
// 组件挂载时初始化WebSocket
onMounted(() => {
initWebSocket();
});
// 组件卸载时关闭WebSocket
onUnmounted(() => {
closeWebSocket();
});
return {
onlineUserCount,
lastUpdateTime,
isConnected,
isConnecting,
initWebSocket,
closeWebSocket,
};
}

View File

@@ -7,12 +7,24 @@ export interface UseStompOptions {
brokerURL?: string;
/** 用于鉴权的 token不传时使用 getAccessToken() 的返回值 */
token?: string;
/** 重连延迟,单位毫秒,默认为 5000 */
/** 重连延迟,单位毫秒,默认为 8000 */
reconnectDelay?: number;
/** 连接超时时间,单位毫秒,默认为 10000 */
connectionTimeout?: number;
/** 是否开启指数退避重连策略 */
useExponentialBackoff?: boolean;
/** 最大重连次数,默认为 5 */
maxReconnectAttempts?: number;
/** 最大重连延迟,单位毫秒,默认为 60000 */
maxReconnectDelay?: number;
/** 是否开启调试日志 */
debug?: boolean;
}
/**
* STOMP WebSocket连接Hook
* WebSocket连接的建立
*/
export function useStomp(options: UseStompOptions = {}) {
// 默认值brokerURL 从环境变量中获取token 从 getAccessToken() 获取
const defaultBrokerURL = import.meta.env.VITE_APP_WS_ENDPOINT || "";
@@ -20,9 +32,20 @@ export function useStomp(options: UseStompOptions = {}) {
const brokerURL = ref(options.brokerURL ?? defaultBrokerURL);
const token = options.token ?? defaultToken;
const reconnectDelay = options.reconnectDelay ?? 8000;
const connectionTimeout = options.connectionTimeout ?? 10000;
const useExponentialBackoff = options.useExponentialBackoff ?? false;
const maxReconnectAttempts = options.maxReconnectAttempts ?? 5;
const maxReconnectDelay = options.maxReconnectDelay ?? 60000;
// 连接状态标记
const isConnected = ref(false);
// 重连尝试次数
const reconnectCount = ref(0);
// 重连计时器
let reconnectTimer: any = null;
// 连接超时计时器
let connectionTimeoutTimer: any = null;
// 存储所有订阅
const subscriptions = new Map<string, StompSubscription>();
@@ -43,8 +66,8 @@ export function useStomp(options: UseStompOptions = {}) {
connectHeaders: {
Authorization: `Bearer ${token}`,
},
debug: () => {},
reconnectDelay: 5000,
debug: options.debug ? console.log : () => {},
reconnectDelay: useExponentialBackoff ? 0 : reconnectDelay, // 使用自定义退避策略时禁用内置重连
heartbeatIncoming: 4000,
heartbeatOutgoing: 4000,
});
@@ -52,6 +75,8 @@ export function useStomp(options: UseStompOptions = {}) {
// 设置连接监听器
client.value.onConnect = () => {
isConnected.value = true;
reconnectCount.value = 0;
clearTimeout(connectionTimeoutTimer);
console.log("WebSocket连接已建立");
};
@@ -59,6 +84,11 @@ export function useStomp(options: UseStompOptions = {}) {
client.value.onDisconnect = () => {
isConnected.value = false;
console.log("WebSocket连接已断开");
// 如果使用自定义指数退避重连策略,则在这里处理
if (useExponentialBackoff && reconnectCount.value < maxReconnectAttempts) {
handleReconnect();
}
};
// 设置 Web Socket 关闭监听器
@@ -73,6 +103,36 @@ export function useStomp(options: UseStompOptions = {}) {
};
};
/**
*
*/
const handleReconnect = () => {
if (reconnectCount.value >= maxReconnectAttempts) {
console.error(`已达到最大重连次数(${maxReconnectAttempts}),停止重连`);
return;
}
reconnectCount.value++;
console.log(`尝试重连(${reconnectCount.value}/${maxReconnectAttempts})...`);
// 使用指数退避策略增加重连间隔
const delay = useExponentialBackoff
? Math.min(reconnectDelay * Math.pow(2, reconnectCount.value - 1), maxReconnectDelay)
: reconnectDelay;
// 清除之前的计时器
if (reconnectTimer) {
clearTimeout(reconnectTimer);
}
// 设置重连计时器
reconnectTimer = setTimeout(() => {
if (!isConnected.value && client.value) {
client.value.activate();
}
}, delay);
};
// 监听 brokerURL 的变化,若地址改变则重新初始化
watch(brokerURL, (newURL, oldURL) => {
if (newURL !== oldURL) {
@@ -106,6 +166,17 @@ export function useStomp(options: UseStompOptions = {}) {
return;
}
// 设置连接超时
clearTimeout(connectionTimeoutTimer);
connectionTimeoutTimer = setTimeout(() => {
if (!isConnected.value) {
console.warn("WebSocket连接超时");
if (useExponentialBackoff) {
handleReconnect();
}
}
}, connectionTimeout);
client.value.activate();
};
@@ -155,13 +226,27 @@ export function useStomp(options: UseStompOptions = {}) {
console.log("Already disconnected, skipping disconnect() call.");
return;
}
// 清除所有计时器
if (reconnectTimer) {
clearTimeout(reconnectTimer);
reconnectTimer = null;
}
if (connectionTimeoutTimer) {
clearTimeout(connectionTimeoutTimer);
connectionTimeoutTimer = null;
}
client.value?.deactivate();
isConnected.value = false;
reconnectCount.value = 0;
};
return {
client,
isConnected,
reconnectCount,
connect,
subscribe,
unsubscribe,

View File

@@ -0,0 +1,11 @@
/**
* WebSocket相关Hook入口文件
* 统一导出所有WebSocket相关Hook
*/
// 核心基础Hook
export { useStomp } from "./core/useStomp";
// 业务服务Hook
export { useOnlineCount } from "./services/useOnlineCount";
export { useDictSync } from "./services/useDictSync";

View File

@@ -1,6 +1,7 @@
import { useDictStoreHook } from "@/store/modules/dict.store";
import { useStomp } from "@/hooks/useStomp";
import { useStomp } from "../core/useStomp";
import { IMessage } from "@stomp/stompjs";
import { ref } from "vue";
// 字典消息类型
export interface DictMessage {
@@ -12,14 +13,21 @@ export interface DictMessage {
export type DictMessageCallback = (message: DictMessage) => void;
// 全局单例实例
let instance: ReturnType<typeof createWebSocketDict> | null = null;
let instance: ReturnType<typeof createDictSyncHook> | null = null;
// 创建WebSocket词典处理函数
function createWebSocketDict() {
/**
* Hook
*
*/
function createDictSyncHook() {
const dictStore = useDictStoreHook();
// 使用现有的useStomp
const { isConnected, connect, subscribe, unsubscribe, disconnect } = useStomp();
// 使用现有的useStomp,配置适合字典场景的重连参数
const { isConnected, connect, subscribe, unsubscribe, disconnect } = useStomp({
reconnectDelay: 10000, // 使用更长的重连延迟 - 10秒
connectionTimeout: 15000, // 更长的连接超时时间 - 15秒
useExponentialBackoff: false, // 字典数据不需要指数退避策略
});
// 存储订阅ID
const subscriptionIds = ref<string[]>([]);
@@ -89,12 +97,12 @@ function createWebSocketDict() {
console.log(`开始尝试订阅字典主题: ${topic}`);
// 延迟订阅,确保连接先建立
// 使用简化的重试逻辑依赖useStomp的连接管理
const attemptSubscribe = () => {
if (!isConnected.value) {
console.log("等待WebSocket连接建立...");
// 500ms后再次尝试
setTimeout(attemptSubscribe, 500);
// 3秒后再次尝试
setTimeout(attemptSubscribe, 3000);
return;
}
@@ -115,9 +123,7 @@ function createWebSocketDict() {
subscribedTopics.value.add(topic);
console.log(`字典主题订阅成功: ${topic}`);
} else {
console.warn(`字典主题订阅失败1秒后重试: ${topic}`);
// 尝试重新订阅
setTimeout(attemptSubscribe, 1000);
console.warn(`字典主题订阅失败: ${topic}`);
}
};
@@ -171,10 +177,13 @@ function createWebSocketDict() {
};
}
// 导出单例实例的钩子
export function useWebSocketDict() {
/**
* Hook
*
*/
export function useDictSync() {
if (!instance) {
instance = createWebSocketDict();
instance = createDictSyncHook();
}
return instance;
}

View File

@@ -0,0 +1,149 @@
import { ref, onMounted, onUnmounted, watch } from "vue";
import { useStomp } from "../core/useStomp";
import { ElMessage } from "element-plus";
/**
* 在线用户计数Hook
* 用于订阅后端推送的在线用户数量变化
*/
export function useOnlineCount() {
// 在线用户数量
const onlineUserCount = ref(0);
// 最后更新时间戳
const lastUpdateTime = ref(0);
// 连接状态
const isConnected = ref(false);
// 连接正在尝试中
const isConnecting = ref(false);
// 使用Stomp客户端 - 配置使用指数退避策略
const {
connect,
subscribe,
unsubscribe,
disconnect,
isConnected: stompConnected,
} = useStomp({
reconnectDelay: 5000, // 初始重连延迟5秒
maxReconnectAttempts: 3, // 最大重连3次
connectionTimeout: 10000, // 连接超时10秒
useExponentialBackoff: true, // 启用指数退避
});
// 订阅ID
let subscriptionId = "";
// 连接超时计时器
let connectionTimeoutTimer: any = null;
// 监听Stomp连接状态
watch(stompConnected, (connected) => {
if (connected && isConnecting.value) {
isConnected.value = true;
isConnecting.value = false;
// 一旦连接成功,立即订阅主题
subscribeToOnlineCount();
console.log("WebSocket连接成功已订阅在线用户计数主题");
}
});
/**
* 订阅在线用户计数主题
*/
const subscribeToOnlineCount = () => {
if (!stompConnected.value) return;
// 如果已经订阅,先取消订阅
if (subscriptionId) {
unsubscribe(subscriptionId);
}
// 订阅在线用户计数主题
subscriptionId = subscribe("/topic/online-count", (message) => {
try {
const data = message.body;
const jsonData = JSON.parse(data);
const count = typeof jsonData === "number" ? jsonData : jsonData.count;
if (!isNaN(count)) {
onlineUserCount.value = count;
lastUpdateTime.value = Date.now();
}
} catch (error) {
console.error("解析在线用户数量失败:", error);
}
});
};
/**
* 初始化WebSocket连接并订阅在线用户主题
*/
const initWebSocket = () => {
if (isConnecting.value) return;
isConnecting.value = true;
// 连接WebSocket
connect();
// 设置连接超时显示UI提示
connectionTimeoutTimer = setTimeout(() => {
if (!isConnected.value) {
console.warn("WebSocket连接超时将自动尝试重连");
ElMessage.warning("正在尝试连接服务器,请稍候...");
}
}, 10000); // 较长的UI提示超时
// 监听连接状态变化,连接成功后清除超时计时器
const unwatch = watch(stompConnected, (connected) => {
if (connected) {
clearTimeout(connectionTimeoutTimer);
unwatch();
}
});
};
/**
* 关闭WebSocket连接
*/
const closeWebSocket = () => {
if (subscriptionId) {
unsubscribe(subscriptionId);
subscriptionId = "";
}
// 清除连接超时计时器
if (connectionTimeoutTimer) {
clearTimeout(connectionTimeoutTimer);
connectionTimeoutTimer = null;
}
disconnect();
isConnected.value = false;
isConnecting.value = false;
};
// 组件挂载时初始化WebSocket
onMounted(() => {
initWebSocket();
});
// 组件卸载时关闭WebSocket
onUnmounted(() => {
closeWebSocket();
});
return {
onlineUserCount,
lastUpdateTime,
isConnected,
isConnecting,
initWebSocket,
closeWebSocket,
};
}

View File

@@ -1,4 +1,4 @@
import { useWebSocketDict } from "@/hooks/useWebSocketDict";
import { useDictSync } from "@/hooks/websocket/services/useDictSync";
import { getAccessToken } from "@/utils/auth";
/**
@@ -19,7 +19,7 @@ export function setupWebSocket() {
try {
// 延迟初始化,确保应用完全启动
setTimeout(() => {
const dictWebSocket = useWebSocketDict();
const dictWebSocket = useDictSync();
// 初始化字典WebSocket服务
dictWebSocket.initWebSocket();

View File

@@ -324,10 +324,10 @@ import { useUserStore } from "@/store/modules/user.store";
import { formatGrowthRate } from "@/utils";
import { useTransition, useDateFormat } from "@vueuse/core";
import { Connection, Failed } from "@element-plus/icons-vue";
import { useWebSocketOnlineUsers } from "@/hooks/useWebSocketOnlineUsers";
import { useOnlineCount } from "@/hooks/websocket/services/useOnlineCount";
// 在线用户数量组件相关
const { onlineUserCount, lastUpdateTime, isConnected } = useWebSocketOnlineUsers();
const { onlineUserCount, lastUpdateTime, isConnected } = useOnlineCount();
// 记录上一次的用户数量用于计算趋势
const previousCount = ref(0);

View File

@@ -142,7 +142,7 @@
import { useDictStoreHook } from "@/store/modules/dict.store";
import { useDateFormat } from "@vueuse/core";
import DictAPI, { DictItemForm } from "@/api/system/dict.api";
import { useWebSocketDict, DictMessage } from "@/hooks/useWebSocketDict";
import { useDictSync, DictMessage } from "@/hooks/websocket/services/useDictSync";
// 性别字典编码
const DICT_CODE = "gender";
@@ -161,7 +161,7 @@ const dictForm = ref<DictItemForm | null>(null);
const selectedGender = ref("");
// 初始化WebSocket
const dictWebSocket = useWebSocketDict();
const dictWebSocket = useDictSync();
// 获取连接状态
const wsConnected = computed(() => dictWebSocket.isConnected);

View File

@@ -97,7 +97,7 @@
</template>
<script setup lang="ts">
import { useStomp } from "@/hooks/useStomp";
import { useStomp } from "@/hooks/websocket/core/useStomp";
import { useUserStoreHook } from "@/store/modules/user.store";
const userStore = useUserStoreHook();