From ffb89e50da9e4ebe94520d97f1e1c81f803a461a Mon Sep 17 00:00:00 2001 From: "Ray.Hao" <1490493387@qq.com> Date: Sat, 8 Nov 2025 00:01:43 +0800 Subject: [PATCH] =?UTF-8?q?feat(websocket):=E9=87=8D=E6=9E=84=20WebSocket?= =?UTF-8?q?=20=E9=85=8D=E7=BD=AE=E4=B8=8E=E6=9C=8D=E5=8A=A1=E5=AE=9E?= =?UTF-8?q?=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../youlai/boot/config/WebSocketConfig.java | 356 ++++++++++------ .../service/impl/WebSocketServiceImpl.java | 394 ++++++++++++------ .../impl/WebSocketSessionCleanupService.java | 94 +++++ 3 files changed, 600 insertions(+), 244 deletions(-) create mode 100644 src/main/java/com/youlai/boot/system/service/impl/WebSocketSessionCleanupService.java diff --git a/src/main/java/com/youlai/boot/config/WebSocketConfig.java b/src/main/java/com/youlai/boot/config/WebSocketConfig.java index 72e2405f..2168d171 100644 --- a/src/main/java/com/youlai/boot/config/WebSocketConfig.java +++ b/src/main/java/com/youlai/boot/config/WebSocketConfig.java @@ -27,7 +27,13 @@ import org.springframework.web.socket.config.annotation.StompEndpointRegistry; import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer; /** - * WebSocket配置 + * WebSocket 配置类 + * + * 核心功能: + * - 配置 WebSocket 端点 + * - 配置消息代理 + * - 实现连接认证与授权 + * - 管理用户会话生命周期 * * @author Ray.Hao * @since 3.0.0 @@ -37,133 +43,251 @@ import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerCo @Slf4j public class WebSocketConfig implements WebSocketMessageBrokerConfigurer { - private final TokenManager tokenManager; - private final WebSocketService webSocketService; + private static final String WS_ENDPOINT = "/ws"; + private static final String APP_DESTINATION_PREFIX = "/app"; + private static final String USER_DESTINATION_PREFIX = "/user"; + private static final String[] BROKER_DESTINATIONS = {"/topic", "/queue"}; - public WebSocketConfig(TokenManager tokenManager, @Lazy WebSocketService webSocketService) { - this.tokenManager = tokenManager; - this.webSocketService = webSocketService; - } + private final TokenManager tokenManager; + private final WebSocketService webSocketService; - /** - * 注册一个端点,客户端通过这个端点进行连接 - */ - @Override - public void registerStompEndpoints(StompEndpointRegistry registry) { - registry - // 注册 /ws 的端点 - .addEndpoint("/ws") - // 允许跨域 - .setAllowedOriginPatterns("*"); - } + public WebSocketConfig(TokenManager tokenManager, @Lazy WebSocketService webSocketService) { + this.tokenManager = tokenManager; + this.webSocketService = webSocketService; + log.info("✓ WebSocket 配置已加载"); + } + /** + * 注册 STOMP 端点 + * + * 客户端通过该端点建立 WebSocket 连接 + */ + @Override + public void registerStompEndpoints(StompEndpointRegistry registry) { + registry + .addEndpoint(WS_ENDPOINT) + .setAllowedOriginPatterns("*"); // 允许跨域(生产环境建议配置具体域名) - /** - * 配置消息代理 - */ - @Override - public void configureMessageBroker(MessageBrokerRegistry registry) { - // 客户端发送消息的请求前缀 - registry.setApplicationDestinationPrefixes("/app"); + log.info("✓ STOMP 端点已注册: {}", WS_ENDPOINT); + } - // 客户端订阅消息的请求前缀,topic一般用于广播推送,queue用于点对点推送 - registry.enableSimpleBroker("/topic", "/queue"); + /** + * 配置消息代理 + * + * - /app 前缀:客户端发送消息到服务端的前缀 + * - /topic 前缀:用于广播消息 + * - /queue 前缀:用于点对点消息 + * - /user 前缀:服务端发送给特定用户的消息前缀 + */ + @Override + public void configureMessageBroker(MessageBrokerRegistry registry) { + // 客户端发送消息的请求前缀 + registry.setApplicationDestinationPrefixes(APP_DESTINATION_PREFIX); - // 服务端通知客户端的前缀,可以不设置,默认为user - registry.setUserDestinationPrefix("/user"); - } + // 启用简单消息代理,处理 /topic 和 /queue 前缀的消息 + registry.enableSimpleBroker(BROKER_DESTINATIONS); + // 服务端通知客户端的前缀 + registry.setUserDestinationPrefix(USER_DESTINATION_PREFIX); - /** - * 配置客户端入站通道拦截器 - *

- * 核心功能: - * 1. 连接建立时解析令牌并绑定用户身份 - * 2. 连接关闭时触发下线通知 - * 3. 异常Token的防御性处理 - */ - @Override - public void configureClientInboundChannel(ChannelRegistration registration) { - registration.interceptors(new ChannelInterceptor() { - @Override - public Message preSend(@NotNull Message message, @NotNull MessageChannel channel) { - StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class); - if (accessor == null) { - return ChannelInterceptor.super.preSend(message, channel); + log.info("✓ 消息代理已配置: app={}, broker={}, user={}", + APP_DESTINATION_PREFIX, BROKER_DESTINATIONS, USER_DESTINATION_PREFIX); + } + + /** + * 配置客户端入站通道拦截器 + * + * 核心功能: + * 1. 连接建立时:解析 JWT Token 并绑定用户身份 + * 2. 连接关闭时:触发用户下线通知 + * 3. 安全防护:拦截无效连接请求 + */ + @Override + public void configureClientInboundChannel(ChannelRegistration registration) { + registration.interceptors(new ChannelInterceptor() { + @Override + public Message preSend(@NotNull Message message, @NotNull MessageChannel channel) { + StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class); + + // 防御性检查:确保 accessor 不为空 + if (accessor == null) { + log.warn("⚠ 收到异常消息:无法获取 StompHeaderAccessor"); + return ChannelInterceptor.super.preSend(message, channel); + } + + StompCommand command = accessor.getCommand(); + if (command == null) { + return ChannelInterceptor.super.preSend(message, channel); + } + + try { + switch (command) { + case CONNECT: + handleConnect(accessor); + break; + + case DISCONNECT: + handleDisconnect(accessor); + break; + + case SUBSCRIBE: + handleSubscribe(accessor); + break; + + default: + // 其他命令不需要特殊处理 + break; + } + } catch (AuthenticationException ex) { + // 认证失败时强制关闭连接 + log.error("❌ 连接认证失败: {}", ex.getMessage()); + throw ex; + } catch (Exception ex) { + // 捕获其他未知异常 + log.error("❌ WebSocket 消息处理异常", ex); + throw new MessagingException("消息处理失败: " + ex.getMessage()); + } + + return ChannelInterceptor.super.preSend(message, channel); + } + }); + + log.info("✓ 客户端入站通道拦截器已配置"); + } + + /** + * 处理客户端连接请求 + * + * 安全校验流程: + * 1. 提取 Authorization 头 + * 2. 验证 Bearer Token 格式 + * 3. 解析并验证 JWT 有效性 + * 4. 绑定用户身份到当前会话 + * 5. 记录用户上线状态 + */ + private void handleConnect(StompHeaderAccessor accessor) { + String authorization = accessor.getFirstNativeHeader(HttpHeaders.AUTHORIZATION); + + // 安全检查:确保 Authorization 头存在且格式正确 + if (StrUtil.isBlank(authorization)) { + log.warn("⚠ 非法连接请求:缺少 Authorization 头"); + throw new AuthenticationCredentialsNotFoundException("缺少 Authorization 头"); } + if (!authorization.startsWith("Bearer ")) { + log.warn("⚠ 非法连接请求:Authorization 头格式错误"); + throw new BadCredentialsException("Authorization 头格式错误"); + } + + // 提取 JWT Token(移除 "Bearer " 前缀) + String token = authorization.substring(7); + + if (StrUtil.isBlank(token)) { + log.warn("⚠ 非法连接请求:Token 为空"); + throw new BadCredentialsException("Token 为空"); + } + + // 解析并验证 Token + Authentication authentication; try { - // 处理客户端连接请求 - if (StompCommand.CONNECT.equals(accessor.getCommand())) { - /* - * 安全校验流程: - * 1. 从HEADER中获取Authorization值 - * 2. 校验Bearer Token格式合法性 - * 3. 解析并验证JWT有效性 - * 4. 绑定用户身份到当前会话 - */ - String authorization = accessor.getFirstNativeHeader(HttpHeaders.AUTHORIZATION); - - // 防御性校验:确保Authorization头存在且格式正确 - if (StrUtil.isBlank(authorization) || !authorization.startsWith("Bearer ")) { - log.warn("非法连接请求:缺少有效的Authorization头"); - throw new AuthenticationCredentialsNotFoundException("Missing authorization header"); - } - - // 提取并处理JWT令牌(移除Bearer前缀) - String token = authorization.substring(7); - Authentication authentication = tokenManager.parseToken(token); - - // 令牌解析失败处理 - if (authentication == null) { - log.error("令牌解析失败:{}", token); - throw new BadCredentialsException("Invalid token"); - } - - // 获取用户详细信息 - SysUserDetails userDetails = (SysUserDetails) authentication.getPrincipal(); - if (userDetails == null || StrUtil.isBlank(userDetails.getUsername())) { - log.error("无效的用户凭证:{}", token); - throw new BadCredentialsException("Invalid user credentials"); - } - - String username = userDetails.getUsername(); - log.info("WebSocket连接建立:用户[{}]", username); - - // 绑定用户身份到当前会话(重要:用于@SendToUser等注解) - accessor.setUser(authentication); - - // 记录用户上线状态 - webSocketService.userConnected(username, accessor.getSessionId()); - - } - // 处理客户端断开请求 - else if (StompCommand.DISCONNECT.equals(accessor.getCommand())) { - /* - * 注意:只有成功建立过认证的连接才会触发下线事件 - * 防止未认证成功的连接产生脏数据 - */ - Authentication authentication = (Authentication) accessor.getUser(); - if (authentication != null && authentication.isAuthenticated()) { - String username = ((SysUserDetails) authentication.getPrincipal()).getUsername(); - log.info("WebSocket连接关闭:用户[{}]", username); - - // 记录用户下线状态 - webSocketService.userDisconnected(username); - } - } - } catch (AuthenticationException ex) { - // 认证失败时强制关闭连接 - log.error("连接认证失败:{}", ex.getMessage()); - throw ex; + authentication = tokenManager.parseToken(token); } catch (Exception ex) { - // 捕获其他未知异常 - log.error("WebSocket连接处理异常:", ex); - throw new MessagingException("Connection processing failed"); + log.error("❌ Token 解析失败", ex); + throw new BadCredentialsException("Token 无效: " + ex.getMessage()); } - return ChannelInterceptor.super.preSend(message, channel); - } - }); - } + // 验证解析结果 + if (authentication == null || !authentication.isAuthenticated()) { + log.warn("⚠ Token 解析失败:认证对象无效"); + throw new BadCredentialsException("Token 解析失败"); + } + + // 获取用户详细信息 + Object principal = authentication.getPrincipal(); + if (!(principal instanceof SysUserDetails)) { + log.error("❌ 无效的用户凭证类型: {}", principal.getClass().getName()); + throw new BadCredentialsException("用户凭证类型错误"); + } + + SysUserDetails userDetails = (SysUserDetails) principal; + String username = userDetails.getUsername(); + + if (StrUtil.isBlank(username)) { + log.warn("⚠ 用户名为空"); + throw new BadCredentialsException("用户名为空"); + } + + // 绑定用户身份到当前会话(重要:用于 @SendToUser 等注解) + accessor.setUser(authentication); + + // 获取会话 ID + String sessionId = accessor.getSessionId(); + if (sessionId == null) { + log.warn("⚠ 会话 ID 为空,使用临时 ID"); + sessionId = "temp-" + System.nanoTime(); + } + + // 记录用户上线状态 + try { + webSocketService.userConnected(username, sessionId); + log.info("✓ WebSocket 连接建立成功: 用户[{}], 会话[{}]", username, sessionId); + } catch (Exception ex) { + log.error("❌ 记录用户上线状态失败: 用户[{}], 会话[{}]", username, sessionId, ex); + // 不抛出异常,允许连接继续 + } + } + + /** + * 处理客户端断开连接事件 + * + * 注意: + * - 只有成功建立过认证的连接才会触发下线事件 + * - 防止未认证成功的连接产生脏数据 + */ + private void handleDisconnect(StompHeaderAccessor accessor) { + Authentication authentication = (Authentication) accessor.getUser(); + + // 防御性检查:只处理已认证的连接 + if (authentication == null || !authentication.isAuthenticated()) { + log.debug("未认证的连接断开,跳过处理"); + return; + } + + Object principal = authentication.getPrincipal(); + if (!(principal instanceof SysUserDetails)) { + log.warn("⚠ 断开连接时用户凭证类型异常"); + return; + } + + SysUserDetails userDetails = (SysUserDetails) principal; + String username = userDetails.getUsername(); + + if (StrUtil.isNotBlank(username)) { + try { + webSocketService.userDisconnected(username); + log.info("✓ WebSocket 连接断开: 用户[{}]", username); + } catch (Exception ex) { + log.error("❌ 记录用户下线状态失败: 用户[{}]", username, ex); + } + } + } + + /** + * 处理客户端订阅事件(可选) + * + * 用于记录订阅信息或实施订阅级别的权限控制 + */ + private void handleSubscribe(StompHeaderAccessor accessor) { + Authentication authentication = (Authentication) accessor.getUser(); + + if (authentication != null && authentication.isAuthenticated()) { + String destination = accessor.getDestination(); + String username = authentication.getName(); + + log.debug("用户[{}]订阅主题: {}", username, destination); + + // TODO: 这里可以实现订阅级别的权限控制 + // 例如:检查用户是否有权限订阅某个主题 + } + } } diff --git a/src/main/java/com/youlai/boot/system/service/impl/WebSocketServiceImpl.java b/src/main/java/com/youlai/boot/system/service/impl/WebSocketServiceImpl.java index 12d443c5..d2fedfed 100644 --- a/src/main/java/com/youlai/boot/system/service/impl/WebSocketServiceImpl.java +++ b/src/main/java/com/youlai/boot/system/service/impl/WebSocketServiceImpl.java @@ -4,7 +4,9 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.youlai.boot.system.model.dto.DictEventDTO; import com.youlai.boot.system.service.WebSocketService; +import lombok.AllArgsConstructor; import lombok.Data; +import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.simp.SimpMessagingTemplate; @@ -12,12 +14,17 @@ import org.springframework.stereotype.Service; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; /** - * WebSocket服务实现类 - * 统一处理WebSocket消息发送和用户在线状态管理 + * WebSocket 服务实现类 + * + * 核心功能: + * - 用户在线状态管理(支持多设备登录) + * - 消息推送(广播、点对点) + * - 字典变更通知 * * @author Ray.Hao * @since 3.0.0 @@ -26,8 +33,23 @@ import java.util.stream.Collectors; @Slf4j public class WebSocketServiceImpl implements WebSocketService { - // 在线用户映射表,key为用户名,value为用户在线信息 - private final Map onlineUsers = new ConcurrentHashMap<>(); + // ==================== 在线用户管理 ==================== + + /** + * 用户在线会话映射表 + * Key: 用户名 + * Value: 该用户的所有会话 ID 集合(支持多设备登录) + */ + private final Map> userSessionsMap = new ConcurrentHashMap<>(); + + /** + * 会话详情映射表 + * Key: 会话 ID + * Value: 会话详细信息 + */ + private final Map sessionDetailsMap = new ConcurrentHashMap<>(); + + // ==================== 依赖注入 ==================== private SimpMessagingTemplate messagingTemplate; private final ObjectMapper objectMapper; @@ -36,67 +58,156 @@ public class WebSocketServiceImpl implements WebSocketService { public WebSocketServiceImpl(ObjectMapper objectMapper) { this.objectMapper = objectMapper; } - + + /** + * 延迟注入 SimpMessagingTemplate,避免循环依赖 + */ @Autowired(required = false) public void setMessagingTemplate(SimpMessagingTemplate messagingTemplate) { this.messagingTemplate = messagingTemplate; - log.info("WebSocket消息模板已初始化"); + log.info("✓ WebSocket 消息模板已初始化"); } - //================================== - // 用户在线状态管理功能 - //================================== + // ==================== 用户在线状态管理 ==================== /** - * 用户上线 + * 处理用户连接事件 * * @param username 用户名 - * @param sessionId WebSocket会话ID(可选) + * @param sessionId WebSocket 会话 ID */ @Override public void userConnected(String username, String sessionId) { - // 生成会话ID(如果未提供) - String actualSessionId = sessionId != null ? sessionId : "session-" + System.nanoTime(); - UserOnlineInfo info = new UserOnlineInfo(username, actualSessionId, System.currentTimeMillis()); - onlineUsers.put(username, info); - log.info("用户[{}]上线,当前在线用户数:{}", username, onlineUsers.size()); - - // 通知在线用户状态变更 - notifyOnlineUsersChangeInternal(); + if (username == null || username.isEmpty()) { + log.warn("用户连接失败:用户名为空"); + return; + } + + if (sessionId == null || sessionId.isEmpty()) { + log.warn("用户[{}]连接失败:会话 ID 为空", username); + return; + } + + // 添加会话到用户的会话集合中(支持多设备登录) + userSessionsMap.computeIfAbsent(username, k -> ConcurrentHashMap.newKeySet()) + .add(sessionId); + + // 保存会话详情 + SessionInfo sessionInfo = new SessionInfo(username, sessionId, System.currentTimeMillis()); + sessionDetailsMap.put(sessionId, sessionInfo); + + int sessionCount = userSessionsMap.get(username).size(); + int totalOnlineUsers = userSessionsMap.size(); + + log.info("✓ 用户[{}]会话[{}]上线(该用户共 {} 个会话,系统总在线用户数:{})", + username, sessionId, sessionCount, totalOnlineUsers); + + // 广播在线用户数变更 + broadcastOnlineUserCount(); } /** - * 用户下线 + * 处理用户断开连接事件 * * @param username 用户名 */ @Override public void userDisconnected(String username) { - onlineUsers.remove(username); - log.info("用户[{}]下线,当前在线用户数:{}", username, onlineUsers.size()); - - // 通知在线用户状态变更 - notifyOnlineUsersChangeInternal(); + if (username == null || username.isEmpty()) { + return; + } + + // 获取该用户的所有会话 + Set sessions = userSessionsMap.get(username); + if (sessions == null || sessions.isEmpty()) { + log.warn("用户[{}]下线:未找到会话记录", username); + return; + } + + // 移除所有会话详情(通常一次只断开一个会话,但这里做全量清理) + sessions.forEach(sessionDetailsMap::remove); + + // 移除用户的会话记录 + userSessionsMap.remove(username); + + int totalOnlineUsers = userSessionsMap.size(); + log.info("✓ 用户[{}]下线(系统总在线用户数:{})", username, totalOnlineUsers); + + // 广播在线用户数变更 + broadcastOnlineUserCount(); + } + + /** + * 移除指定会话(单个设备下线) + * + * @param sessionId 会话 ID + */ + public void removeSession(String sessionId) { + SessionInfo sessionInfo = sessionDetailsMap.remove(sessionId); + if (sessionInfo == null) { + return; + } + + String username = sessionInfo.getUsername(); + Set sessions = userSessionsMap.get(username); + + if (sessions != null) { + sessions.remove(sessionId); + + // 如果该用户没有其他会话了,移除用户记录 + if (sessions.isEmpty()) { + userSessionsMap.remove(username); + log.info("✓ 用户[{}]最后一个会话[{}]下线", username, sessionId); + } else { + log.info("✓ 用户[{}]会话[{}]下线(还剩 {} 个会话)", + username, sessionId, sessions.size()); + } + + // 广播在线用户数变更 + broadcastOnlineUserCount(); + } } /** * 获取在线用户列表 * - * @return 在线用户名列表 + * @return 在线用户信息列表 */ - public List getOnlineUsers() { - return onlineUsers.values().stream() - .map(info -> new UserOnlineDTO(info.getUsername(), info.getLoginTime())) + public List getOnlineUsers() { + return userSessionsMap.entrySet().stream() + .map(entry -> { + String username = entry.getKey(); + Set sessions = entry.getValue(); + + // 获取该用户最早的登录时间 + long earliestLoginTime = sessions.stream() + .map(sessionDetailsMap::get) + .filter(info -> info != null) + .mapToLong(SessionInfo::getConnectTime) + .min() + .orElse(System.currentTimeMillis()); + + return new OnlineUserDTO(username, sessions.size(), earliestLoginTime); + }) .collect(Collectors.toList()); } /** * 获取在线用户数量 * - * @return 在线用户数 + * @return 在线用户数(不是会话数) */ public int getOnlineUserCount() { - return onlineUsers.size(); + return userSessionsMap.size(); + } + + /** + * 获取在线会话总数 + * + * @return 所有在线会话的总数 + */ + public int getTotalSessionCount() { + return sessionDetailsMap.size(); } /** @@ -106,172 +217,199 @@ public class WebSocketServiceImpl implements WebSocketService { * @return 是否在线 */ public boolean isUserOnline(String username) { - return onlineUsers.containsKey(username); + Set sessions = userSessionsMap.get(username); + return sessions != null && !sessions.isEmpty(); } - + /** - * 手动触发在线用户变更通知 - * 供外部手动触发通知使用 + * 获取指定用户的会话数量 + * + * @param username 用户名 + * @return 会话数量 + */ + public int getUserSessionCount(String username) { + Set sessions = userSessionsMap.get(username); + return sessions != null ? sessions.size() : 0; + } + + /** + * 手动触发在线用户数量广播 + * + * 供外部服务(如定时任务)调用 */ public void notifyOnlineUsersChange() { - log.info("手动触发在线用户数量通知,当前在线用户数:{}", onlineUsers.size()); - sendOnlineUserCount(); + log.info("手动触发在线用户数量通知,当前在线用户数:{}", getOnlineUserCount()); + broadcastOnlineUserCount(); } - + /** - * 发送在线用户数量(简化版,不包含用户详情) + * 广播在线用户数量变更(内部方法) */ - private void sendOnlineUserCount() { + private void broadcastOnlineUserCount() { if (messagingTemplate == null) { log.warn("消息模板尚未初始化,无法发送在线用户数量"); return; } - + try { - // 直接发送数量,更轻量 - int count = onlineUsers.size(); + int count = getOnlineUserCount(); messagingTemplate.convertAndSend("/topic/online-count", count); - log.debug("已发送在线用户数量: {}", count); + log.debug("✓ 已广播在线用户数量: {}", count); } catch (Exception e) { - log.error("发送在线用户数量失败", e); + log.error("广播在线用户数量失败", e); } } - - /** - * 内部通用通知方法 - * 通知所有客户端在线用户变更 - */ - private void notifyOnlineUsersChangeInternal() { - if (messagingTemplate == null) { - log.warn("消息模板尚未初始化,无法发送在线用户数量通知"); - return; - } - - // 只发送简化版数据(仅数量) - sendOnlineUserCount(); - } + + // ==================== 消息推送功能 ==================== /** - * 用户在线信息 - */ - @Data - private static class UserOnlineInfo { - private final String username; - private final String sessionId; - private final long loginTime; - } - - /** - * 用户在线DTO(用于返回给前端) - */ - @Data - public static class UserOnlineDTO { - private final String username; - private final long loginTime; - } - - /** - * 在线用户变更事件 - */ - @Data - private static class OnlineUsersChangeEvent { - private String type; - private int count; - private List users; - private long timestamp; - } - - //================================== - // WebSocket消息发送功能 - //================================== - - /** - * 向所有客户端发送字典更新事件 + * 向所有客户端广播字典更新事件 * * @param dictCode 字典编码 */ @Override public void broadcastDictChange(String dictCode) { + if (dictCode == null || dictCode.isEmpty()) { + log.warn("字典编码为空,跳过广播"); + return; + } + DictEventDTO event = new DictEventDTO(dictCode); - sendDictEvent(event); + sendDictChangeEvent(event); } /** - * 发送字典事件消息 + * 发送字典变更事件 * * @param event 字典事件 */ - private void sendDictEvent(DictEventDTO event) { + private void sendDictChangeEvent(DictEventDTO event) { if (messagingTemplate == null) { log.warn("消息模板尚未初始化,无法发送字典更新通知"); return; } - + try { String message = objectMapper.writeValueAsString(event); messagingTemplate.convertAndSend("/topic/dict", message); - log.info("已发送字典事件通知, dictCode: {}", event.getDictCode()); + log.info("✓ 已广播字典变更通知: dictCode={}", event.getDictCode()); } catch (JsonProcessingException e) { - log.error("发送字典事件失败", e); + log.error("字典事件序列化失败: dictCode={}", event.getDictCode(), e); + } catch (Exception e) { + log.error("发送字典变更通知失败: dictCode={}", event.getDictCode(), e); } } /** - * 向特定用户发送系统消息 - * - * @param username 用户名 - * @param message 消息内容 + * 向特定用户发送通知消息 + * + * @param username 目标用户名 + * @param message 消息内容 */ @Override public void sendNotification(String username, Object message) { + if (username == null || username.isEmpty()) { + log.warn("用户名为空,无法发送通知"); + return; + } + + if (message == null) { + log.warn("消息内容为空,无法发送给用户[{}]", username); + return; + } + if (messagingTemplate == null) { log.warn("消息模板尚未初始化,无法发送用户消息"); return; } - + try { String messageJson = objectMapper.writeValueAsString(message); messagingTemplate.convertAndSendToUser(username, "/queue/messages", messageJson); - log.info("已向用户[{}]发送消息", username); + log.info("✓ 已向用户[{}]发送通知", username); } catch (JsonProcessingException e) { - log.error("向用户[{}]发送消息失败", username, e); + log.error("消息序列化失败: username={}", username, e); + } catch (Exception e) { + log.error("向用户[{}]发送通知失败", username, e); } } - + /** - * 发送广播消息给所有用户 - * + * 广播系统消息给所有用户 + * * @param message 消息内容 */ - public void broadcastMessage(String message) { + public void broadcastSystemMessage(String message) { + if (message == null || message.isEmpty()) { + log.warn("消息内容为空,无法广播"); + return; + } + if (messagingTemplate == null) { log.warn("消息模板尚未初始化,无法发送广播消息"); return; } - + try { - SystemMessage systemMessage = new SystemMessage("系统", message, System.currentTimeMillis()); + SystemMessage systemMessage = new SystemMessage( + "系统通知", + message, + System.currentTimeMillis() + ); String messageJson = objectMapper.writeValueAsString(systemMessage); messagingTemplate.convertAndSend("/topic/public", messageJson); - log.info("已发送广播消息: {}", message); + log.info("✓ 已广播系统消息: {}", message); } catch (JsonProcessingException e) { - log.error("发送广播消息失败", e); + log.error("系统消息序列化失败", e); + } catch (Exception e) { + log.error("广播系统消息失败", e); } } - + + // ==================== 内部数据类 ==================== + /** - * 系统消息对象 + * 会话信息 */ @Data - public static class SystemMessage { - private String sender; - private String content; - private long timestamp; - - public SystemMessage(String sender, String content, long timestamp) { - this.sender = sender; - this.content = content; - this.timestamp = timestamp; - } + @AllArgsConstructor + @NoArgsConstructor + private static class SessionInfo { + /** 用户名 */ + private String username; + /** 会话 ID */ + private String sessionId; + /** 连接时间戳 */ + private long connectTime; } -} + + /** + * 在线用户 DTO + */ + @Data + @AllArgsConstructor + @NoArgsConstructor + public static class OnlineUserDTO { + /** 用户名 */ + private String username; + /** 会话数量 */ + private int sessionCount; + /** 首次登录时间 */ + private long loginTime; + } + + /** + * 系统消息 + */ + @Data + @AllArgsConstructor + @NoArgsConstructor + public static class SystemMessage { + /** 发送者 */ + private String sender; + /** 消息内容 */ + private String content; + /** 时间戳 */ + private long timestamp; + } +} diff --git a/src/main/java/com/youlai/boot/system/service/impl/WebSocketSessionCleanupService.java b/src/main/java/com/youlai/boot/system/service/impl/WebSocketSessionCleanupService.java new file mode 100644 index 00000000..f1f2042b --- /dev/null +++ b/src/main/java/com/youlai/boot/system/service/impl/WebSocketSessionCleanupService.java @@ -0,0 +1,94 @@ +package com.youlai.boot.system.service.impl; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Service; + +/** + * WebSocket 会话清理服务 + * + * 功能: + * - 定时清理僵尸会话 + * - 监控会话状态 + * - 输出统计信息 + * + * @author Ray.Hao + * @since 3.0.0 + */ +@Service +@Slf4j +@RequiredArgsConstructor +@ConditionalOnProperty( + prefix = "websocket.session-cleanup", + name = "enabled", + havingValue = "true", + matchIfMissing = true +) +public class WebSocketSessionCleanupService { + + private final WebSocketServiceImpl webSocketService; + + /** + * 定时输出 WebSocket 会话统计信息 + * + * 每 5 分钟执行一次 + */ + @Scheduled(fixedRate = 300000, initialDelay = 60000) + public void logSessionStatistics() { + try { + int onlineUserCount = webSocketService.getOnlineUserCount(); + int totalSessionCount = webSocketService.getTotalSessionCount(); + + log.info("📊 WebSocket 统计 - 在线用户数: {}, 活跃会话数: {}", + onlineUserCount, totalSessionCount); + + // 详细信息(仅在有用户在线时输出) + if (onlineUserCount > 0) { + var onlineUsers = webSocketService.getOnlineUsers(); + onlineUsers.forEach(user -> { + log.debug(" - 用户[{}]: {} 个会话", user.getUsername(), user.getSessionCount()); + }); + } + } catch (Exception ex) { + log.error("❌ 输出会话统计信息失败", ex); + } + } + + /** + * 健康检查 + * + * 每 30 秒执行一次,用于监控服务状态 + */ + @Scheduled(fixedRate = 30000, initialDelay = 10000) + public void healthCheck() { + try { + int onlineUserCount = webSocketService.getOnlineUserCount(); + int sessionCount = webSocketService.getTotalSessionCount(); + + // 异常检测:如果会话数远大于用户数,可能存在会话泄漏 + if (sessionCount > onlineUserCount * 10 && onlineUserCount > 0) { + log.warn("⚠ 检测到异常:会话数({})远大于用户数({}×10),可能存在会话泄漏", + sessionCount, onlineUserCount); + } + } catch (Exception ex) { + log.error("❌ 健康检查失败", ex); + } + } + + /** + * 手动触发在线用户数广播 + * + * 可用于系统启动后的初始化或手动刷新 + */ + public void triggerOnlineCountBroadcast() { + try { + webSocketService.notifyOnlineUsersChange(); + log.info("✓ 手动触发在线用户数广播成功"); + } catch (Exception ex) { + log.error("❌ 手动触发在线用户数广播失败", ex); + } + } +} +