feat(websocket):重构 WebSocket 配置与服务实现

This commit is contained in:
Ray.Hao
2025-11-08 00:01:43 +08:00
parent 1aa6a15a80
commit ffb89e50da
3 changed files with 600 additions and 244 deletions

View File

@@ -27,7 +27,13 @@ import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
/**
* WebSocket配置
* WebSocket 配置
*
* 核心功能:
* - 配置 WebSocket 端点
* - 配置消息代理
* - 实现连接认证与授权
* - 管理用户会话生命周期
*
* @author Ray.Hao
* @since 3.0.0
@@ -37,133 +43,251 @@ import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerCo
@Slf4j
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
private final TokenManager tokenManager;
private final WebSocketService webSocketService;
private static final String WS_ENDPOINT = "/ws";
private static final String APP_DESTINATION_PREFIX = "/app";
private static final String USER_DESTINATION_PREFIX = "/user";
private static final String[] BROKER_DESTINATIONS = {"/topic", "/queue"};
public WebSocketConfig(TokenManager tokenManager, @Lazy WebSocketService webSocketService) {
this.tokenManager = tokenManager;
this.webSocketService = webSocketService;
}
private final TokenManager tokenManager;
private final WebSocketService webSocketService;
/**
* 注册一个端点,客户端通过这个端点进行连接
*/
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry
// 注册 /ws 的端点
.addEndpoint("/ws")
// 允许跨域
.setAllowedOriginPatterns("*");
}
public WebSocketConfig(TokenManager tokenManager, @Lazy WebSocketService webSocketService) {
this.tokenManager = tokenManager;
this.webSocketService = webSocketService;
log.info("✓ WebSocket 配置已加载");
}
/**
* 注册 STOMP 端点
*
* 客户端通过该端点建立 WebSocket 连接
*/
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry
.addEndpoint(WS_ENDPOINT)
.setAllowedOriginPatterns("*"); // 允许跨域(生产环境建议配置具体域名)
/**
* 配置消息代理
*/
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
// 客户端发送消息的请求前缀
registry.setApplicationDestinationPrefixes("/app");
log.info("✓ STOMP 端点已注册: {}", WS_ENDPOINT);
}
// 客户端订阅消息的请求前缀topic一般用于广播推送queue用于点对点推送
registry.enableSimpleBroker("/topic", "/queue");
/**
* 配置消息代理
*
* - /app 前缀:客户端发送消息到服务端的前缀
* - /topic 前缀:用于广播消息
* - /queue 前缀:用于点对点消息
* - /user 前缀:服务端发送给特定用户的消息前缀
*/
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
// 客户端发送消息的请求前缀
registry.setApplicationDestinationPrefixes(APP_DESTINATION_PREFIX);
// 服务端通知客户端的前缀可以不设置默认为user
registry.setUserDestinationPrefix("/user");
}
// 启用简单消息代理,处理 /topic 和 /queue 前缀的消息
registry.enableSimpleBroker(BROKER_DESTINATIONS);
// 服务端通知客户端的前缀
registry.setUserDestinationPrefix(USER_DESTINATION_PREFIX);
/**
* 配置客户端入站通道拦截器
* <p>
* 核心功能:
* 1. 连接建立时解析令牌并绑定用户身份
* 2. 连接关闭时触发下线通知
* 3. 异常Token的防御性处理
*/
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.interceptors(new ChannelInterceptor() {
@Override
public Message<?> preSend(@NotNull Message<?> message, @NotNull MessageChannel channel) {
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
if (accessor == null) {
return ChannelInterceptor.super.preSend(message, channel);
log.info("✓ 消息代理已配置: app={}, broker={}, user={}",
APP_DESTINATION_PREFIX, BROKER_DESTINATIONS, USER_DESTINATION_PREFIX);
}
/**
* 配置客户端入站通道拦截器
*
* 核心功能:
* 1. 连接建立时:解析 JWT Token 并绑定用户身份
* 2. 连接关闭时:触发用户下线通知
* 3. 安全防护:拦截无效连接请求
*/
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.interceptors(new ChannelInterceptor() {
@Override
public Message<?> preSend(@NotNull Message<?> message, @NotNull MessageChannel channel) {
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
// 防御性检查:确保 accessor 不为空
if (accessor == null) {
log.warn("⚠ 收到异常消息:无法获取 StompHeaderAccessor");
return ChannelInterceptor.super.preSend(message, channel);
}
StompCommand command = accessor.getCommand();
if (command == null) {
return ChannelInterceptor.super.preSend(message, channel);
}
try {
switch (command) {
case CONNECT:
handleConnect(accessor);
break;
case DISCONNECT:
handleDisconnect(accessor);
break;
case SUBSCRIBE:
handleSubscribe(accessor);
break;
default:
// 其他命令不需要特殊处理
break;
}
} catch (AuthenticationException ex) {
// 认证失败时强制关闭连接
log.error("❌ 连接认证失败: {}", ex.getMessage());
throw ex;
} catch (Exception ex) {
// 捕获其他未知异常
log.error("❌ WebSocket 消息处理异常", ex);
throw new MessagingException("消息处理失败: " + ex.getMessage());
}
return ChannelInterceptor.super.preSend(message, channel);
}
});
log.info("✓ 客户端入站通道拦截器已配置");
}
/**
* 处理客户端连接请求
*
* 安全校验流程:
* 1. 提取 Authorization 头
* 2. 验证 Bearer Token 格式
* 3. 解析并验证 JWT 有效性
* 4. 绑定用户身份到当前会话
* 5. 记录用户上线状态
*/
private void handleConnect(StompHeaderAccessor accessor) {
String authorization = accessor.getFirstNativeHeader(HttpHeaders.AUTHORIZATION);
// 安全检查:确保 Authorization 头存在且格式正确
if (StrUtil.isBlank(authorization)) {
log.warn("⚠ 非法连接请求:缺少 Authorization 头");
throw new AuthenticationCredentialsNotFoundException("缺少 Authorization 头");
}
if (!authorization.startsWith("Bearer ")) {
log.warn("⚠ 非法连接请求Authorization 头格式错误");
throw new BadCredentialsException("Authorization 头格式错误");
}
// 提取 JWT Token移除 "Bearer " 前缀)
String token = authorization.substring(7);
if (StrUtil.isBlank(token)) {
log.warn("⚠ 非法连接请求Token 为空");
throw new BadCredentialsException("Token 为空");
}
// 解析并验证 Token
Authentication authentication;
try {
// 处理客户端连接请求
if (StompCommand.CONNECT.equals(accessor.getCommand())) {
/*
* 安全校验流程:
* 1. 从HEADER中获取Authorization值
* 2. 校验Bearer Token格式合法性
* 3. 解析并验证JWT有效性
* 4. 绑定用户身份到当前会话
*/
String authorization = accessor.getFirstNativeHeader(HttpHeaders.AUTHORIZATION);
// 防御性校验确保Authorization头存在且格式正确
if (StrUtil.isBlank(authorization) || !authorization.startsWith("Bearer ")) {
log.warn("非法连接请求缺少有效的Authorization头");
throw new AuthenticationCredentialsNotFoundException("Missing authorization header");
}
// 提取并处理JWT令牌移除Bearer前缀
String token = authorization.substring(7);
Authentication authentication = tokenManager.parseToken(token);
// 令牌解析失败处理
if (authentication == null) {
log.error("令牌解析失败:{}", token);
throw new BadCredentialsException("Invalid token");
}
// 获取用户详细信息
SysUserDetails userDetails = (SysUserDetails) authentication.getPrincipal();
if (userDetails == null || StrUtil.isBlank(userDetails.getUsername())) {
log.error("无效的用户凭证:{}", token);
throw new BadCredentialsException("Invalid user credentials");
}
String username = userDetails.getUsername();
log.info("WebSocket连接建立用户[{}]", username);
// 绑定用户身份到当前会话(重要:用于@SendToUser等注解
accessor.setUser(authentication);
// 记录用户上线状态
webSocketService.userConnected(username, accessor.getSessionId());
}
// 处理客户端断开请求
else if (StompCommand.DISCONNECT.equals(accessor.getCommand())) {
/*
* 注意:只有成功建立过认证的连接才会触发下线事件
* 防止未认证成功的连接产生脏数据
*/
Authentication authentication = (Authentication) accessor.getUser();
if (authentication != null && authentication.isAuthenticated()) {
String username = ((SysUserDetails) authentication.getPrincipal()).getUsername();
log.info("WebSocket连接关闭用户[{}]", username);
// 记录用户下线状态
webSocketService.userDisconnected(username);
}
}
} catch (AuthenticationException ex) {
// 认证失败时强制关闭连接
log.error("连接认证失败:{}", ex.getMessage());
throw ex;
authentication = tokenManager.parseToken(token);
} catch (Exception ex) {
// 捕获其他未知异常
log.error("WebSocket连接处理异常", ex);
throw new MessagingException("Connection processing failed");
log.error("❌ Token 解析失败", ex);
throw new BadCredentialsException("Token 无效: " + ex.getMessage());
}
return ChannelInterceptor.super.preSend(message, channel);
}
});
}
// 验证解析结果
if (authentication == null || !authentication.isAuthenticated()) {
log.warn("⚠ Token 解析失败:认证对象无效");
throw new BadCredentialsException("Token 解析失败");
}
// 获取用户详细信息
Object principal = authentication.getPrincipal();
if (!(principal instanceof SysUserDetails)) {
log.error("❌ 无效的用户凭证类型: {}", principal.getClass().getName());
throw new BadCredentialsException("用户凭证类型错误");
}
SysUserDetails userDetails = (SysUserDetails) principal;
String username = userDetails.getUsername();
if (StrUtil.isBlank(username)) {
log.warn("⚠ 用户名为空");
throw new BadCredentialsException("用户名为空");
}
// 绑定用户身份到当前会话(重要:用于 @SendToUser 等注解)
accessor.setUser(authentication);
// 获取会话 ID
String sessionId = accessor.getSessionId();
if (sessionId == null) {
log.warn("⚠ 会话 ID 为空,使用临时 ID");
sessionId = "temp-" + System.nanoTime();
}
// 记录用户上线状态
try {
webSocketService.userConnected(username, sessionId);
log.info("✓ WebSocket 连接建立成功: 用户[{}], 会话[{}]", username, sessionId);
} catch (Exception ex) {
log.error("❌ 记录用户上线状态失败: 用户[{}], 会话[{}]", username, sessionId, ex);
// 不抛出异常,允许连接继续
}
}
/**
* 处理客户端断开连接事件
*
* 注意:
* - 只有成功建立过认证的连接才会触发下线事件
* - 防止未认证成功的连接产生脏数据
*/
private void handleDisconnect(StompHeaderAccessor accessor) {
Authentication authentication = (Authentication) accessor.getUser();
// 防御性检查:只处理已认证的连接
if (authentication == null || !authentication.isAuthenticated()) {
log.debug("未认证的连接断开,跳过处理");
return;
}
Object principal = authentication.getPrincipal();
if (!(principal instanceof SysUserDetails)) {
log.warn("⚠ 断开连接时用户凭证类型异常");
return;
}
SysUserDetails userDetails = (SysUserDetails) principal;
String username = userDetails.getUsername();
if (StrUtil.isNotBlank(username)) {
try {
webSocketService.userDisconnected(username);
log.info("✓ WebSocket 连接断开: 用户[{}]", username);
} catch (Exception ex) {
log.error("❌ 记录用户下线状态失败: 用户[{}]", username, ex);
}
}
}
/**
* 处理客户端订阅事件(可选)
*
* 用于记录订阅信息或实施订阅级别的权限控制
*/
private void handleSubscribe(StompHeaderAccessor accessor) {
Authentication authentication = (Authentication) accessor.getUser();
if (authentication != null && authentication.isAuthenticated()) {
String destination = accessor.getDestination();
String username = authentication.getName();
log.debug("用户[{}]订阅主题: {}", username, destination);
// TODO: 这里可以实现订阅级别的权限控制
// 例如:检查用户是否有权限订阅某个主题
}
}
}

