feat: 字典实时同步和首页添加在线用户统计

This commit is contained in:
Ray.Hao
2025-04-22 22:15:15 +08:00
parent cad57b3dc0
commit fdf66164d8
14 changed files with 1275 additions and 70 deletions

View File

@@ -1,5 +1,6 @@
import { Client, IMessage, StompSubscription } from "@stomp/stompjs";
import { getAccessToken } from "@/utils/auth";
import { ref, watch } from "vue";
export interface UseStompOptions {
/** WebSocket 地址,不传时使用 VITE_APP_WS_ENDPOINT 环境变量 */
@@ -30,43 +31,46 @@ export function useStomp(options: UseStompOptions = {}) {
/**
* 初始化 STOMP 客户端
* 只有在 brokerURL 非空时才会初始化客户端
*/
const initializeClient = () => {
if (!brokerURL.value) {
console.warn(
"brokerURL is required. Please set the WebSocket URL in your .env file (VITE_APP_WS_ENDPOINT)."
);
if (client.value) {
return;
}
if (!client.value) {
client.value = new Client({
brokerURL: brokerURL.value,
reconnectDelay: options.reconnectDelay ?? 5000,
debug: options.debug ? (msg) => console.log("[STOMP]", msg) : () => {},
connectHeaders: {
Authorization: `Bearer ${token}`,
},
heartbeatIncoming: 4000,
heartbeatOutgoing: 4000,
});
// 创建 STOMP 客户端
client.value = new Client({
brokerURL: brokerURL.value,
connectHeaders: {
Authorization: `Bearer ${token}`,
},
debug: () => {},
reconnectDelay: 5000,
heartbeatIncoming: 4000,
heartbeatOutgoing: 4000,
});
client.value.onConnect = (frame) => {
isConnected.value = true;
console.log("STOMP connected", frame);
};
// 设置连接监听器
client.value.onConnect = () => {
isConnected.value = true;
console.log("WebSocket连接已建立");
};
client.value.onStompError = (frame) => {
console.error("Broker reported error: " + frame.headers["message"]);
console.error("Additional details: " + frame.body);
};
// 设置断开连接监听器
client.value.onDisconnect = () => {
isConnected.value = false;
console.log("WebSocket连接已断开");
};
client.value.onWebSocketClose = (evt) => {
isConnected.value = false;
console.warn("WebSocket closed", evt);
};
}
// 设置 Web Socket 关闭监听器
client.value.onWebSocketClose = (event) => {
isConnected.value = false;
console.log(`WebSocket已关闭: ${event?.code} ${event?.reason}`);
};
// 设置错误监听器
client.value.onStompError = (frame) => {
console.error("STOMP错误:", frame.headers, frame.body);
};
};
// 监听 brokerURL 的变化,若地址改变则重新初始化
@@ -82,21 +86,27 @@ export function useStomp(options: UseStompOptions = {}) {
}
});
// 在组件挂载时检查并初始化客户端
onMounted(() => {
console.log("useStomp onMounted initializeClient");
initializeClient();
});
// 初始化客户端
initializeClient();
/**
* 激活连接(如果已经连接或正在激活则直接返回)
*/
const connect = () => {
if (!client.value) {
initializeClient();
}
if (client.value && (client.value.connected || client.value.active)) {
console.log("Already connected or connecting, skipping connect() call.");
return;
}
client.value?.activate();
if (!client.value) {
console.error("STOMP客户端初始化失败");
return;
}
client.value.activate();
};
/**
@@ -106,12 +116,23 @@ export function useStomp(options: UseStompOptions = {}) {
* @returns 返回订阅 id用于后续取消订阅
*/
const subscribe = (destination: string, callback: (message: IMessage) => void): string => {
if (client.value) {
if (!client.value) {
return "";
}
if (!client.value.connected) {
return "";
}
try {
const subscription = client.value.subscribe(destination, callback);
subscriptions.set(subscription.id, subscription);
console.log(`订阅成功: ${destination}, ID: ${subscription.id}`);
return subscription.id;
} catch (error) {
console.error(`订阅失败(${destination}):`, error);
return "";
}
return "";
};
/**

View File

@@ -0,0 +1,180 @@
import { useDictStoreHook } from "@/store/modules/dict.store";
import { useStomp } from "@/hooks/useStomp";
import { IMessage } from "@stomp/stompjs";
// 字典消息类型
export interface DictMessage {
dictCode: string;
timestamp: number;
}
// 字典事件回调类型
export type DictMessageCallback = (message: DictMessage) => void;
// 全局单例实例
let instance: ReturnType<typeof createWebSocketDict> | null = null;
// 创建WebSocket词典处理函数
function createWebSocketDict() {
const dictStore = useDictStoreHook();
// 使用现有的useStomp
const { isConnected, connect, subscribe, unsubscribe, disconnect } = useStomp();
// 存储订阅ID
const subscriptionIds = ref<string[]>([]);
// 已订阅的主题
const subscribedTopics = ref<Set<string>>(new Set());
// 消息回调函数列表
const messageCallbacks = ref<DictMessageCallback[]>([]);
/**
* 注册字典消息回调
* @param callback 回调函数
*/
const onDictMessage = (callback: DictMessageCallback) => {
messageCallbacks.value.push(callback);
return () => {
// 返回取消注册的函数
const index = messageCallbacks.value.indexOf(callback);
if (index !== -1) {
messageCallbacks.value.splice(index, 1);
}
};
};
/**
* 初始化WebSocket
*/
const initWebSocket = async () => {
try {
// 连接WebSocket
connect();
// 设置字典订阅
setupDictSubscription();
} catch (error) {
console.error("[WebSocket] 初始化失败:", error);
}
};
/**
* 关闭WebSocket
*/
const closeWebSocket = () => {
// 取消所有订阅
subscriptionIds.value.forEach((id) => {
unsubscribe(id);
});
subscriptionIds.value = [];
subscribedTopics.value.clear();
// 断开连接
disconnect();
};
/**
* 设置字典订阅
*/
const setupDictSubscription = () => {
const topic = "/topic/dict";
// 防止重复订阅
if (subscribedTopics.value.has(topic)) {
console.log(`跳过重复订阅: ${topic}`);
return;
}
console.log(`开始尝试订阅字典主题: ${topic}`);
// 延迟订阅,确保连接先建立
const attemptSubscribe = () => {
if (!isConnected.value) {
console.log("等待WebSocket连接建立...");
// 500ms后再次尝试
setTimeout(attemptSubscribe, 500);
return;
}
// 检查是否已订阅
if (subscribedTopics.value.has(topic)) {
return;
}
console.log(`连接已建立,开始订阅: ${topic}`);
// 订阅字典更新
const subId = subscribe(topic, (message: IMessage) => {
handleDictEvent(message);
});
if (subId) {
subscriptionIds.value.push(subId);
subscribedTopics.value.add(topic);
console.log(`字典主题订阅成功: ${topic}`);
} else {
console.warn(`字典主题订阅失败1秒后重试: ${topic}`);
// 尝试重新订阅
setTimeout(attemptSubscribe, 1000);
}
};
// 开始尝试订阅
attemptSubscribe();
};
/**
* 处理字典事件
* @param message STOMP消息
*/
const handleDictEvent = (message: IMessage) => {
if (!message.body) return;
try {
// 记录接收到的消息
console.log(`收到字典更新消息: ${message.body}`);
// 尝试解析消息
const parsedData = JSON.parse(message.body) as DictMessage;
const dictCode = parsedData.dictCode;
if (!dictCode) return;
// 清除缓存,等待按需加载
dictStore.removeDictItem(dictCode);
console.log(`字典缓存已清除: ${dictCode}`);
// 调用所有注册的回调函数
messageCallbacks.value.forEach((callback) => {
try {
callback(parsedData);
} catch (callbackError) {
console.error("[WebSocket] 回调执行失败:", callbackError);
}
});
// 显示提示消息
console.info(`字典 ${dictCode} 已变更,将在下次使用时自动加载`);
} catch (error) {
console.error("[WebSocket] 解析消息失败:", error);
}
};
return {
isConnected,
initWebSocket,
closeWebSocket,
handleDictEvent,
onDictMessage,
};
}
// 导出单例实例的钩子
export function useWebSocketDict() {
if (!instance) {
instance = createWebSocketDict();
}
return instance;
}

View File

@@ -0,0 +1,176 @@
import { ref, onMounted, onUnmounted, watch } from "vue";
import { useStomp } from "./useStomp";
import { ElMessage } from "element-plus";
export interface OnlineUserStats {
type: string; // 事件类型
count: number; // 当前在线用户数量
users?: any[]; // 用户列表(可选)
timestamp: number; // 时间戳
}
/**
* 在线用户WebSocket Hook
* 用于订阅后端推送的在线用户数量变化
*/
export function useWebSocketOnlineUsers() {
// 在线用户数量
const onlineUserCount = ref(0);
// 最后更新时间戳
const lastUpdateTime = ref(0);
// 连接状态
const isConnected = ref(false);
// 连接正在尝试中
const isConnecting = ref(false);
// 使用Stomp客户端
const { connect, subscribe, unsubscribe, disconnect, isConnected: stompConnected } = useStomp();
// 订阅ID
let subscriptionId = "";
// 重连次数
let reconnectCount = 0;
// 最大重连次数
const maxReconnectAttempts = 5;
// 重连计时器
let reconnectTimer: any = null;
// 监听Stomp连接状态
watch(stompConnected, (connected) => {
if (connected && isConnecting.value) {
isConnected.value = true;
isConnecting.value = false;
reconnectCount = 0;
// 一旦连接成功,立即订阅主题
subscribeToOnlineUsers();
console.log("WebSocket连接成功已订阅在线用户主题");
}
});
/**
* 订阅在线用户主题
*/
const subscribeToOnlineUsers = () => {
if (!stompConnected.value) return;
// 如果已经订阅,先取消订阅
if (subscriptionId) {
unsubscribe(subscriptionId);
}
// 订阅在线用户主题
subscriptionId = subscribe("/topic/online-users", (message) => {
try {
const data: OnlineUserStats = JSON.parse(message.body);
// 只有在消息类型为ONLINE_USERS_CHANGE时更新数据
if (data.type === "ONLINE_USERS_CHANGE") {
onlineUserCount.value = data.count || 0;
lastUpdateTime.value = data.timestamp || Date.now();
}
} catch (error) {
console.error("解析在线用户数据失败:", error);
}
});
};
/**
* 初始化WebSocket连接并订阅在线用户主题
*/
const initWebSocket = () => {
if (isConnecting.value) return;
isConnecting.value = true;
// 连接WebSocket
connect();
// 设置连接超时
const connectionTimeout = setTimeout(() => {
if (!isConnected.value) {
console.warn("WebSocket连接超时尝试重连");
attemptReconnect();
}
}, 5000);
// 监听连接状态变化,连接成功后清除超时计时器
const unwatch = watch(stompConnected, (connected) => {
if (connected) {
clearTimeout(connectionTimeout);
unwatch();
}
});
};
/**
* 尝试重新连接
*/
const attemptReconnect = () => {
if (reconnectCount >= maxReconnectAttempts) {
console.error(`已达到最大重连次数(${maxReconnectAttempts}),停止重连`);
isConnecting.value = false;
ElMessage.error("WebSocket连接失败请稍后刷新页面重试");
return;
}
reconnectCount++;
console.log(`尝试重连(${reconnectCount}/${maxReconnectAttempts})...`);
// 使用指数退避策略增加重连间隔
const delay = Math.min(1000 * Math.pow(2, reconnectCount), 30000);
// 清除之前的计时器
if (reconnectTimer) {
clearTimeout(reconnectTimer);
}
// 设置重连计时器
reconnectTimer = setTimeout(() => {
connect();
}, delay);
};
/**
* 关闭WebSocket连接
*/
const closeWebSocket = () => {
if (subscriptionId) {
unsubscribe(subscriptionId);
subscriptionId = "";
}
// 清除重连计时器
if (reconnectTimer) {
clearTimeout(reconnectTimer);
reconnectTimer = null;
}
disconnect();
isConnected.value = false;
isConnecting.value = false;
};
// 组件挂载时初始化WebSocket
onMounted(() => {
initWebSocket();
});
// 组件卸载时关闭WebSocket
onUnmounted(() => {
closeWebSocket();
});
return {
onlineUserCount,
lastUpdateTime,
isConnected,
isConnecting,
initWebSocket,
closeWebSocket,
};
}