feat: WebSocket 迁移到 SSE 实现实时推送
This commit is contained in:
60
CHANGELOG.md
60
CHANGELOG.md
@@ -1,60 +0,0 @@
|
|||||||
# 2.7.1 (2024/4/18)
|
|
||||||
### 🐛 fix
|
|
||||||
- 修复用户名或者密码错误时,返回的错误信息不正确问题
|
|
||||||
### 🛠️ refactor
|
|
||||||
- JWT 解析和验证代码优化重构
|
|
||||||
- 优化代码结构和完善注释,提高代码可读性
|
|
||||||
|
|
||||||
# 2.7.0 (2024/4/13)
|
|
||||||
### ✨ feat
|
|
||||||
- 集成 Mybatis-Plus generator 代码生成器
|
|
||||||
|
|
||||||
# 2.6.0 (2024/3/6)
|
|
||||||
|
|
||||||
### ✨ feat
|
|
||||||
- 黑名单方式实现 JWT 主动注销过期
|
|
||||||
### 🛠️ refactor
|
|
||||||
- 角色权限重构
|
|
||||||
|
|
||||||
|
|
||||||
# 2.5.0 (2023/12/6)
|
|
||||||
### ✨ feat
|
|
||||||
- [集成 Spring Cache 和 Redis 缓存,路由缓存](https://blog.csdn.net/u013737132/article/details/134789862)
|
|
||||||
### 🛠️ refactor
|
|
||||||
- 权限判断逻辑调整,用户绑定权限调整为角色绑定权限
|
|
||||||
### fix
|
|
||||||
- [接口无请求权限,Spring Security 自定义异常无效问题修复](https://youlai.blog.csdn.net/article/details/134718249)
|
|
||||||
|
|
||||||
|
|
||||||
# 2.4.1 (2023/11/7)
|
|
||||||
### ✂️ refactor
|
|
||||||
- 项目目录结构优化
|
|
||||||
### ⬆️ chore
|
|
||||||
- 升级 SpringBoot 版本 `3.1.4` → `3.1.5`
|
|
||||||
|
|
||||||
|
|
||||||
# 2.2.1 (2023/5/25)
|
|
||||||
|
|
||||||
### 🐛 fix
|
|
||||||
|
|
||||||
- 修复多级路由的组件路径错误导致页面404问题
|
|
||||||
|
|
||||||
# 2.2.0 (2023/5/21)
|
|
||||||
|
|
||||||
### ✨ feat
|
|
||||||
- 菜单、角色、字典、部门添加接口权限控制
|
|
||||||
|
|
||||||
### 🐛 fix
|
|
||||||
|
|
||||||
- 用户登录权限缓存键值不一致导致获取用户数据权限错误问题修复
|
|
||||||
|
|
||||||
### ✂️ refactor
|
|
||||||
|
|
||||||
- 递归获取菜单、部门属性列表代码重构优化
|
|
||||||
|
|
||||||
### ⬆️ chore
|
|
||||||
- 升级 SpringBoot 版本 `3.0.6` → `3.1.0`
|
|
||||||
|
|
||||||
### 📝 docs
|
|
||||||
- SQL 脚本更新,sys_menu 新增 `tree_path` 字段 (升级需更新SQL脚本)
|
|
||||||
|
|
||||||
17
README.md
17
README.md
@@ -49,7 +49,7 @@
|
|||||||
| [vue3-element-template](https://gitee.com/youlaiorg/vue3-element-template) | Vue 3 + Element Plus | 前端精简模板 |
|
| [vue3-element-template](https://gitee.com/youlaiorg/vue3-element-template) | Vue 3 + Element Plus | 前端精简模板 |
|
||||||
| [youlai-boot-tenant](https://gitee.com/youlaiorg/youlai-boot-tenant) | Spring Boot 4 | 多租户 SaaS 版 |
|
| [youlai-boot-tenant](https://gitee.com/youlaiorg/youlai-boot-tenant) | Spring Boot 4 | 多租户 SaaS 版 |
|
||||||
| [youlai-boot-flex](https://gitee.com/youlaiorg/youlai-boot-flex) | Spring Boot 3 + MyBatis-Flex | MyBatis-Flex 版 |
|
| [youlai-boot-flex](https://gitee.com/youlaiorg/youlai-boot-flex) | Spring Boot 3 + MyBatis-Flex | MyBatis-Flex 版 |
|
||||||
| [youlai-uniapp](https://gitee.com/youlaiorg/youlai-uniapp) | Vue 3 + uni-app | 移动端应用 |
|
| [youlai-app](https://gitee.com/youlaiorg/youlai-app) | Vue 3 + uni-app | 移动端应用 |
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
@@ -167,17 +167,4 @@ docker-compose up -d
|
|||||||
<img src="https://foruda.gitee.com/images/1737108820762592766/3390ed0d_716974.png" width="280">
|
<img src="https://foruda.gitee.com/images/1737108820762592766/3390ed0d_716974.png" width="280">
|
||||||
</div>
|
</div>
|
||||||
|
|
||||||
**微信交流**:添加 **`haoxianrui`**,备注「前端/后端/全栈」
|
> 二维码过期?添加微信 **`haoxianrui`**,备注「前端/后端/全栈」即可拉你入群。
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
如果项目对你有帮助,欢迎 ⭐️ Star 支持!
|
|
||||||
|
|
||||||
<div align="center">
|
|
||||||
<a href="https://gitee.com/youlaiorg/youlai-boot"><b>⭐ Gitee</b></a> •
|
|
||||||
<a href="https://github.com/haoxianrui/youlai-boot"><b>⭐ GitHub</b></a> •
|
|
||||||
<a href="https://atomgit.com/youlai/youlai-boot"><b>⭐ AtomGit</b></a>
|
|
||||||
<br/>
|
|
||||||
<a href="https://www.youlai.tech"><b>🌐 官网</b></a> •
|
|
||||||
<a href="https://youlai.blog.csdn.net/"><b>📝 博客</b></a>
|
|
||||||
</div>
|
|
||||||
|
|||||||
@@ -1,293 +0,0 @@
|
|||||||
package com.youlai.boot.config;
|
|
||||||
|
|
||||||
import cn.hutool.core.util.StrUtil;
|
|
||||||
import com.youlai.boot.security.model.SysUserDetails;
|
|
||||||
import com.youlai.boot.security.token.TokenManager;
|
|
||||||
import com.youlai.boot.support.websocket.service.WebSocketService;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
import org.jetbrains.annotations.NotNull;
|
|
||||||
import org.springframework.context.annotation.Configuration;
|
|
||||||
import org.springframework.context.annotation.Lazy;
|
|
||||||
import org.springframework.http.HttpHeaders;
|
|
||||||
import org.springframework.messaging.Message;
|
|
||||||
import org.springframework.messaging.MessageChannel;
|
|
||||||
import org.springframework.messaging.MessagingException;
|
|
||||||
import org.springframework.messaging.simp.config.ChannelRegistration;
|
|
||||||
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
|
|
||||||
import org.springframework.messaging.simp.stomp.StompCommand;
|
|
||||||
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
|
|
||||||
import org.springframework.messaging.support.ChannelInterceptor;
|
|
||||||
import org.springframework.messaging.support.MessageHeaderAccessor;
|
|
||||||
import org.springframework.security.authentication.AuthenticationCredentialsNotFoundException;
|
|
||||||
import org.springframework.security.authentication.BadCredentialsException;
|
|
||||||
import org.springframework.security.core.Authentication;
|
|
||||||
import org.springframework.security.core.AuthenticationException;
|
|
||||||
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
|
|
||||||
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
|
|
||||||
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* WebSocket 配置类
|
|
||||||
*
|
|
||||||
* 核心功能:
|
|
||||||
* - 配置 WebSocket 端点
|
|
||||||
* - 配置消息代理
|
|
||||||
* - 实现连接认证与授权
|
|
||||||
* - 管理用户会话生命周期
|
|
||||||
*
|
|
||||||
* @author Ray.Hao
|
|
||||||
* @since 3.0.0
|
|
||||||
*/
|
|
||||||
@EnableWebSocketMessageBroker
|
|
||||||
@Configuration
|
|
||||||
@Slf4j
|
|
||||||
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
|
|
||||||
|
|
||||||
private static final String WS_ENDPOINT = "/ws";
|
|
||||||
private static final String APP_DESTINATION_PREFIX = "/app";
|
|
||||||
private static final String USER_DESTINATION_PREFIX = "/user";
|
|
||||||
private static final String[] BROKER_DESTINATIONS = {"/topic", "/queue"};
|
|
||||||
|
|
||||||
private final TokenManager tokenManager;
|
|
||||||
private final WebSocketService webSocketService;
|
|
||||||
|
|
||||||
public WebSocketConfig(TokenManager tokenManager, @Lazy WebSocketService webSocketService) {
|
|
||||||
this.tokenManager = tokenManager;
|
|
||||||
this.webSocketService = webSocketService;
|
|
||||||
log.info("✓ WebSocket 配置已加载");
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 注册 STOMP 端点
|
|
||||||
*
|
|
||||||
* 客户端通过该端点建立 WebSocket 连接
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
public void registerStompEndpoints(StompEndpointRegistry registry) {
|
|
||||||
registry
|
|
||||||
.addEndpoint(WS_ENDPOINT)
|
|
||||||
.setAllowedOriginPatterns("*"); // 允许跨域(生产环境建议配置具体域名)
|
|
||||||
|
|
||||||
log.info("✓ STOMP 端点已注册: {}", WS_ENDPOINT);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 配置消息代理
|
|
||||||
*
|
|
||||||
* - /app 前缀:客户端发送消息到服务端的前缀
|
|
||||||
* - /topic 前缀:用于广播消息
|
|
||||||
* - /queue 前缀:用于点对点消息
|
|
||||||
* - /user 前缀:服务端发送给特定用户的消息前缀
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
public void configureMessageBroker(MessageBrokerRegistry registry) {
|
|
||||||
// 客户端发送消息的请求前缀
|
|
||||||
registry.setApplicationDestinationPrefixes(APP_DESTINATION_PREFIX);
|
|
||||||
|
|
||||||
// 启用简单消息代理,处理 /topic 和 /queue 前缀的消息
|
|
||||||
registry.enableSimpleBroker(BROKER_DESTINATIONS);
|
|
||||||
|
|
||||||
// 服务端通知客户端的前缀
|
|
||||||
registry.setUserDestinationPrefix(USER_DESTINATION_PREFIX);
|
|
||||||
|
|
||||||
log.info("✓ 消息代理已配置: app={}, broker={}, user={}",
|
|
||||||
APP_DESTINATION_PREFIX, BROKER_DESTINATIONS, USER_DESTINATION_PREFIX);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 配置客户端入站通道拦截器
|
|
||||||
*
|
|
||||||
* 核心功能:
|
|
||||||
* 1. 连接建立时:解析 JWT Token 并绑定用户身份
|
|
||||||
* 2. 连接关闭时:触发用户下线通知
|
|
||||||
* 3. 安全防护:拦截无效连接请求
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
public void configureClientInboundChannel(ChannelRegistration registration) {
|
|
||||||
registration.interceptors(new ChannelInterceptor() {
|
|
||||||
@Override
|
|
||||||
public Message<?> preSend(@NotNull Message<?> message, @NotNull MessageChannel channel) {
|
|
||||||
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
|
|
||||||
|
|
||||||
// 防御性检查:确保 accessor 不为空
|
|
||||||
if (accessor == null) {
|
|
||||||
log.warn("⚠ 收到异常消息:无法获取 StompHeaderAccessor");
|
|
||||||
return ChannelInterceptor.super.preSend(message, channel);
|
|
||||||
}
|
|
||||||
|
|
||||||
StompCommand command = accessor.getCommand();
|
|
||||||
if (command == null) {
|
|
||||||
return ChannelInterceptor.super.preSend(message, channel);
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
switch (command) {
|
|
||||||
case CONNECT:
|
|
||||||
handleConnect(accessor);
|
|
||||||
break;
|
|
||||||
|
|
||||||
case DISCONNECT:
|
|
||||||
handleDisconnect(accessor);
|
|
||||||
break;
|
|
||||||
|
|
||||||
case SUBSCRIBE:
|
|
||||||
handleSubscribe(accessor);
|
|
||||||
break;
|
|
||||||
|
|
||||||
default:
|
|
||||||
// 其他命令不需要特殊处理
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
} catch (AuthenticationException ex) {
|
|
||||||
// 认证失败时强制关闭连接
|
|
||||||
log.error("❌ 连接认证失败: {}", ex.getMessage());
|
|
||||||
throw ex;
|
|
||||||
} catch (Exception ex) {
|
|
||||||
// 捕获其他未知异常
|
|
||||||
log.error("❌ WebSocket 消息处理异常", ex);
|
|
||||||
throw new MessagingException("消息处理失败: " + ex.getMessage());
|
|
||||||
}
|
|
||||||
|
|
||||||
return ChannelInterceptor.super.preSend(message, channel);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
log.info("✓ 客户端入站通道拦截器已配置");
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 处理客户端连接请求
|
|
||||||
*
|
|
||||||
* 安全校验流程:
|
|
||||||
* 1. 提取 Authorization 头
|
|
||||||
* 2. 验证 Bearer Token 格式
|
|
||||||
* 3. 解析并验证 JWT 有效性
|
|
||||||
* 4. 绑定用户身份到当前会话
|
|
||||||
* 5. 记录用户上线状态
|
|
||||||
*/
|
|
||||||
private void handleConnect(StompHeaderAccessor accessor) {
|
|
||||||
String authorization = accessor.getFirstNativeHeader(HttpHeaders.AUTHORIZATION);
|
|
||||||
|
|
||||||
// 安全检查:确保 Authorization 头存在且格式正确
|
|
||||||
if (StrUtil.isBlank(authorization)) {
|
|
||||||
log.warn("⚠ 非法连接请求:缺少 Authorization 头");
|
|
||||||
throw new AuthenticationCredentialsNotFoundException("缺少 Authorization 头");
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!authorization.startsWith("Bearer ")) {
|
|
||||||
log.warn("⚠ 非法连接请求:Authorization 头格式错误");
|
|
||||||
throw new BadCredentialsException("Authorization 头格式错误");
|
|
||||||
}
|
|
||||||
|
|
||||||
// 提取 JWT Token(移除 "Bearer " 前缀)
|
|
||||||
String token = authorization.substring(7);
|
|
||||||
|
|
||||||
if (StrUtil.isBlank(token)) {
|
|
||||||
log.warn("⚠ 非法连接请求:Token 为空");
|
|
||||||
throw new BadCredentialsException("Token 为空");
|
|
||||||
}
|
|
||||||
|
|
||||||
// 解析并验证 Token
|
|
||||||
Authentication authentication;
|
|
||||||
try {
|
|
||||||
authentication = tokenManager.parseToken(token);
|
|
||||||
} catch (Exception ex) {
|
|
||||||
log.error("❌ Token 解析失败", ex);
|
|
||||||
throw new BadCredentialsException("Token 无效: " + ex.getMessage());
|
|
||||||
}
|
|
||||||
|
|
||||||
// 验证解析结果
|
|
||||||
if (authentication == null || !authentication.isAuthenticated()) {
|
|
||||||
log.warn("⚠ Token 解析失败:认证对象无效");
|
|
||||||
throw new BadCredentialsException("Token 解析失败");
|
|
||||||
}
|
|
||||||
|
|
||||||
// 获取用户详细信息
|
|
||||||
Object principal = authentication.getPrincipal();
|
|
||||||
if (!(principal instanceof SysUserDetails)) {
|
|
||||||
log.error("❌ 无效的用户凭证类型: {}", principal.getClass().getName());
|
|
||||||
throw new BadCredentialsException("用户凭证类型错误");
|
|
||||||
}
|
|
||||||
|
|
||||||
SysUserDetails userDetails = (SysUserDetails) principal;
|
|
||||||
String username = userDetails.getUsername();
|
|
||||||
|
|
||||||
if (StrUtil.isBlank(username)) {
|
|
||||||
log.warn("⚠ 用户名为空");
|
|
||||||
throw new BadCredentialsException("用户名为空");
|
|
||||||
}
|
|
||||||
|
|
||||||
// 绑定用户身份到当前会话(重要:用于 @SendToUser 等注解)
|
|
||||||
accessor.setUser(authentication);
|
|
||||||
|
|
||||||
// 获取会话 ID
|
|
||||||
String sessionId = accessor.getSessionId();
|
|
||||||
if (sessionId == null) {
|
|
||||||
log.warn("⚠ 会话 ID 为空,使用临时 ID");
|
|
||||||
sessionId = "temp-" + System.nanoTime();
|
|
||||||
}
|
|
||||||
|
|
||||||
// 记录用户上线状态
|
|
||||||
try {
|
|
||||||
webSocketService.userConnected(username, sessionId);
|
|
||||||
log.info("✓ WebSocket 连接建立成功: 用户[{}], 会话[{}]", username, sessionId);
|
|
||||||
} catch (Exception ex) {
|
|
||||||
log.error("❌ 记录用户上线状态失败: 用户[{}], 会话[{}]", username, sessionId, ex);
|
|
||||||
// 不抛出异常,允许连接继续
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 处理客户端断开连接事件
|
|
||||||
*
|
|
||||||
* 注意:
|
|
||||||
* - 只有成功建立过认证的连接才会触发下线事件
|
|
||||||
* - 防止未认证成功的连接产生脏数据
|
|
||||||
*/
|
|
||||||
private void handleDisconnect(StompHeaderAccessor accessor) {
|
|
||||||
Authentication authentication = (Authentication) accessor.getUser();
|
|
||||||
|
|
||||||
// 防御性检查:只处理已认证的连接
|
|
||||||
if (authentication == null || !authentication.isAuthenticated()) {
|
|
||||||
log.debug("未认证的连接断开,跳过处理");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
Object principal = authentication.getPrincipal();
|
|
||||||
if (!(principal instanceof SysUserDetails)) {
|
|
||||||
log.warn("⚠ 断开连接时用户凭证类型异常");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
SysUserDetails userDetails = (SysUserDetails) principal;
|
|
||||||
String username = userDetails.getUsername();
|
|
||||||
|
|
||||||
if (StrUtil.isNotBlank(username)) {
|
|
||||||
try {
|
|
||||||
webSocketService.userDisconnected(username);
|
|
||||||
log.info("✓ WebSocket 连接断开: 用户[{}]", username);
|
|
||||||
} catch (Exception ex) {
|
|
||||||
log.error("❌ 记录用户下线状态失败: 用户[{}]", username, ex);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 处理客户端订阅事件(可选)
|
|
||||||
*
|
|
||||||
* 用于记录订阅信息或实施订阅级别的权限控制
|
|
||||||
*/
|
|
||||||
private void handleSubscribe(StompHeaderAccessor accessor) {
|
|
||||||
Authentication authentication = (Authentication) accessor.getUser();
|
|
||||||
|
|
||||||
if (authentication != null && authentication.isAuthenticated()) {
|
|
||||||
String destination = accessor.getDestination();
|
|
||||||
String username = authentication.getName();
|
|
||||||
|
|
||||||
log.debug("用户[{}]订阅主题: {}", username, destination);
|
|
||||||
|
|
||||||
// TODO: 这里可以实现订阅级别的权限控制
|
|
||||||
// 例如:检查用户是否有权限订阅某个主题
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -31,13 +31,13 @@ public class FileController {
|
|||||||
@Operation(summary = "文件上传")
|
@Operation(summary = "文件上传")
|
||||||
public Result<FileInfo> uploadFile(
|
public Result<FileInfo> uploadFile(
|
||||||
@Parameter(
|
@Parameter(
|
||||||
name = "file1",
|
name = "file",
|
||||||
description = "表单文件对象",
|
description = "表单文件对象",
|
||||||
required = true,
|
required = true,
|
||||||
in = ParameterIn.DEFAULT,
|
in = ParameterIn.DEFAULT,
|
||||||
schema = @Schema(name = "file1", format = "binary")
|
schema = @Schema(name = "file", format = "binary")
|
||||||
)
|
)
|
||||||
@RequestPart(value = "file1") MultipartFile file
|
@RequestPart(value = "file") MultipartFile file
|
||||||
) {
|
) {
|
||||||
FileInfo fileInfo = fileService.uploadFile(file);
|
FileInfo fileInfo = fileService.uploadFile(file);
|
||||||
return Result.success(fileInfo);
|
return Result.success(fileInfo);
|
||||||
|
|||||||
@@ -40,15 +40,10 @@ public class TokenAuthenticationFilter extends OncePerRequestFilter {
|
|||||||
@Override
|
@Override
|
||||||
protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain) throws ServletException, IOException {
|
protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain) throws ServletException, IOException {
|
||||||
|
|
||||||
String authorizationHeader = request.getHeader(HttpHeaders.AUTHORIZATION);
|
String rawToken = resolveToken(request);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
if (StrUtil.isNotBlank(authorizationHeader)
|
if (StrUtil.isNotBlank(rawToken)) {
|
||||||
&& authorizationHeader.startsWith(SecurityConstants.BEARER_TOKEN_PREFIX)) {
|
|
||||||
|
|
||||||
// 剥离Bearer前缀获取原始令牌
|
|
||||||
String rawToken = authorizationHeader.substring(SecurityConstants.BEARER_TOKEN_PREFIX.length());
|
|
||||||
|
|
||||||
// 执行令牌有效性检查(包含密码学验签和过期时间验证)
|
// 执行令牌有效性检查(包含密码学验签和过期时间验证)
|
||||||
boolean isValidToken = tokenManager.validateToken(rawToken);
|
boolean isValidToken = tokenManager.validateToken(rawToken);
|
||||||
if (!isValidToken) {
|
if (!isValidToken) {
|
||||||
@@ -70,4 +65,25 @@ public class TokenAuthenticationFilter extends OncePerRequestFilter {
|
|||||||
// 继续后续过滤器链执行
|
// 继续后续过滤器链执行
|
||||||
filterChain.doFilter(request, response);
|
filterChain.doFilter(request, response);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 从请求中解析 Token
|
||||||
|
* 优先从 Authorization Header 获取,其次从 URL 参数获取(支持 SSE)
|
||||||
|
*/
|
||||||
|
private String resolveToken(HttpServletRequest request) {
|
||||||
|
// 1. 从 Authorization Header 获取
|
||||||
|
String authorizationHeader = request.getHeader(HttpHeaders.AUTHORIZATION);
|
||||||
|
if (StrUtil.isNotBlank(authorizationHeader)
|
||||||
|
&& authorizationHeader.startsWith(SecurityConstants.BEARER_TOKEN_PREFIX)) {
|
||||||
|
return authorizationHeader.substring(SecurityConstants.BEARER_TOKEN_PREFIX.length());
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2. 从 URL 参数获取(支持 SSE EventSource)
|
||||||
|
String tokenParam = request.getParameter("token");
|
||||||
|
if (StrUtil.isNotBlank(tokenParam)) {
|
||||||
|
return tokenParam;
|
||||||
|
}
|
||||||
|
|
||||||
|
return null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,13 +1,32 @@
|
|||||||
package com.youlai.boot.support.mail.service;
|
package com.youlai.boot.support.mail.service;
|
||||||
|
|
||||||
|
import com.youlai.boot.config.property.MailProperties;
|
||||||
|
import jakarta.mail.MessagingException;
|
||||||
|
import jakarta.mail.internet.MimeMessage;
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.core.io.FileSystemResource;
|
||||||
|
import org.springframework.mail.SimpleMailMessage;
|
||||||
|
import org.springframework.mail.javamail.JavaMailSender;
|
||||||
|
import org.springframework.mail.javamail.MimeMessageHelper;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 邮件服务接口层
|
* 邮件服务
|
||||||
*
|
*
|
||||||
* @author Ray
|
* @author Ray
|
||||||
* @since 2024/8/17
|
* @since 2024/8/17
|
||||||
*/
|
*/
|
||||||
public interface MailService {
|
@Service
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
@Slf4j
|
||||||
|
public class MailService {
|
||||||
|
|
||||||
|
private final JavaMailSender mailSender;
|
||||||
|
|
||||||
|
private final MailProperties mailProperties;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 发送简单文本邮件
|
* 发送简单文本邮件
|
||||||
@@ -16,16 +35,42 @@ public interface MailService {
|
|||||||
* @param subject 邮件主题
|
* @param subject 邮件主题
|
||||||
* @param text 邮件内容
|
* @param text 邮件内容
|
||||||
*/
|
*/
|
||||||
void sendMail(String to, String subject, String text) ;
|
public void sendMail(String to, String subject, String text) {
|
||||||
|
try {
|
||||||
|
SimpleMailMessage message = new SimpleMailMessage();
|
||||||
|
message.setFrom(mailProperties.getFrom());
|
||||||
|
message.setTo(to);
|
||||||
|
message.setSubject(subject);
|
||||||
|
message.setText(text);
|
||||||
|
mailSender.send(message);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("发送邮件失败{}", e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 发送带附件的邮件
|
* 发送带附件的邮件
|
||||||
*
|
*
|
||||||
* @param to 收件人地址
|
* @param to 收件人地址
|
||||||
* @param subject 邮件主题
|
* @param subject 邮件主题
|
||||||
* @param text 邮件内容
|
* @param text 邮件内容
|
||||||
* @param filePath 附件路径
|
* @param filePath 附件路径
|
||||||
*/
|
*/
|
||||||
void sendMailWithAttachment(String to, String subject, String text, String filePath);
|
public void sendMailWithAttachment(String to, String subject, String text, String filePath) {
|
||||||
|
MimeMessage message = mailSender.createMimeMessage();
|
||||||
|
try {
|
||||||
|
MimeMessageHelper helper = new MimeMessageHelper(message, true);
|
||||||
|
helper.setFrom(mailProperties.getFrom());
|
||||||
|
helper.setTo(to);
|
||||||
|
helper.setSubject(subject);
|
||||||
|
helper.setText(text, true); // true 表示支持HTML内容
|
||||||
|
|
||||||
|
FileSystemResource file = new FileSystemResource(new File(filePath));
|
||||||
|
helper.addAttachment(file.getFilename(), file);
|
||||||
|
|
||||||
|
mailSender.send(message);
|
||||||
|
} catch (MessagingException e) {
|
||||||
|
log.error("发送带附件的邮件失败{}", e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,79 +0,0 @@
|
|||||||
package com.youlai.boot.support.mail.service.impl;
|
|
||||||
|
|
||||||
import com.youlai.boot.config.property.MailProperties;
|
|
||||||
import com.youlai.boot.support.mail.service.MailService;
|
|
||||||
import jakarta.mail.MessagingException;
|
|
||||||
import jakarta.mail.internet.MimeMessage;
|
|
||||||
import lombok.RequiredArgsConstructor;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
import org.springframework.core.io.FileSystemResource;
|
|
||||||
import org.springframework.mail.SimpleMailMessage;
|
|
||||||
import org.springframework.mail.javamail.JavaMailSender;
|
|
||||||
import org.springframework.mail.javamail.MimeMessageHelper;
|
|
||||||
import org.springframework.stereotype.Service;
|
|
||||||
|
|
||||||
import java.io.File;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 邮件服务实现类
|
|
||||||
*
|
|
||||||
* @author Ray
|
|
||||||
* @since 2024/8/17
|
|
||||||
*/
|
|
||||||
@Service
|
|
||||||
@RequiredArgsConstructor
|
|
||||||
@Slf4j
|
|
||||||
public class MailServiceImpl implements MailService {
|
|
||||||
|
|
||||||
private final JavaMailSender mailSender;
|
|
||||||
|
|
||||||
private final MailProperties mailProperties;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 发送简单文本邮件
|
|
||||||
*
|
|
||||||
* @param to 收件人地址
|
|
||||||
* @param subject 邮件主题
|
|
||||||
* @param text 邮件内容
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
public void sendMail(String to, String subject, String text) {
|
|
||||||
try {
|
|
||||||
SimpleMailMessage message = new SimpleMailMessage();
|
|
||||||
message.setFrom(mailProperties.getFrom());
|
|
||||||
message.setTo(to);
|
|
||||||
message.setSubject(subject);
|
|
||||||
message.setText(text);
|
|
||||||
mailSender.send(message);
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.error("发送邮件失败{}", e.getMessage());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 发送带附件的邮件
|
|
||||||
*
|
|
||||||
* @param to 收件人地址
|
|
||||||
* @param subject 邮件主题
|
|
||||||
* @param text 邮件内容
|
|
||||||
* @param filePath 附件路径
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
public void sendMailWithAttachment(String to, String subject, String text, String filePath) {
|
|
||||||
MimeMessage message = mailSender.createMimeMessage();
|
|
||||||
try {
|
|
||||||
MimeMessageHelper helper = new MimeMessageHelper(message, true);
|
|
||||||
helper.setFrom(mailProperties.getFrom());
|
|
||||||
helper.setTo(to);
|
|
||||||
helper.setSubject(subject);
|
|
||||||
helper.setText(text, true); // true 表示支持HTML内容
|
|
||||||
|
|
||||||
FileSystemResource file = new FileSystemResource(new File(filePath));
|
|
||||||
helper.addAttachment(file.getFilename(), file);
|
|
||||||
|
|
||||||
mailSender.send(message);
|
|
||||||
} catch (MessagingException e) {
|
|
||||||
log.error("发送带附件的邮件失败{}", e.getMessage());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -0,0 +1,36 @@
|
|||||||
|
package com.youlai.boot.support.sse;
|
||||||
|
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.scheduling.annotation.Scheduled;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 在线用户数统计定时任务
|
||||||
|
* <p>
|
||||||
|
* 定时统计并广播当前在线用户数量到所有 SSE 客户端
|
||||||
|
*/
|
||||||
|
@Component
|
||||||
|
@Slf4j
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
public class OnlineUserCountJob {
|
||||||
|
|
||||||
|
private final SseSessionRegistry sessionRegistry;
|
||||||
|
private final SseService sseService;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 定时统计在线用户数并广播
|
||||||
|
* <p>
|
||||||
|
* 每3分钟执行一次,推送当前在线用户数量
|
||||||
|
*/
|
||||||
|
@Scheduled(cron = "0 */3 * * * ?")
|
||||||
|
public void execute() {
|
||||||
|
int onlineCount = sessionRegistry.getOnlineUserCount();
|
||||||
|
int connectionCount = sessionRegistry.getTotalConnectionCount();
|
||||||
|
|
||||||
|
log.debug("定时统计:在线用户数={}, 总连接数={}", onlineCount, connectionCount);
|
||||||
|
|
||||||
|
// 发送在线用户数量
|
||||||
|
sseService.sendOnlineCount();
|
||||||
|
}
|
||||||
|
}
|
||||||
44
src/main/java/com/youlai/boot/support/sse/SseController.java
Normal file
44
src/main/java/com/youlai/boot/support/sse/SseController.java
Normal file
@@ -0,0 +1,44 @@
|
|||||||
|
package com.youlai.boot.support.sse;
|
||||||
|
|
||||||
|
import com.youlai.boot.core.web.Result;
|
||||||
|
import com.youlai.boot.security.model.SysUserDetails;
|
||||||
|
import com.youlai.boot.security.util.SecurityUtils;
|
||||||
|
import io.swagger.v3.oas.annotations.Operation;
|
||||||
|
import io.swagger.v3.oas.annotations.tags.Tag;
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.http.MediaType;
|
||||||
|
import org.springframework.web.bind.annotation.GetMapping;
|
||||||
|
import org.springframework.web.bind.annotation.RequestMapping;
|
||||||
|
import org.springframework.web.bind.annotation.RestController;
|
||||||
|
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* SSE 控制器
|
||||||
|
*/
|
||||||
|
@Tag(name = "14. SSE连接")
|
||||||
|
@Slf4j
|
||||||
|
@RestController
|
||||||
|
@RequestMapping("/api/v1/sse")
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
public class SseController {
|
||||||
|
|
||||||
|
private final SseService sseService;
|
||||||
|
|
||||||
|
@Operation(summary = "建立SSE连接")
|
||||||
|
@GetMapping(value = "/connect", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
|
||||||
|
public SseEmitter connect() {
|
||||||
|
SysUserDetails user = SecurityUtils.getUser().orElse(null);
|
||||||
|
if (user == null) {
|
||||||
|
log.warn("SSE连接失败:未获取到当前用户");
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return sseService.createConnection(user.getUsername());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Operation(summary = "获取在线用户数")
|
||||||
|
@GetMapping("/online-count")
|
||||||
|
public Result<Integer> getOnlineCount() {
|
||||||
|
return Result.success(sseService.getOnlineUserCount());
|
||||||
|
}
|
||||||
|
}
|
||||||
138
src/main/java/com/youlai/boot/support/sse/SseService.java
Normal file
138
src/main/java/com/youlai/boot/support/sse/SseService.java
Normal file
@@ -0,0 +1,138 @@
|
|||||||
|
package com.youlai.boot.support.sse;
|
||||||
|
|
||||||
|
import com.youlai.boot.support.sse.dto.DictChangeEvent;
|
||||||
|
import com.youlai.boot.support.sse.dto.OnlineUserDTO;
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* SSE 服务
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
@Service
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
public class SseService {
|
||||||
|
|
||||||
|
/** SSE 连接超时时间:30 分钟 */
|
||||||
|
private static final long TIMEOUT = 30 * 60 * 1000L;
|
||||||
|
|
||||||
|
private final SseSessionRegistry sessionRegistry;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 创建 SSE 连接
|
||||||
|
*
|
||||||
|
* @param username 用户名
|
||||||
|
* @return SseEmitter
|
||||||
|
*/
|
||||||
|
public SseEmitter createConnection(String username) {
|
||||||
|
if (username == null || username.isEmpty()) {
|
||||||
|
log.warn("创建SSE连接失败:用户名为空");
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 创建 SseEmitter,设置超时时间
|
||||||
|
SseEmitter emitter = new SseEmitter(TIMEOUT);
|
||||||
|
|
||||||
|
// 注册用户连接
|
||||||
|
sessionRegistry.userConnected(username, emitter);
|
||||||
|
|
||||||
|
// 连接建立后立即发送在线用户数
|
||||||
|
try {
|
||||||
|
emitter.send(SseEmitter.event()
|
||||||
|
.name(SseTopics.ONLINE_COUNT)
|
||||||
|
.data(sessionRegistry.getOnlineUserCount()));
|
||||||
|
} catch (IOException e) {
|
||||||
|
log.warn("发送初始在线用户数失败: {}", e.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
log.info("用户[{}]SSE连接已建立,当前在线用户数: {}", username, sessionRegistry.getOnlineUserCount());
|
||||||
|
|
||||||
|
// 发送在线用户数变更
|
||||||
|
sendOnlineCount();
|
||||||
|
|
||||||
|
return emitter;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 发送字典变更事件
|
||||||
|
*
|
||||||
|
* @param dictCode 字典编码
|
||||||
|
*/
|
||||||
|
public void sendDictChange(String dictCode) {
|
||||||
|
if (dictCode == null || dictCode.isEmpty()) {
|
||||||
|
log.warn("字典编码为空,跳过发送");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
DictChangeEvent event = new DictChangeEvent(dictCode);
|
||||||
|
sessionRegistry.broadcast(SseTopics.DICT, event);
|
||||||
|
log.info("已发送字典变更通知: dictCode={}", dictCode);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 发送在线用户数
|
||||||
|
*/
|
||||||
|
public void sendOnlineCount() {
|
||||||
|
int count = sessionRegistry.getOnlineUserCount();
|
||||||
|
sessionRegistry.broadcast(SseTopics.ONLINE_COUNT, count);
|
||||||
|
log.debug("已发送在线用户数: {}", count);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 发送消息给指定用户
|
||||||
|
*
|
||||||
|
* @param username 用户名
|
||||||
|
* @param eventName 事件名称
|
||||||
|
* @param data 数据
|
||||||
|
*/
|
||||||
|
public void sendToUser(String username, String eventName, Object data) {
|
||||||
|
if (username == null || username.isEmpty()) {
|
||||||
|
log.warn("用户名为空,无法发送消息");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
sessionRegistry.sendToUser(username, eventName, data);
|
||||||
|
log.info("已向用户[{}]发送事件[{}]", username, eventName);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取在线用户列表
|
||||||
|
*
|
||||||
|
* @return 在线用户列表
|
||||||
|
*/
|
||||||
|
public List<OnlineUserDTO> getOnlineUsers() {
|
||||||
|
return sessionRegistry.getOnlineUsers();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取在线用户数
|
||||||
|
*
|
||||||
|
* @return 在线用户数
|
||||||
|
*/
|
||||||
|
public int getOnlineUserCount() {
|
||||||
|
return sessionRegistry.getOnlineUserCount();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 发送系统消息
|
||||||
|
*
|
||||||
|
* @param message 消息内容
|
||||||
|
*/
|
||||||
|
public void sendSystemMessage(String message) {
|
||||||
|
if (message == null || message.isEmpty()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
Map<String, Object> systemMessage = Map.of(
|
||||||
|
"sender", "系统通知",
|
||||||
|
"content", message,
|
||||||
|
"timestamp", System.currentTimeMillis()
|
||||||
|
);
|
||||||
|
sessionRegistry.broadcast(SseTopics.SYSTEM, systemMessage);
|
||||||
|
log.info("已发送系统消息: {}", message);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,196 @@
|
|||||||
|
package com.youlai.boot.support.sse;
|
||||||
|
|
||||||
|
import com.youlai.boot.support.sse.dto.OnlineUserDTO;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* SSE 会话注册表
|
||||||
|
* <p>
|
||||||
|
* 维护 SSE 连接的用户会话信息,支持多设备同时登录
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
@Component
|
||||||
|
public class SseSessionRegistry {
|
||||||
|
|
||||||
|
/** 用户名 -> SseEmitter 集合(支持多设备) */
|
||||||
|
private final Map<String, Set<SseEmitter>> userEmittersMap = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
/** SseEmitter -> 用户名(快速定位用户) */
|
||||||
|
private final Map<SseEmitter, String> emitterUserMap = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
/** SseEmitter -> 连接时间 */
|
||||||
|
private final Map<SseEmitter, Long> emitterTimeMap = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 用户上线(建立 SSE 连接)
|
||||||
|
*
|
||||||
|
* @param username 用户名
|
||||||
|
* @param emitter SseEmitter
|
||||||
|
*/
|
||||||
|
public void userConnected(String username, SseEmitter emitter) {
|
||||||
|
userEmittersMap.computeIfAbsent(username, k -> ConcurrentHashMap.newKeySet()).add(emitter);
|
||||||
|
emitterUserMap.put(emitter, username);
|
||||||
|
emitterTimeMap.put(emitter, System.currentTimeMillis());
|
||||||
|
log.debug("用户[{}]SSE连接已建立", username);
|
||||||
|
|
||||||
|
// 设置连接超时和完成回调
|
||||||
|
emitter.onCompletion(() -> {
|
||||||
|
removeEmitter(emitter);
|
||||||
|
log.debug("用户[{}]SSE连接已完成", username);
|
||||||
|
});
|
||||||
|
emitter.onTimeout(() -> {
|
||||||
|
removeEmitter(emitter);
|
||||||
|
log.debug("用户[{}]SSE连接超时", username);
|
||||||
|
});
|
||||||
|
emitter.onError(e -> {
|
||||||
|
removeEmitter(emitter);
|
||||||
|
log.debug("用户[{}]SSE连接错误: {}", username, e.getMessage());
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 移除指定 emitter
|
||||||
|
*/
|
||||||
|
private void removeEmitter(SseEmitter emitter) {
|
||||||
|
String username = emitterUserMap.remove(emitter);
|
||||||
|
if (username == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
emitterTimeMap.remove(emitter);
|
||||||
|
|
||||||
|
Set<SseEmitter> emitters = userEmittersMap.get(username);
|
||||||
|
if (emitters != null) {
|
||||||
|
emitters.remove(emitter);
|
||||||
|
if (emitters.isEmpty()) {
|
||||||
|
userEmittersMap.remove(username);
|
||||||
|
log.debug("用户[{}]所有SSE连接已断开", username);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 用户下线(断开所有 SSE 连接)
|
||||||
|
*
|
||||||
|
* @param username 用户名
|
||||||
|
*/
|
||||||
|
public void userDisconnected(String username) {
|
||||||
|
Set<SseEmitter> emitters = userEmittersMap.remove(username);
|
||||||
|
if (emitters == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
emitters.forEach(emitter -> {
|
||||||
|
emitterUserMap.remove(emitter);
|
||||||
|
emitterTimeMap.remove(emitter);
|
||||||
|
try {
|
||||||
|
emitter.complete();
|
||||||
|
} catch (Exception ignored) {
|
||||||
|
}
|
||||||
|
});
|
||||||
|
log.debug("用户[{}]已下线,移除{}个SSE连接", username, emitters.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取在线用户数量
|
||||||
|
*/
|
||||||
|
public int getOnlineUserCount() {
|
||||||
|
return userEmittersMap.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取总连接数量(包括多设备)
|
||||||
|
*/
|
||||||
|
public int getTotalConnectionCount() {
|
||||||
|
return emitterUserMap.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取指定用户的连接数量
|
||||||
|
*/
|
||||||
|
public int getUserConnectionCount(String username) {
|
||||||
|
Set<SseEmitter> emitters = userEmittersMap.get(username);
|
||||||
|
return emitters != null ? emitters.size() : 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 检查用户是否在线
|
||||||
|
*/
|
||||||
|
public boolean isUserOnline(String username) {
|
||||||
|
Set<SseEmitter> emitters = userEmittersMap.get(username);
|
||||||
|
return emitters != null && !emitters.isEmpty();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取所有在线用户列表
|
||||||
|
*/
|
||||||
|
public List<OnlineUserDTO> getOnlineUsers() {
|
||||||
|
return userEmittersMap.entrySet().stream()
|
||||||
|
.map(entry -> {
|
||||||
|
String username = entry.getKey();
|
||||||
|
Set<SseEmitter> emitters = entry.getValue();
|
||||||
|
long earliestTime = emitters.stream()
|
||||||
|
.map(emitterTimeMap::get)
|
||||||
|
.filter(t -> t != null)
|
||||||
|
.mapToLong(Long::longValue)
|
||||||
|
.min()
|
||||||
|
.orElse(System.currentTimeMillis());
|
||||||
|
return new OnlineUserDTO(username, emitters.size(), earliestTime);
|
||||||
|
})
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取所有活跃的 SseEmitter
|
||||||
|
*/
|
||||||
|
public Set<SseEmitter> getAllEmitters() {
|
||||||
|
return emitterUserMap.keySet();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取指定用户的所有 SseEmitter
|
||||||
|
*/
|
||||||
|
public Set<SseEmitter> getUserEmitters(String username) {
|
||||||
|
return userEmittersMap.get(username);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 向指定 emitter 发送事件
|
||||||
|
*/
|
||||||
|
public boolean sendEvent(SseEmitter emitter, String eventName, Object data) {
|
||||||
|
try {
|
||||||
|
emitter.send(SseEmitter.event()
|
||||||
|
.name(eventName)
|
||||||
|
.data(data));
|
||||||
|
return true;
|
||||||
|
} catch (IOException e) {
|
||||||
|
log.warn("发送SSE事件失败: {}", e.getMessage());
|
||||||
|
removeEmitter(emitter);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 向所有连接广播事件
|
||||||
|
*/
|
||||||
|
public void broadcast(String eventName, Object data) {
|
||||||
|
getAllEmitters().forEach(emitter -> sendEvent(emitter, eventName, data));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 向指定用户发送事件
|
||||||
|
*/
|
||||||
|
public void sendToUser(String username, String eventName, Object data) {
|
||||||
|
Set<SseEmitter> emitters = userEmittersMap.get(username);
|
||||||
|
if (emitters != null) {
|
||||||
|
emitters.forEach(emitter -> sendEvent(emitter, eventName, data));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
19
src/main/java/com/youlai/boot/support/sse/SseTopics.java
Normal file
19
src/main/java/com/youlai/boot/support/sse/SseTopics.java
Normal file
@@ -0,0 +1,19 @@
|
|||||||
|
package com.youlai.boot.support.sse;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* SSE 主题常量
|
||||||
|
*/
|
||||||
|
public final class SseTopics {
|
||||||
|
|
||||||
|
private SseTopics() {
|
||||||
|
}
|
||||||
|
|
||||||
|
/** 字典变更事件 */
|
||||||
|
public static final String DICT = "dict";
|
||||||
|
|
||||||
|
/** 在线用户数事件 */
|
||||||
|
public static final String ONLINE_COUNT = "online-count";
|
||||||
|
|
||||||
|
/** 系统消息事件 */
|
||||||
|
public static final String SYSTEM = "system";
|
||||||
|
}
|
||||||
@@ -1,4 +1,4 @@
|
|||||||
package com.youlai.boot.support.websocket.dto;
|
package com.youlai.boot.support.sse.dto;
|
||||||
|
|
||||||
import lombok.AllArgsConstructor;
|
import lombok.AllArgsConstructor;
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
@@ -10,7 +10,7 @@ import java.io.Serializable;
|
|||||||
/**
|
/**
|
||||||
* 字典变更事件
|
* 字典变更事件
|
||||||
* <p>
|
* <p>
|
||||||
* 当字典数据发生变更时,通过 WebSocket 广播此事件通知前端清除缓存。
|
* 当字典数据发生变更时,通过 SSE 广播此事件通知前端清除缓存。
|
||||||
* 前端收到通知后清除对应字典的本地缓存,下次使用时重新从服务端加载。
|
* 前端收到通知后清除对应字典的本地缓存,下次使用时重新从服务端加载。
|
||||||
*
|
*
|
||||||
* @author Ray.Hao
|
* @author Ray.Hao
|
||||||
@@ -1,4 +1,4 @@
|
|||||||
package com.youlai.boot.support.websocket.dto;
|
package com.youlai.boot.support.sse.dto;
|
||||||
|
|
||||||
import lombok.AllArgsConstructor;
|
import lombok.AllArgsConstructor;
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
@@ -1,47 +0,0 @@
|
|||||||
package com.youlai.boot.support.websocket.job;
|
|
||||||
|
|
||||||
import com.youlai.boot.support.websocket.publisher.WebSocketPublisher;
|
|
||||||
import com.youlai.boot.support.websocket.session.UserSessionRegistry;
|
|
||||||
import com.youlai.boot.support.websocket.topic.WebSocketTopics;
|
|
||||||
import lombok.RequiredArgsConstructor;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
import org.springframework.scheduling.annotation.Scheduled;
|
|
||||||
import org.springframework.stereotype.Component;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 在线用户数统计定时任务
|
|
||||||
* <p>
|
|
||||||
* 定时统计并广播当前在线用户数量到所有WebSocket客户端。
|
|
||||||
* 用于解决以下问题:
|
|
||||||
* <ul>
|
|
||||||
* <li>客户端页面刷新后可快速同步最新在线人数</li>
|
|
||||||
* <li>减少服务端主动推送频率,降低资源消耗</li>
|
|
||||||
* </ul>
|
|
||||||
*
|
|
||||||
* @author Ray.Hao
|
|
||||||
* @since 3.0.0
|
|
||||||
*/
|
|
||||||
@Component
|
|
||||||
@Slf4j
|
|
||||||
@RequiredArgsConstructor
|
|
||||||
public class OnlineUserCountJob {
|
|
||||||
|
|
||||||
private final UserSessionRegistry userSessionRegistry;
|
|
||||||
private final WebSocketPublisher webSocketPublisher;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 定时统计在线用户数并广播
|
|
||||||
* <p>
|
|
||||||
* 每3分钟执行一次,推送当前在线用户数量
|
|
||||||
*/
|
|
||||||
@Scheduled(cron = "0 */3 * * * ?")
|
|
||||||
public void execute() {
|
|
||||||
int onlineCount = userSessionRegistry.getOnlineUserCount();
|
|
||||||
int sessionCount = userSessionRegistry.getTotalSessionCount();
|
|
||||||
|
|
||||||
log.debug("定时统计:在线用户数={}, 总会话数={}", onlineCount, sessionCount);
|
|
||||||
|
|
||||||
// 广播在线用户数量
|
|
||||||
webSocketPublisher.publish(WebSocketTopics.TOPIC_ONLINE_COUNT, onlineCount);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,61 +0,0 @@
|
|||||||
package com.youlai.boot.support.websocket.publisher;
|
|
||||||
|
|
||||||
import lombok.RequiredArgsConstructor;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
|
||||||
import org.springframework.messaging.simp.SimpMessagingTemplate;
|
|
||||||
import org.springframework.stereotype.Service;
|
|
||||||
import tools.jackson.core.JacksonException;
|
|
||||||
import tools.jackson.databind.ObjectMapper;
|
|
||||||
|
|
||||||
@Service
|
|
||||||
@RequiredArgsConstructor
|
|
||||||
@Slf4j
|
|
||||||
public class WebSocketPublisher {
|
|
||||||
|
|
||||||
private SimpMessagingTemplate messagingTemplate;
|
|
||||||
private final ObjectMapper objectMapper;
|
|
||||||
|
|
||||||
@Autowired(required = false)
|
|
||||||
public void setMessagingTemplate(SimpMessagingTemplate messagingTemplate) {
|
|
||||||
this.messagingTemplate = messagingTemplate;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void publish(String destination, Object payload) {
|
|
||||||
if (messagingTemplate == null) {
|
|
||||||
log.warn("消息模板尚未初始化,无法发送消息: destination={}", destination);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
Object body = serializeIfNeeded(payload);
|
|
||||||
messagingTemplate.convertAndSend(destination, body);
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.error("发送消息失败: destination={}", destination, e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void publishToUser(String username, String destination, Object payload) {
|
|
||||||
if (messagingTemplate == null) {
|
|
||||||
log.warn("消息模板尚未初始化,无法发送用户消息: username={}, destination={}", username, destination);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
Object body = serializeIfNeeded(payload);
|
|
||||||
messagingTemplate.convertAndSendToUser(username, destination, body);
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.error("发送用户消息失败: username={}, destination={}", username, destination, e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private Object serializeIfNeeded(Object payload) throws JacksonException {
|
|
||||||
if (payload == null) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
if (payload instanceof String || payload instanceof Number || payload instanceof Boolean) {
|
|
||||||
return payload;
|
|
||||||
}
|
|
||||||
return objectMapper.writeValueAsString(payload);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,57 +0,0 @@
|
|||||||
package com.youlai.boot.support.websocket.service;
|
|
||||||
|
|
||||||
import com.youlai.boot.support.websocket.dto.OnlineUserDTO;
|
|
||||||
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* WebSocket服务接口
|
|
||||||
* <p>
|
|
||||||
* 提供与WebSocket连接管理相关的功能,包括:
|
|
||||||
* - 用户连接/断开事件处理
|
|
||||||
* - 字典数据变更通知
|
|
||||||
* - 系统消息推送
|
|
||||||
* </p>
|
|
||||||
*
|
|
||||||
* @author Ray.Hao
|
|
||||||
* @since 3.0.0
|
|
||||||
*/
|
|
||||||
public interface WebSocketService {
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 处理用户连接事件
|
|
||||||
*
|
|
||||||
* @param username 用户名
|
|
||||||
* @param sessionId WebSocket会话ID
|
|
||||||
*/
|
|
||||||
void userConnected(String username, String sessionId);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 处理用户断开连接事件
|
|
||||||
*
|
|
||||||
* @param username 用户名
|
|
||||||
*/
|
|
||||||
void userDisconnected(String username);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 广播字典数据变更通知
|
|
||||||
*
|
|
||||||
* @param dictCode 字典编码
|
|
||||||
*/
|
|
||||||
void broadcastDictChange(String dictCode);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 发送系统通知给特定用户
|
|
||||||
*
|
|
||||||
* @param username 目标用户名
|
|
||||||
* @param message 通知消息内容
|
|
||||||
*/
|
|
||||||
void sendNotification(String username, Object message);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 获取在线用户列表
|
|
||||||
*
|
|
||||||
* @return 在线用户信息列表
|
|
||||||
*/
|
|
||||||
List<OnlineUserDTO> getOnlineUsers();
|
|
||||||
}
|
|
||||||
@@ -1,226 +0,0 @@
|
|||||||
package com.youlai.boot.support.websocket.service.impl;
|
|
||||||
|
|
||||||
import com.youlai.boot.support.websocket.dto.DictChangeEvent;
|
|
||||||
import com.youlai.boot.support.websocket.dto.OnlineUserDTO;
|
|
||||||
import com.youlai.boot.support.websocket.publisher.WebSocketPublisher;
|
|
||||||
import com.youlai.boot.support.websocket.session.UserSessionRegistry;
|
|
||||||
import com.youlai.boot.support.websocket.service.WebSocketService;
|
|
||||||
import com.youlai.boot.support.websocket.topic.WebSocketTopics;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
import org.springframework.stereotype.Service;
|
|
||||||
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* WebSocket 服务实现类
|
|
||||||
*
|
|
||||||
* 核心功能:
|
|
||||||
* - 用户在线状态管理(支持多设备登录)
|
|
||||||
* - 消息推送(广播、点对点)
|
|
||||||
* - 字典变更通知
|
|
||||||
*
|
|
||||||
* @author Ray.Hao
|
|
||||||
* @since 3.0.0
|
|
||||||
*/
|
|
||||||
@Service
|
|
||||||
@Slf4j
|
|
||||||
public class WebSocketServiceImpl implements WebSocketService {
|
|
||||||
|
|
||||||
private final UserSessionRegistry userSessionRegistry;
|
|
||||||
private final WebSocketPublisher webSocketPublisher;
|
|
||||||
|
|
||||||
public WebSocketServiceImpl(UserSessionRegistry userSessionRegistry, WebSocketPublisher webSocketPublisher) {
|
|
||||||
this.userSessionRegistry = userSessionRegistry;
|
|
||||||
this.webSocketPublisher = webSocketPublisher;
|
|
||||||
}
|
|
||||||
|
|
||||||
// ==================== 用户在线状态管理 ====================
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 处理用户连接事件
|
|
||||||
*
|
|
||||||
* @param username 用户名
|
|
||||||
* @param sessionId WebSocket 会话 ID
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
public void userConnected(String username, String sessionId) {
|
|
||||||
if (username == null || username.isEmpty()) {
|
|
||||||
log.warn("用户连接失败:用户名为空");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (sessionId == null || sessionId.isEmpty()) {
|
|
||||||
log.warn("用户[{}]连接失败:会话 ID 为空", username);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
userSessionRegistry.userConnected(username, sessionId);
|
|
||||||
|
|
||||||
int sessionCount = userSessionRegistry.getUserSessionCount(username);
|
|
||||||
int totalOnlineUsers = userSessionRegistry.getOnlineUserCount();
|
|
||||||
|
|
||||||
log.info("✓ 用户[{}]会话[{}]上线(该用户共 {} 个会话,系统总在线用户数:{})",
|
|
||||||
username, sessionId, sessionCount, totalOnlineUsers);
|
|
||||||
|
|
||||||
// 广播在线用户数变更
|
|
||||||
broadcastOnlineUserCount();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 处理用户断开连接事件
|
|
||||||
*
|
|
||||||
* @param username 用户名
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
public void userDisconnected(String username) {
|
|
||||||
if (username == null || username.isEmpty()) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
userSessionRegistry.userDisconnected(username);
|
|
||||||
|
|
||||||
int totalOnlineUsers = userSessionRegistry.getOnlineUserCount();
|
|
||||||
log.info("✓ 用户[{}]下线(系统总在线用户数:{})", username, totalOnlineUsers);
|
|
||||||
|
|
||||||
// 广播在线用户数变更
|
|
||||||
broadcastOnlineUserCount();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 移除指定会话(单个设备下线)
|
|
||||||
*
|
|
||||||
* @param sessionId 会话 ID
|
|
||||||
*/
|
|
||||||
public void removeSession(String sessionId) {
|
|
||||||
userSessionRegistry.removeSession(sessionId);
|
|
||||||
broadcastOnlineUserCount();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 获取在线用户列表
|
|
||||||
*
|
|
||||||
* @return 在线用户信息列表
|
|
||||||
*/
|
|
||||||
public List<OnlineUserDTO> getOnlineUsers() {
|
|
||||||
return userSessionRegistry.getOnlineUsers();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 获取在线用户数量
|
|
||||||
*
|
|
||||||
* @return 在线用户数(不是会话数)
|
|
||||||
*/
|
|
||||||
public int getOnlineUserCount() {
|
|
||||||
return userSessionRegistry.getOnlineUserCount();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 获取在线会话总数
|
|
||||||
*
|
|
||||||
* @return 所有在线会话的总数
|
|
||||||
*/
|
|
||||||
public int getTotalSessionCount() {
|
|
||||||
return userSessionRegistry.getTotalSessionCount();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 检查用户是否在线
|
|
||||||
*
|
|
||||||
* @param username 用户名
|
|
||||||
* @return 是否在线
|
|
||||||
*/
|
|
||||||
public boolean isUserOnline(String username) {
|
|
||||||
return userSessionRegistry.isUserOnline(username);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 获取指定用户的会话数量
|
|
||||||
*
|
|
||||||
* @param username 用户名
|
|
||||||
* @return 会话数量
|
|
||||||
*/
|
|
||||||
public int getUserSessionCount(String username) {
|
|
||||||
return userSessionRegistry.getUserSessionCount(username);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 手动触发在线用户数量广播
|
|
||||||
*
|
|
||||||
* 供外部服务(如定时任务)调用
|
|
||||||
*/
|
|
||||||
public void notifyOnlineUsersChange() {
|
|
||||||
log.info("手动触发在线用户数量通知,当前在线用户数:{}", getOnlineUserCount());
|
|
||||||
broadcastOnlineUserCount();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 广播在线用户数量变更(内部方法)
|
|
||||||
*/
|
|
||||||
private void broadcastOnlineUserCount() {
|
|
||||||
int count = getOnlineUserCount();
|
|
||||||
webSocketPublisher.publish(WebSocketTopics.TOPIC_ONLINE_COUNT, count);
|
|
||||||
log.debug("✓ 已广播在线用户数量: {}", count);
|
|
||||||
}
|
|
||||||
|
|
||||||
// ==================== 消息推送功能 ====================
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 向所有客户端广播字典更新事件
|
|
||||||
*
|
|
||||||
* @param dictCode 字典编码
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
public void broadcastDictChange(String dictCode) {
|
|
||||||
if (dictCode == null || dictCode.isEmpty()) {
|
|
||||||
log.warn("字典编码为空,跳过广播");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
DictChangeEvent event = new DictChangeEvent(dictCode);
|
|
||||||
webSocketPublisher.publish(WebSocketTopics.TOPIC_DICT, event);
|
|
||||||
log.info("✓ 已广播字典变更通知: dictCode={}", dictCode);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 向特定用户发送通知消息
|
|
||||||
*
|
|
||||||
* @param username 目标用户名
|
|
||||||
* @param message 消息内容
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
public void sendNotification(String username, Object message) {
|
|
||||||
if (username == null || username.isEmpty()) {
|
|
||||||
log.warn("用户名为空,无法发送通知");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (message == null) {
|
|
||||||
log.warn("消息内容为空,无法发送给用户[{}]", username);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
webSocketPublisher.publishToUser(username, WebSocketTopics.USER_QUEUE_MESSAGES, message);
|
|
||||||
log.info("✓ 已向用户[{}]发送通知", username);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 广播系统消息给所有用户
|
|
||||||
*
|
|
||||||
* @param message 消息内容
|
|
||||||
*/
|
|
||||||
public void broadcastSystemMessage(String message) {
|
|
||||||
if (message == null || message.isEmpty()) {
|
|
||||||
log.warn("消息内容为空,无法广播");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
Map<String, Object> systemMessage = Map.of(
|
|
||||||
"sender", "系统通知",
|
|
||||||
"content", message,
|
|
||||||
"timestamp", System.currentTimeMillis()
|
|
||||||
);
|
|
||||||
webSocketPublisher.publish(WebSocketTopics.TOPIC_PUBLIC, systemMessage);
|
|
||||||
log.info("✓ 已广播系统消息: {}", message);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,179 +0,0 @@
|
|||||||
package com.youlai.boot.support.websocket.session;
|
|
||||||
|
|
||||||
import com.youlai.boot.support.websocket.dto.OnlineUserDTO;
|
|
||||||
import lombok.AllArgsConstructor;
|
|
||||||
import lombok.Data;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
import org.springframework.stereotype.Component;
|
|
||||||
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Set;
|
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
|
||||||
import java.util.stream.Collectors;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* WebSocket 用户会话注册表
|
|
||||||
* <p>
|
|
||||||
* 维护WebSocket连接的用户会话信息,支持多设备同时登录。
|
|
||||||
* 采用双Map结构实现高效查询:
|
|
||||||
* <ul>
|
|
||||||
* <li>userSessionsMap: 用户名 -> 会话ID集合(支持多设备)</li>
|
|
||||||
* <li>sessionDetailsMap: 会话ID -> 会话详情(快速定位用户)</li>
|
|
||||||
* </ul>
|
|
||||||
*
|
|
||||||
* @author Ray.Hao
|
|
||||||
* @since 3.0.0
|
|
||||||
*/
|
|
||||||
@Slf4j
|
|
||||||
@Component
|
|
||||||
public class UserSessionRegistry {
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 用户会话映射表
|
|
||||||
* <p>
|
|
||||||
* Key: 用户名
|
|
||||||
* Value: 该用户所有WebSocket会话ID集合(支持多设备登录)
|
|
||||||
*/
|
|
||||||
private final Map<String, Set<String>> userSessionsMap = new ConcurrentHashMap<>();
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 会话详情映射表
|
|
||||||
* <p>
|
|
||||||
* Key: WebSocket会话ID
|
|
||||||
* Value: 会话详情(包含用户名、连接时间等)
|
|
||||||
*/
|
|
||||||
private final Map<String, SessionInfo> sessionDetailsMap = new ConcurrentHashMap<>();
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 用户上线(建立WebSocket连接)
|
|
||||||
*
|
|
||||||
* @param username 用户名
|
|
||||||
* @param sessionId WebSocket会话ID
|
|
||||||
*/
|
|
||||||
public void userConnected(String username, String sessionId) {
|
|
||||||
userSessionsMap.computeIfAbsent(username, k -> ConcurrentHashMap.newKeySet()).add(sessionId);
|
|
||||||
sessionDetailsMap.put(sessionId, new SessionInfo(username, sessionId, System.currentTimeMillis()));
|
|
||||||
log.debug("用户[{}]会话[{}]已注册", username, sessionId);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 用户下线(断开所有WebSocket连接)
|
|
||||||
* <p>
|
|
||||||
* 移除该用户的所有会话信息
|
|
||||||
*
|
|
||||||
* @param username 用户名
|
|
||||||
*/
|
|
||||||
public void userDisconnected(String username) {
|
|
||||||
Set<String> sessions = userSessionsMap.remove(username);
|
|
||||||
if (sessions == null) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
sessions.forEach(sessionDetailsMap::remove);
|
|
||||||
log.debug("用户[{}]已下线,移除{}个会话", username, sessions.size());
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 移除指定会话(单设备下线)
|
|
||||||
* <p>
|
|
||||||
* 当用户某一设备断开连接时调用,保留其他设备的会话
|
|
||||||
*
|
|
||||||
* @param sessionId WebSocket会话ID
|
|
||||||
*/
|
|
||||||
public void removeSession(String sessionId) {
|
|
||||||
SessionInfo sessionInfo = sessionDetailsMap.remove(sessionId);
|
|
||||||
if (sessionInfo == null) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
String username = sessionInfo.getUsername();
|
|
||||||
Set<String> sessions = userSessionsMap.get(username);
|
|
||||||
if (sessions == null) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
sessions.remove(sessionId);
|
|
||||||
if (sessions.isEmpty()) {
|
|
||||||
// 该用户没有任何会话了,移除用户记录
|
|
||||||
userSessionsMap.remove(username);
|
|
||||||
log.debug("用户[{}]最后一个会话已移除", username);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 获取在线用户数量
|
|
||||||
*
|
|
||||||
* @return 当前在线用户数(非会话数)
|
|
||||||
*/
|
|
||||||
public int getOnlineUserCount() {
|
|
||||||
return userSessionsMap.size();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 获取指定用户的会话数量
|
|
||||||
*
|
|
||||||
* @param username 用户名
|
|
||||||
* @return 该用户的WebSocket会话数量(多设备登录时大于1)
|
|
||||||
*/
|
|
||||||
public int getUserSessionCount(String username) {
|
|
||||||
Set<String> sessions = userSessionsMap.get(username);
|
|
||||||
return sessions != null ? sessions.size() : 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 获取在线会话总数
|
|
||||||
*
|
|
||||||
* @return 所有WebSocket会话的总数(包含多设备)
|
|
||||||
*/
|
|
||||||
public int getTotalSessionCount() {
|
|
||||||
return sessionDetailsMap.size();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 检查用户是否在线
|
|
||||||
*
|
|
||||||
* @param username 用户名
|
|
||||||
* @return 是否在线(至少有一个活跃会话)
|
|
||||||
*/
|
|
||||||
public boolean isUserOnline(String username) {
|
|
||||||
Set<String> sessions = userSessionsMap.get(username);
|
|
||||||
return sessions != null && !sessions.isEmpty();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 获取所有在线用户列表
|
|
||||||
*
|
|
||||||
* @return 在线用户信息列表
|
|
||||||
*/
|
|
||||||
public List<OnlineUserDTO> getOnlineUsers() {
|
|
||||||
return userSessionsMap.entrySet().stream()
|
|
||||||
.map(entry -> {
|
|
||||||
String username = entry.getKey();
|
|
||||||
Set<String> sessions = entry.getValue();
|
|
||||||
// 取最早的连接时间作为登录时间
|
|
||||||
long earliestLoginTime = sessions.stream()
|
|
||||||
.map(sessionDetailsMap::get)
|
|
||||||
.filter(info -> info != null)
|
|
||||||
.mapToLong(SessionInfo::getConnectTime)
|
|
||||||
.min()
|
|
||||||
.orElse(System.currentTimeMillis());
|
|
||||||
|
|
||||||
return new OnlineUserDTO(username, sessions.size(), earliestLoginTime);
|
|
||||||
})
|
|
||||||
.collect(Collectors.toList());
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* WebSocket 会话详情(内部使用)
|
|
||||||
*/
|
|
||||||
@Data
|
|
||||||
@AllArgsConstructor
|
|
||||||
private static class SessionInfo {
|
|
||||||
/** 用户名 */
|
|
||||||
private String username;
|
|
||||||
/** WebSocket会话ID */
|
|
||||||
private String sessionId;
|
|
||||||
/** 连接时间戳 */
|
|
||||||
private long connectTime;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,14 +0,0 @@
|
|||||||
package com.youlai.boot.support.websocket.topic;
|
|
||||||
|
|
||||||
public final class WebSocketTopics {
|
|
||||||
|
|
||||||
private WebSocketTopics() {
|
|
||||||
}
|
|
||||||
|
|
||||||
public static final String TOPIC_DICT = "/topic/dict";
|
|
||||||
public static final String TOPIC_ONLINE_COUNT = "/topic/online-count";
|
|
||||||
public static final String TOPIC_PUBLIC = "/topic/public";
|
|
||||||
|
|
||||||
public static final String USER_QUEUE_MESSAGES = "/queue/messages";
|
|
||||||
public static final String USER_QUEUE_MESSAGE = "/queue/message";
|
|
||||||
}
|
|
||||||
@@ -16,7 +16,7 @@ import com.youlai.boot.system.model.form.DictForm;
|
|||||||
import com.youlai.boot.common.annotation.Log;
|
import com.youlai.boot.common.annotation.Log;
|
||||||
import com.youlai.boot.system.service.DictItemService;
|
import com.youlai.boot.system.service.DictItemService;
|
||||||
import com.youlai.boot.system.service.DictService;
|
import com.youlai.boot.system.service.DictService;
|
||||||
import com.youlai.boot.support.websocket.service.WebSocketService;
|
import com.youlai.boot.support.sse.SseService;
|
||||||
import io.swagger.v3.oas.annotations.Parameter;
|
import io.swagger.v3.oas.annotations.Parameter;
|
||||||
import io.swagger.v3.oas.annotations.tags.Tag;
|
import io.swagger.v3.oas.annotations.tags.Tag;
|
||||||
import io.swagger.v3.oas.annotations.Operation;
|
import io.swagger.v3.oas.annotations.Operation;
|
||||||
@@ -42,7 +42,7 @@ public class DictController {
|
|||||||
|
|
||||||
private final DictService dictService;
|
private final DictService dictService;
|
||||||
private final DictItemService dictItemService;
|
private final DictItemService dictItemService;
|
||||||
private final WebSocketService webSocketService;
|
private final SseService sseService;
|
||||||
|
|
||||||
//---------------------------------------------------
|
//---------------------------------------------------
|
||||||
// 字典相关接口
|
// 字典相关接口
|
||||||
@@ -82,7 +82,7 @@ public class DictController {
|
|||||||
boolean result = dictService.saveDict(formData);
|
boolean result = dictService.saveDict(formData);
|
||||||
// 发送字典更新通知
|
// 发送字典更新通知
|
||||||
if (result) {
|
if (result) {
|
||||||
webSocketService.broadcastDictChange(formData.getDictCode());
|
sseService.sendDictChange(formData.getDictCode());
|
||||||
}
|
}
|
||||||
return Result.judge(result);
|
return Result.judge(result);
|
||||||
}
|
}
|
||||||
@@ -97,7 +97,7 @@ public class DictController {
|
|||||||
boolean status = dictService.updateDict(id, dictForm);
|
boolean status = dictService.updateDict(id, dictForm);
|
||||||
// 发送字典更新通知
|
// 发送字典更新通知
|
||||||
if (status && dictForm.getDictCode() != null) {
|
if (status && dictForm.getDictCode() != null) {
|
||||||
webSocketService.broadcastDictChange(dictForm.getDictCode());
|
sseService.sendDictChange(dictForm.getDictCode());
|
||||||
}
|
}
|
||||||
return Result.judge(status);
|
return Result.judge(status);
|
||||||
}
|
}
|
||||||
@@ -110,14 +110,14 @@ public class DictController {
|
|||||||
) {
|
) {
|
||||||
// 获取字典编码列表,用于发送删除通知
|
// 获取字典编码列表,用于发送删除通知
|
||||||
List<String> dictCodes = dictService.getDictCodesByIds(Arrays.stream(ids.split(",")).toList());
|
List<String> dictCodes = dictService.getDictCodesByIds(Arrays.stream(ids.split(",")).toList());
|
||||||
|
|
||||||
dictService.deleteDictByIds(Arrays.stream(ids.split(",")).toList());
|
dictService.deleteDictByIds(Arrays.stream(ids.split(",")).toList());
|
||||||
|
|
||||||
// 发送字典删除通知
|
// 发送字典删除通知
|
||||||
for (String dictCode : dictCodes) {
|
for (String dictCode : dictCodes) {
|
||||||
webSocketService.broadcastDictChange(dictCode);
|
sseService.sendDictChange(dictCode);
|
||||||
}
|
}
|
||||||
|
|
||||||
return Result.success();
|
return Result.success();
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -155,12 +155,12 @@ public class DictController {
|
|||||||
) {
|
) {
|
||||||
formData.setDictCode(dictCode);
|
formData.setDictCode(dictCode);
|
||||||
boolean result = dictItemService.saveDictItem(formData);
|
boolean result = dictItemService.saveDictItem(formData);
|
||||||
|
|
||||||
// 发送字典更新通知
|
// 发送字典更新通知
|
||||||
if (result) {
|
if (result) {
|
||||||
webSocketService.broadcastDictChange(dictCode);
|
sseService.sendDictChange(dictCode);
|
||||||
}
|
}
|
||||||
|
|
||||||
return Result.judge(result);
|
return Result.judge(result);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -186,12 +186,12 @@ public class DictController {
|
|||||||
formData.setId(itemId);
|
formData.setId(itemId);
|
||||||
formData.setDictCode(dictCode);
|
formData.setDictCode(dictCode);
|
||||||
boolean status = dictItemService.updateDictItem(formData);
|
boolean status = dictItemService.updateDictItem(formData);
|
||||||
|
|
||||||
// 发送字典更新通知
|
// 发送字典更新通知
|
||||||
if (status) {
|
if (status) {
|
||||||
webSocketService.broadcastDictChange(dictCode);
|
sseService.sendDictChange(dictCode);
|
||||||
}
|
}
|
||||||
|
|
||||||
return Result.judge(status);
|
return Result.judge(status);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -203,10 +203,10 @@ public class DictController {
|
|||||||
@Parameter(description = "字典ID,多个以英文逗号(,)拼接") @PathVariable String itemIds
|
@Parameter(description = "字典ID,多个以英文逗号(,)拼接") @PathVariable String itemIds
|
||||||
) {
|
) {
|
||||||
dictItemService.deleteDictItemByIds(itemIds);
|
dictItemService.deleteDictItemByIds(itemIds);
|
||||||
|
|
||||||
// 发送字典更新通知
|
// 发送字典更新通知
|
||||||
webSocketService.broadcastDictChange(dictCode);
|
sseService.sendDictChange(dictCode);
|
||||||
|
|
||||||
return Result.success();
|
return Result.success();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -8,7 +8,7 @@ import com.baomidou.mybatisplus.core.metadata.IPage;
|
|||||||
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
|
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
|
||||||
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
||||||
import com.youlai.boot.core.exception.BusinessException;
|
import com.youlai.boot.core.exception.BusinessException;
|
||||||
import com.youlai.boot.support.websocket.dto.OnlineUserDTO;
|
import com.youlai.boot.support.sse.dto.OnlineUserDTO;
|
||||||
import com.youlai.boot.security.util.SecurityUtils;
|
import com.youlai.boot.security.util.SecurityUtils;
|
||||||
import com.youlai.boot.system.converter.NoticeConverter;
|
import com.youlai.boot.system.converter.NoticeConverter;
|
||||||
import com.youlai.boot.system.enums.NoticePublishStatusEnum;
|
import com.youlai.boot.system.enums.NoticePublishStatusEnum;
|
||||||
@@ -27,7 +27,7 @@ import com.youlai.boot.system.model.vo.NoticeDetailVO;
|
|||||||
import com.youlai.boot.system.service.NoticeService;
|
import com.youlai.boot.system.service.NoticeService;
|
||||||
import com.youlai.boot.system.service.UserNoticeService;
|
import com.youlai.boot.system.service.UserNoticeService;
|
||||||
import com.youlai.boot.system.service.UserService;
|
import com.youlai.boot.system.service.UserService;
|
||||||
import com.youlai.boot.support.websocket.service.WebSocketService;
|
import com.youlai.boot.support.sse.SseService;
|
||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
import org.springframework.transaction.annotation.Transactional;
|
import org.springframework.transaction.annotation.Transactional;
|
||||||
@@ -52,7 +52,7 @@ public class NoticeServiceImpl extends ServiceImpl<NoticeMapper, Notice> impleme
|
|||||||
private final NoticeConverter noticeConverter;
|
private final NoticeConverter noticeConverter;
|
||||||
private final UserNoticeService userNoticeService;
|
private final UserNoticeService userNoticeService;
|
||||||
private final UserService userService;
|
private final UserService userService;
|
||||||
private final WebSocketService webSocketService;
|
private final SseService sseService;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 获取通知公告分页列表
|
* 获取通知公告分页列表
|
||||||
@@ -213,7 +213,7 @@ public class NoticeServiceImpl extends ServiceImpl<NoticeMapper, Notice> impleme
|
|||||||
Set<String> receivers = targetUserList.stream().map(User::getUsername).collect(Collectors.toSet());
|
Set<String> receivers = targetUserList.stream().map(User::getUsername).collect(Collectors.toSet());
|
||||||
|
|
||||||
// 获取在线用户名集合
|
// 获取在线用户名集合
|
||||||
Set<String> allOnlineUsers = webSocketService.getOnlineUsers().stream()
|
Set<String> allOnlineUsers = sseService.getOnlineUsers().stream()
|
||||||
.map(OnlineUserDTO::getUsername)
|
.map(OnlineUserDTO::getUsername)
|
||||||
.collect(Collectors.toSet());
|
.collect(Collectors.toSet());
|
||||||
|
|
||||||
@@ -227,7 +227,7 @@ public class NoticeServiceImpl extends ServiceImpl<NoticeMapper, Notice> impleme
|
|||||||
noticeDto.setPublishTime(notice.getPublishTime());
|
noticeDto.setPublishTime(notice.getPublishTime());
|
||||||
|
|
||||||
// 向在线接收者推送通知
|
// 向在线接收者推送通知
|
||||||
onlineReceivers.forEach(receiver -> webSocketService.sendNotification(receiver, noticeDto));
|
onlineReceivers.forEach(receiver -> sseService.sendToUser(receiver, "notice", noticeDto));
|
||||||
}
|
}
|
||||||
return publishResult;
|
return publishResult;
|
||||||
}
|
}
|
||||||
@@ -261,6 +261,19 @@ public class NoticeServiceImpl extends ServiceImpl<NoticeMapper, Notice> impleme
|
|||||||
userNoticeService.remove(new LambdaQueryWrapper<UserNotice>()
|
userNoticeService.remove(new LambdaQueryWrapper<UserNotice>()
|
||||||
.eq(UserNotice::getNoticeId, id)
|
.eq(UserNotice::getNoticeId, id)
|
||||||
);
|
);
|
||||||
|
|
||||||
|
// 通知前端移除该通知
|
||||||
|
NoticeDTO noticeDto = new NoticeDTO();
|
||||||
|
noticeDto.setId(id);
|
||||||
|
|
||||||
|
// 获取所有在线用户
|
||||||
|
Set<String> allOnlineUsers = sseService.getOnlineUsers().stream()
|
||||||
|
.map(OnlineUserDTO::getUsername)
|
||||||
|
.collect(Collectors.toSet());
|
||||||
|
|
||||||
|
// 向所有在线用户推送撤回通知
|
||||||
|
allOnlineUsers.forEach(username ->
|
||||||
|
sseService.sendToUser(username, "notice-revoke", noticeDto));
|
||||||
}
|
}
|
||||||
return revokeResult;
|
return revokeResult;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -219,13 +219,3 @@ wx:
|
|||||||
miniapp:
|
miniapp:
|
||||||
appid: Your_AppId
|
appid: Your_AppId
|
||||||
secret: Your_AppSecret
|
secret: Your_AppSecret
|
||||||
|
|
||||||
# Actuator 端点配置(监控端点暴露)
|
|
||||||
management:
|
|
||||||
endpoints:
|
|
||||||
web:
|
|
||||||
exposure:
|
|
||||||
include: health,info,metrics,env,loggers,heapdump,threaddump
|
|
||||||
endpoint:
|
|
||||||
health:
|
|
||||||
show-details: always
|
|
||||||
|
|||||||
Reference in New Issue
Block a user