View File

@@ -4,7 +4,9 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.youlai.boot.system.model.dto.DictEventDTO;
import com.youlai.boot.system.service.WebSocketService;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.simp.SimpMessagingTemplate;
@@ -12,12 +14,17 @@ import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
/**
* WebSocket服务实现类
* 统一处理WebSocket消息发送和用户在线状态管理
* WebSocket 服务实现类
*
* 核心功能:
* - 用户在线状态管理(支持多设备登录)
* - 消息推送(广播、点对点)
* - 字典变更通知
*
* @author Ray.Hao
* @since 3.0.0
@@ -26,8 +33,23 @@ import java.util.stream.Collectors;
@Slf4j
public class WebSocketServiceImpl implements WebSocketService {
// 在线用户映射表key为用户名value为用户在线信息
private final Map<String, UserOnlineInfo> onlineUsers = new ConcurrentHashMap<>();
// ==================== 在线用户管理 ====================
/**
* 用户在线会话映射表
* Key: 用户名
* Value: 该用户的所有会话 ID 集合(支持多设备登录)
*/
private final Map<String, Set<String>> userSessionsMap = new ConcurrentHashMap<>();
/**
* 会话详情映射表
* Key: 会话 ID
* Value: 会话详细信息
*/
private final Map<String, SessionInfo> sessionDetailsMap = new ConcurrentHashMap<>();
// ==================== 依赖注入 ====================
private SimpMessagingTemplate messagingTemplate;
private final ObjectMapper objectMapper;
@@ -36,67 +58,156 @@ public class WebSocketServiceImpl implements WebSocketService {
public WebSocketServiceImpl(ObjectMapper objectMapper) {
this.objectMapper = objectMapper;
}
/**
* 延迟注入 SimpMessagingTemplate避免循环依赖
*/
@Autowired(required = false)
public void setMessagingTemplate(SimpMessagingTemplate messagingTemplate) {
this.messagingTemplate = messagingTemplate;
log.info("WebSocket消息模板已初始化");
log.info("WebSocket 消息模板已初始化");
}
//==================================
// 用户在线状态管理功能
//==================================
// ==================== 用户在线状态管理 ====================
/**
* 用户上线
* 处理用户连接事件
*
* @param username 用户名
* @param sessionId WebSocket会话ID(可选)
* @param sessionId WebSocket 会话 ID
*/
@Override
public void userConnected(String username, String sessionId) {
// 生成会话ID如果未提供
String actualSessionId = sessionId != null ? sessionId : "session-" + System.nanoTime();
UserOnlineInfo info = new UserOnlineInfo(username, actualSessionId, System.currentTimeMillis());
onlineUsers.put(username, info);
log.info("用户[{}]上线,当前在线用户数:{}", username, onlineUsers.size());
// 通知在线用户状态变更
notifyOnlineUsersChangeInternal();
if (username == null || username.isEmpty()) {
log.warn("用户连接失败:用户名为空");
return;
}
if (sessionId == null || sessionId.isEmpty()) {
log.warn("用户[{}]连接失败:会话 ID 为空", username);
return;
}
// 添加会话到用户的会话集合中(支持多设备登录)
userSessionsMap.computeIfAbsent(username, k -> ConcurrentHashMap.newKeySet())
.add(sessionId);
// 保存会话详情
SessionInfo sessionInfo = new SessionInfo(username, sessionId, System.currentTimeMillis());
sessionDetailsMap.put(sessionId, sessionInfo);
int sessionCount = userSessionsMap.get(username).size();
int totalOnlineUsers = userSessionsMap.size();
log.info("✓ 用户[{}]会话[{}]上线(该用户共 {} 个会话,系统总在线用户数:{}",
username, sessionId, sessionCount, totalOnlineUsers);
// 广播在线用户数变更
broadcastOnlineUserCount();
}
/**
* 用户下线
* 处理用户断开连接事件
*
* @param username 用户名
*/
@Override
public void userDisconnected(String username) {
onlineUsers.remove(username);
log.info("用户[{}]下线,当前在线用户数:{}", username, onlineUsers.size());
// 通知在线用户状态变更
notifyOnlineUsersChangeInternal();
if (username == null || username.isEmpty()) {
return;
}
// 获取该用户的所有会话
Set<String> sessions = userSessionsMap.get(username);
if (sessions == null || sessions.isEmpty()) {
log.warn("用户[{}]下线:未找到会话记录", username);
return;
}
// 移除所有会话详情(通常一次只断开一个会话,但这里做全量清理)
sessions.forEach(sessionDetailsMap::remove);
// 移除用户的会话记录
userSessionsMap.remove(username);
int totalOnlineUsers = userSessionsMap.size();
log.info("✓ 用户[{}]下线(系统总在线用户数:{}", username, totalOnlineUsers);
// 广播在线用户数变更
broadcastOnlineUserCount();
}
/**
* 移除指定会话(单个设备下线)
*
* @param sessionId 会话 ID
*/
public void removeSession(String sessionId) {
SessionInfo sessionInfo = sessionDetailsMap.remove(sessionId);
if (sessionInfo == null) {
return;
}
String username = sessionInfo.getUsername();
Set<String> sessions = userSessionsMap.get(username);
if (sessions != null) {
sessions.remove(sessionId);
// 如果该用户没有其他会话了,移除用户记录
if (sessions.isEmpty()) {
userSessionsMap.remove(username);
log.info("✓ 用户[{}]最后一个会话[{}]下线", username, sessionId);
} else {
log.info("✓ 用户[{}]会话[{}]下线(还剩 {} 个会话)",
username, sessionId, sessions.size());
}
// 广播在线用户数变更
broadcastOnlineUserCount();
}
}
/**
* 获取在线用户列表
*
* @return 在线用户列表
* @return 在线用户信息列表
*/
public List<UserOnlineDTO> getOnlineUsers() {
return onlineUsers.values().stream()
.map(info -> new UserOnlineDTO(info.getUsername(), info.getLoginTime()))
public List<OnlineUserDTO> getOnlineUsers() {
return userSessionsMap.entrySet().stream()
.map(entry -> {
String username = entry.getKey();
Set<String> sessions = entry.getValue();
// 获取该用户最早的登录时间
long earliestLoginTime = sessions.stream()
.map(sessionDetailsMap::get)
.filter(info -> info != null)
.mapToLong(SessionInfo::getConnectTime)
.min()
.orElse(System.currentTimeMillis());
return new OnlineUserDTO(username, sessions.size(), earliestLoginTime);
})
.collect(Collectors.toList());
}
/**
* 获取在线用户数量
*
* @return 在线用户数
* @return 在线用户数(不是会话数)
*/
public int getOnlineUserCount() {
return onlineUsers.size();
return userSessionsMap.size();
}
/**
* 获取在线会话总数
*
* @return 所有在线会话的总数
*/
public int getTotalSessionCount() {
return sessionDetailsMap.size();
}
/**
@@ -106,172 +217,199 @@ public class WebSocketServiceImpl implements WebSocketService {
* @return 是否在线
*/
public boolean isUserOnline(String username) {
return onlineUsers.containsKey(username);
Set<String> sessions = userSessionsMap.get(username);
return sessions != null && !sessions.isEmpty();
}
/**
* 手动触发在线用户变更通知
* 供外部手动触发通知使用
* 获取指定用户的会话数量
*
* @param username 用户名
* @return 会话数量
*/
public int getUserSessionCount(String username) {
Set<String> sessions = userSessionsMap.get(username);
return sessions != null ? sessions.size() : 0;
}
/**
* 手动触发在线用户数量广播
*
* 供外部服务(如定时任务)调用
*/
public void notifyOnlineUsersChange() {
log.info("手动触发在线用户数量通知,当前在线用户数:{}", onlineUsers.size());
sendOnlineUserCount();
log.info("手动触发在线用户数量通知,当前在线用户数:{}", getOnlineUserCount());
broadcastOnlineUserCount();
}
/**
* 发送在线用户数量(简化版,不包含用户详情
* 广播在线用户数量变更(内部方法
*/
private void sendOnlineUserCount() {
private void broadcastOnlineUserCount() {
if (messagingTemplate == null) {
log.warn("消息模板尚未初始化,无法发送在线用户数量");
return;
}
try {
// 直接发送数量,更轻量
int count = onlineUsers.size();
int count = getOnlineUserCount();
messagingTemplate.convertAndSend("/topic/online-count", count);
log.debug("已发送在线用户数量: {}", count);
log.debug("✓ 已广播在线用户数量: {}", count);
} catch (Exception e) {
log.error("发送在线用户数量失败", e);
log.error("广播在线用户数量失败", e);
}
}
/**
* 内部通用通知方法
* 通知所有客户端在线用户变更
*/
private void notifyOnlineUsersChangeInternal() {
if (messagingTemplate == null) {
log.warn("消息模板尚未初始化,无法发送在线用户数量通知");
return;
}
// 只发送简化版数据(仅数量)
sendOnlineUserCount();
}
// ==================== 消息推送功能 ====================
/**
* 用户在线信息
*/
@Data
private static class UserOnlineInfo {
private final String username;
private final String sessionId;
private final long loginTime;
}
/**
* 用户在线DTO用于返回给前端
*/
@Data
public static class UserOnlineDTO {
private final String username;
private final long loginTime;
}
/**
* 在线用户变更事件
*/
@Data
private static class OnlineUsersChangeEvent {
private String type;
private int count;
private List<UserOnlineDTO> users;
private long timestamp;
}
//==================================
// WebSocket消息发送功能
//==================================
/**
* 向所有客户端发送字典更新事件
* 向所有客户端广播字典更新事件
*
* @param dictCode 字典编码
*/
@Override
public void broadcastDictChange(String dictCode) {
if (dictCode == null || dictCode.isEmpty()) {
log.warn("字典编码为空,跳过广播");
return;
}
DictEventDTO event = new DictEventDTO(dictCode);
sendDictEvent(event);
sendDictChangeEvent(event);
}
/**
* 发送字典事件消息
* 发送字典变更事件
*
* @param event 字典事件
*/
private void sendDictEvent(DictEventDTO event) {
private void sendDictChangeEvent(DictEventDTO event) {
if (messagingTemplate == null) {
log.warn("消息模板尚未初始化,无法发送字典更新通知");
return;
}
try {
String message = objectMapper.writeValueAsString(event);
messagingTemplate.convertAndSend("/topic/dict", message);
log.info("已发送字典事件通知, dictCode: {}", event.getDictCode());
log.info("✓ 已广播字典变更通知: dictCode={}", event.getDictCode());
} catch (JsonProcessingException e) {
log.error("发送字典事件失败", e);
log.error("字典事件序列化失败: dictCode={}", event.getDictCode(), e);
} catch (Exception e) {
log.error("发送字典变更通知失败: dictCode={}", event.getDictCode(), e);
}
}
/**
* 向特定用户发送系统消息
*
* @param username 用户名
* @param message 消息内容
* 向特定用户发送通知消息
*
* @param username 目标用户名
* @param message 消息内容
*/
@Override
public void sendNotification(String username, Object message) {
if (username == null || username.isEmpty()) {
log.warn("用户名为空,无法发送通知");
return;
}
if (message == null) {
log.warn("消息内容为空,无法发送给用户[{}]", username);
return;
}
if (messagingTemplate == null) {
log.warn("消息模板尚未初始化,无法发送用户消息");
return;
}
try {
String messageJson = objectMapper.writeValueAsString(message);
messagingTemplate.convertAndSendToUser(username, "/queue/messages", messageJson);
log.info("已向用户[{}]发送消息", username);
log.info("已向用户[{}]发送通知", username);
} catch (JsonProcessingException e) {
log.error("向用户[{}]发送消息失败", username, e);
log.error("消息序列化失败: username={}", username, e);
} catch (Exception e) {
log.error("向用户[{}]发送通知失败", username, e);
}
}
/**
* 发送广播消息给所有用户
*
* 广播系统消息给所有用户
*
* @param message 消息内容
*/
public void broadcastMessage(String message) {
public void broadcastSystemMessage(String message) {
if (message == null || message.isEmpty()) {
log.warn("消息内容为空,无法广播");
return;
}
if (messagingTemplate == null) {
log.warn("消息模板尚未初始化,无法发送广播消息");
return;
}
try {
SystemMessage systemMessage = new SystemMessage("系统", message, System.currentTimeMillis());
SystemMessage systemMessage = new SystemMessage(
"系统通知",
message,
System.currentTimeMillis()
);
String messageJson = objectMapper.writeValueAsString(systemMessage);
messagingTemplate.convertAndSend("/topic/public", messageJson);
log.info("已发送广播消息: {}", message);
log.info("✓ 已广播系统消息: {}", message);
} catch (JsonProcessingException e) {
log.error("发送广播消息失败", e);
log.error("系统消息序列化失败", e);
} catch (Exception e) {
log.error("广播系统消息失败", e);
}
}
// ==================== 内部数据类 ====================
/**
* 系统消息对象
* 会话信息
*/
@Data
public static class SystemMessage {
private String sender;
private String content;
private long timestamp;
public SystemMessage(String sender, String content, long timestamp) {
this.sender = sender;
this.content = content;
this.timestamp = timestamp;
}
@AllArgsConstructor
@NoArgsConstructor
private static class SessionInfo {
/** 用户名 */
private String username;
/** 会话 ID */
private String sessionId;
/** 连接时间戳 */
private long connectTime;
}
}
/**
* 在线用户 DTO
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class OnlineUserDTO {
/** 用户名 */
private String username;
/** 会话数量 */
private int sessionCount;
/** 首次登录时间 */
private long loginTime;
}
/**
* 系统消息
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class SystemMessage {
/** 发送者 */
private String sender;
/** 消息内容 */
private String content;
/** 时间戳 */
private long timestamp;
}
}

View File

@@ -0,0 +1,94 @@
package com.youlai.boot.system.service.impl;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
/**
* WebSocket 会话清理服务
*
* 功能:
* - 定时清理僵尸会话
* - 监控会话状态
* - 输出统计信息
*
* @author Ray.Hao
* @since 3.0.0
*/
@Service
@Slf4j
@RequiredArgsConstructor
@ConditionalOnProperty(
prefix = "websocket.session-cleanup",
name = "enabled",
havingValue = "true",
matchIfMissing = true
)
public class WebSocketSessionCleanupService {
private final WebSocketServiceImpl webSocketService;
/**
* 定时输出 WebSocket 会话统计信息
*
* 每 5 分钟执行一次
*/
@Scheduled(fixedRate = 300000, initialDelay = 60000)
public void logSessionStatistics() {
try {
int onlineUserCount = webSocketService.getOnlineUserCount();
int totalSessionCount = webSocketService.getTotalSessionCount();
log.info("📊 WebSocket 统计 - 在线用户数: {}, 活跃会话数: {}",
onlineUserCount, totalSessionCount);
// 详细信息(仅在有用户在线时输出)
if (onlineUserCount > 0) {
var onlineUsers = webSocketService.getOnlineUsers();
onlineUsers.forEach(user -> {
log.debug(" - 用户[{}]: {} 个会话", user.getUsername(), user.getSessionCount());
});
}
} catch (Exception ex) {
log.error("❌ 输出会话统计信息失败", ex);
}
}
/**
* 健康检查
*
* 每 30 秒执行一次,用于监控服务状态
*/
@Scheduled(fixedRate = 30000, initialDelay = 10000)
public void healthCheck() {
try {
int onlineUserCount = webSocketService.getOnlineUserCount();
int sessionCount = webSocketService.getTotalSessionCount();
// 异常检测:如果会话数远大于用户数,可能存在会话泄漏
if (sessionCount > onlineUserCount * 10 && onlineUserCount > 0) {
log.warn("⚠ 检测到异常:会话数({})远大于用户数({}×10),可能存在会话泄漏",
sessionCount, onlineUserCount);
}
} catch (Exception ex) {
log.error("❌ 健康检查失败", ex);
}
}
/**
* 手动触发在线用户数广播
*
* 可用于系统启动后的初始化或手动刷新
*/
public void triggerOnlineCountBroadcast() {
try {
webSocketService.notifyOnlineUsersChange();
log.info("✓ 手动触发在线用户数广播成功");
} catch (Exception ex) {
log.error("❌ 手动触发在线用户数广播失败", ex);
}
}
}