diff --git a/CHANGELOG.md b/CHANGELOG.md deleted file mode 100644 index 85c30ab5..00000000 --- a/CHANGELOG.md +++ /dev/null @@ -1,60 +0,0 @@ -# 2.7.1 (2024/4/18) -### 🐛 fix -- 修复用户名或者密码错误时,返回的错误信息不正确问题 -### 🛠️ refactor -- JWT 解析和验证代码优化重构 -- 优化代码结构和完善注释,提高代码可读性 - -# 2.7.0 (2024/4/13) -### ✨ feat -- 集成 Mybatis-Plus generator 代码生成器 - -# 2.6.0 (2024/3/6) - -### ✨ feat -- 黑名单方式实现 JWT 主动注销过期 -### 🛠️ refactor -- 角色权限重构 - - -# 2.5.0 (2023/12/6) -### ✨ feat -- [集成 Spring Cache 和 Redis 缓存,路由缓存](https://blog.csdn.net/u013737132/article/details/134789862) -### 🛠️ refactor -- 权限判断逻辑调整,用户绑定权限调整为角色绑定权限 -### fix -- [接口无请求权限,Spring Security 自定义异常无效问题修复](https://youlai.blog.csdn.net/article/details/134718249) - - -# 2.4.1 (2023/11/7) -### ✂️ refactor -- 项目目录结构优化 -### ⬆️ chore -- 升级 SpringBoot 版本 `3.1.4` → `3.1.5` - - -# 2.2.1 (2023/5/25) - -### 🐛 fix - -- 修复多级路由的组件路径错误导致页面404问题 - -# 2.2.0 (2023/5/21) - -### ✨ feat -- 菜单、角色、字典、部门添加接口权限控制 - -### 🐛 fix - -- 用户登录权限缓存键值不一致导致获取用户数据权限错误问题修复 - -### ✂️ refactor - -- 递归获取菜单、部门属性列表代码重构优化 - -### ⬆️ chore -- 升级 SpringBoot 版本 `3.0.6` → `3.1.0` - -### 📝 docs -- SQL 脚本更新,sys_menu 新增 `tree_path` 字段 (升级需更新SQL脚本) - diff --git a/README.md b/README.md index 87dad56e..45db9948 100644 --- a/README.md +++ b/README.md @@ -49,7 +49,7 @@ | [vue3-element-template](https://gitee.com/youlaiorg/vue3-element-template) | Vue 3 + Element Plus | 前端精简模板 | | [youlai-boot-tenant](https://gitee.com/youlaiorg/youlai-boot-tenant) | Spring Boot 4 | 多租户 SaaS 版 | | [youlai-boot-flex](https://gitee.com/youlaiorg/youlai-boot-flex) | Spring Boot 3 + MyBatis-Flex | MyBatis-Flex 版 | -| [youlai-uniapp](https://gitee.com/youlaiorg/youlai-uniapp) | Vue 3 + uni-app | 移动端应用 | +| [youlai-app](https://gitee.com/youlaiorg/youlai-app) | Vue 3 + uni-app | 移动端应用 | --- @@ -167,17 +167,4 @@ docker-compose up -d -**微信交流**:添加 **`haoxianrui`**,备注「前端/后端/全栈」 - ---- - -如果项目对你有帮助,欢迎 ⭐️ Star 支持! - -
- ⭐ Gitee  •  - ⭐ GitHub  •  - ⭐ AtomGit -
- 🌐 官网  •  - 📝 博客 -
+> 二维码过期?添加微信 **`haoxianrui`**,备注「前端/后端/全栈」即可拉你入群。 diff --git a/src/main/java/com/youlai/boot/config/WebSocketConfig.java b/src/main/java/com/youlai/boot/config/WebSocketConfig.java deleted file mode 100644 index 1a0e5b2d..00000000 --- a/src/main/java/com/youlai/boot/config/WebSocketConfig.java +++ /dev/null @@ -1,293 +0,0 @@ -package com.youlai.boot.config; - -import cn.hutool.core.util.StrUtil; -import com.youlai.boot.security.model.SysUserDetails; -import com.youlai.boot.security.token.TokenManager; -import com.youlai.boot.support.websocket.service.WebSocketService; -import lombok.extern.slf4j.Slf4j; -import org.jetbrains.annotations.NotNull; -import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.Lazy; -import org.springframework.http.HttpHeaders; -import org.springframework.messaging.Message; -import org.springframework.messaging.MessageChannel; -import org.springframework.messaging.MessagingException; -import org.springframework.messaging.simp.config.ChannelRegistration; -import org.springframework.messaging.simp.config.MessageBrokerRegistry; -import org.springframework.messaging.simp.stomp.StompCommand; -import org.springframework.messaging.simp.stomp.StompHeaderAccessor; -import org.springframework.messaging.support.ChannelInterceptor; -import org.springframework.messaging.support.MessageHeaderAccessor; -import org.springframework.security.authentication.AuthenticationCredentialsNotFoundException; -import org.springframework.security.authentication.BadCredentialsException; -import org.springframework.security.core.Authentication; -import org.springframework.security.core.AuthenticationException; -import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker; -import org.springframework.web.socket.config.annotation.StompEndpointRegistry; -import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer; - -/** - * WebSocket 配置类 - * - * 核心功能: - * - 配置 WebSocket 端点 - * - 配置消息代理 - * - 实现连接认证与授权 - * - 管理用户会话生命周期 - * - * @author Ray.Hao - * @since 3.0.0 - */ -@EnableWebSocketMessageBroker -@Configuration -@Slf4j -public class WebSocketConfig implements WebSocketMessageBrokerConfigurer { - - private static final String WS_ENDPOINT = "/ws"; - private static final String APP_DESTINATION_PREFIX = "/app"; - private static final String USER_DESTINATION_PREFIX = "/user"; - private static final String[] BROKER_DESTINATIONS = {"/topic", "/queue"}; - - private final TokenManager tokenManager; - private final WebSocketService webSocketService; - - public WebSocketConfig(TokenManager tokenManager, @Lazy WebSocketService webSocketService) { - this.tokenManager = tokenManager; - this.webSocketService = webSocketService; - log.info("✓ WebSocket 配置已加载"); - } - - /** - * 注册 STOMP 端点 - * - * 客户端通过该端点建立 WebSocket 连接 - */ - @Override - public void registerStompEndpoints(StompEndpointRegistry registry) { - registry - .addEndpoint(WS_ENDPOINT) - .setAllowedOriginPatterns("*"); // 允许跨域(生产环境建议配置具体域名) - - log.info("✓ STOMP 端点已注册: {}", WS_ENDPOINT); - } - - /** - * 配置消息代理 - * - * - /app 前缀:客户端发送消息到服务端的前缀 - * - /topic 前缀:用于广播消息 - * - /queue 前缀:用于点对点消息 - * - /user 前缀:服务端发送给特定用户的消息前缀 - */ - @Override - public void configureMessageBroker(MessageBrokerRegistry registry) { - // 客户端发送消息的请求前缀 - registry.setApplicationDestinationPrefixes(APP_DESTINATION_PREFIX); - - // 启用简单消息代理,处理 /topic 和 /queue 前缀的消息 - registry.enableSimpleBroker(BROKER_DESTINATIONS); - - // 服务端通知客户端的前缀 - registry.setUserDestinationPrefix(USER_DESTINATION_PREFIX); - - log.info("✓ 消息代理已配置: app={}, broker={}, user={}", - APP_DESTINATION_PREFIX, BROKER_DESTINATIONS, USER_DESTINATION_PREFIX); - } - - /** - * 配置客户端入站通道拦截器 - * - * 核心功能: - * 1. 连接建立时:解析 JWT Token 并绑定用户身份 - * 2. 连接关闭时:触发用户下线通知 - * 3. 安全防护:拦截无效连接请求 - */ - @Override - public void configureClientInboundChannel(ChannelRegistration registration) { - registration.interceptors(new ChannelInterceptor() { - @Override - public Message preSend(@NotNull Message message, @NotNull MessageChannel channel) { - StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class); - - // 防御性检查:确保 accessor 不为空 - if (accessor == null) { - log.warn("⚠ 收到异常消息:无法获取 StompHeaderAccessor"); - return ChannelInterceptor.super.preSend(message, channel); - } - - StompCommand command = accessor.getCommand(); - if (command == null) { - return ChannelInterceptor.super.preSend(message, channel); - } - - try { - switch (command) { - case CONNECT: - handleConnect(accessor); - break; - - case DISCONNECT: - handleDisconnect(accessor); - break; - - case SUBSCRIBE: - handleSubscribe(accessor); - break; - - default: - // 其他命令不需要特殊处理 - break; - } - } catch (AuthenticationException ex) { - // 认证失败时强制关闭连接 - log.error("❌ 连接认证失败: {}", ex.getMessage()); - throw ex; - } catch (Exception ex) { - // 捕获其他未知异常 - log.error("❌ WebSocket 消息处理异常", ex); - throw new MessagingException("消息处理失败: " + ex.getMessage()); - } - - return ChannelInterceptor.super.preSend(message, channel); - } - }); - - log.info("✓ 客户端入站通道拦截器已配置"); - } - - /** - * 处理客户端连接请求 - * - * 安全校验流程: - * 1. 提取 Authorization 头 - * 2. 验证 Bearer Token 格式 - * 3. 解析并验证 JWT 有效性 - * 4. 绑定用户身份到当前会话 - * 5. 记录用户上线状态 - */ - private void handleConnect(StompHeaderAccessor accessor) { - String authorization = accessor.getFirstNativeHeader(HttpHeaders.AUTHORIZATION); - - // 安全检查:确保 Authorization 头存在且格式正确 - if (StrUtil.isBlank(authorization)) { - log.warn("⚠ 非法连接请求:缺少 Authorization 头"); - throw new AuthenticationCredentialsNotFoundException("缺少 Authorization 头"); - } - - if (!authorization.startsWith("Bearer ")) { - log.warn("⚠ 非法连接请求:Authorization 头格式错误"); - throw new BadCredentialsException("Authorization 头格式错误"); - } - - // 提取 JWT Token(移除 "Bearer " 前缀) - String token = authorization.substring(7); - - if (StrUtil.isBlank(token)) { - log.warn("⚠ 非法连接请求:Token 为空"); - throw new BadCredentialsException("Token 为空"); - } - - // 解析并验证 Token - Authentication authentication; - try { - authentication = tokenManager.parseToken(token); - } catch (Exception ex) { - log.error("❌ Token 解析失败", ex); - throw new BadCredentialsException("Token 无效: " + ex.getMessage()); - } - - // 验证解析结果 - if (authentication == null || !authentication.isAuthenticated()) { - log.warn("⚠ Token 解析失败:认证对象无效"); - throw new BadCredentialsException("Token 解析失败"); - } - - // 获取用户详细信息 - Object principal = authentication.getPrincipal(); - if (!(principal instanceof SysUserDetails)) { - log.error("❌ 无效的用户凭证类型: {}", principal.getClass().getName()); - throw new BadCredentialsException("用户凭证类型错误"); - } - - SysUserDetails userDetails = (SysUserDetails) principal; - String username = userDetails.getUsername(); - - if (StrUtil.isBlank(username)) { - log.warn("⚠ 用户名为空"); - throw new BadCredentialsException("用户名为空"); - } - - // 绑定用户身份到当前会话(重要:用于 @SendToUser 等注解) - accessor.setUser(authentication); - - // 获取会话 ID - String sessionId = accessor.getSessionId(); - if (sessionId == null) { - log.warn("⚠ 会话 ID 为空,使用临时 ID"); - sessionId = "temp-" + System.nanoTime(); - } - - // 记录用户上线状态 - try { - webSocketService.userConnected(username, sessionId); - log.info("✓ WebSocket 连接建立成功: 用户[{}], 会话[{}]", username, sessionId); - } catch (Exception ex) { - log.error("❌ 记录用户上线状态失败: 用户[{}], 会话[{}]", username, sessionId, ex); - // 不抛出异常,允许连接继续 - } - } - - /** - * 处理客户端断开连接事件 - * - * 注意: - * - 只有成功建立过认证的连接才会触发下线事件 - * - 防止未认证成功的连接产生脏数据 - */ - private void handleDisconnect(StompHeaderAccessor accessor) { - Authentication authentication = (Authentication) accessor.getUser(); - - // 防御性检查:只处理已认证的连接 - if (authentication == null || !authentication.isAuthenticated()) { - log.debug("未认证的连接断开,跳过处理"); - return; - } - - Object principal = authentication.getPrincipal(); - if (!(principal instanceof SysUserDetails)) { - log.warn("⚠ 断开连接时用户凭证类型异常"); - return; - } - - SysUserDetails userDetails = (SysUserDetails) principal; - String username = userDetails.getUsername(); - - if (StrUtil.isNotBlank(username)) { - try { - webSocketService.userDisconnected(username); - log.info("✓ WebSocket 连接断开: 用户[{}]", username); - } catch (Exception ex) { - log.error("❌ 记录用户下线状态失败: 用户[{}]", username, ex); - } - } - } - - /** - * 处理客户端订阅事件(可选) - * - * 用于记录订阅信息或实施订阅级别的权限控制 - */ - private void handleSubscribe(StompHeaderAccessor accessor) { - Authentication authentication = (Authentication) accessor.getUser(); - - if (authentication != null && authentication.isAuthenticated()) { - String destination = accessor.getDestination(); - String username = authentication.getName(); - - log.debug("用户[{}]订阅主题: {}", username, destination); - - // TODO: 这里可以实现订阅级别的权限控制 - // 例如:检查用户是否有权限订阅某个主题 - } - } -} diff --git a/src/main/java/com/youlai/boot/module/file/controller/FileController.java b/src/main/java/com/youlai/boot/module/file/controller/FileController.java index 12526a77..8a0aaa42 100644 --- a/src/main/java/com/youlai/boot/module/file/controller/FileController.java +++ b/src/main/java/com/youlai/boot/module/file/controller/FileController.java @@ -31,13 +31,13 @@ public class FileController { @Operation(summary = "文件上传") public Result uploadFile( @Parameter( - name = "file1", + name = "file", description = "表单文件对象", required = true, in = ParameterIn.DEFAULT, - schema = @Schema(name = "file1", format = "binary") + schema = @Schema(name = "file", format = "binary") ) - @RequestPart(value = "file1") MultipartFile file + @RequestPart(value = "file") MultipartFile file ) { FileInfo fileInfo = fileService.uploadFile(file); return Result.success(fileInfo); diff --git a/src/main/java/com/youlai/boot/security/filter/TokenAuthenticationFilter.java b/src/main/java/com/youlai/boot/security/filter/TokenAuthenticationFilter.java index e1b79407..f26a86ef 100644 --- a/src/main/java/com/youlai/boot/security/filter/TokenAuthenticationFilter.java +++ b/src/main/java/com/youlai/boot/security/filter/TokenAuthenticationFilter.java @@ -40,15 +40,10 @@ public class TokenAuthenticationFilter extends OncePerRequestFilter { @Override protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain) throws ServletException, IOException { - String authorizationHeader = request.getHeader(HttpHeaders.AUTHORIZATION); + String rawToken = resolveToken(request); try { - if (StrUtil.isNotBlank(authorizationHeader) - && authorizationHeader.startsWith(SecurityConstants.BEARER_TOKEN_PREFIX)) { - - // 剥离Bearer前缀获取原始令牌 - String rawToken = authorizationHeader.substring(SecurityConstants.BEARER_TOKEN_PREFIX.length()); - + if (StrUtil.isNotBlank(rawToken)) { // 执行令牌有效性检查(包含密码学验签和过期时间验证) boolean isValidToken = tokenManager.validateToken(rawToken); if (!isValidToken) { @@ -70,4 +65,25 @@ public class TokenAuthenticationFilter extends OncePerRequestFilter { // 继续后续过滤器链执行 filterChain.doFilter(request, response); } + + /** + * 从请求中解析 Token + * 优先从 Authorization Header 获取,其次从 URL 参数获取(支持 SSE) + */ + private String resolveToken(HttpServletRequest request) { + // 1. 从 Authorization Header 获取 + String authorizationHeader = request.getHeader(HttpHeaders.AUTHORIZATION); + if (StrUtil.isNotBlank(authorizationHeader) + && authorizationHeader.startsWith(SecurityConstants.BEARER_TOKEN_PREFIX)) { + return authorizationHeader.substring(SecurityConstants.BEARER_TOKEN_PREFIX.length()); + } + + // 2. 从 URL 参数获取(支持 SSE EventSource) + String tokenParam = request.getParameter("token"); + if (StrUtil.isNotBlank(tokenParam)) { + return tokenParam; + } + + return null; + } } diff --git a/src/main/java/com/youlai/boot/support/mail/service/MailService.java b/src/main/java/com/youlai/boot/support/mail/service/MailService.java index e0b0b704..0e558ce6 100644 --- a/src/main/java/com/youlai/boot/support/mail/service/MailService.java +++ b/src/main/java/com/youlai/boot/support/mail/service/MailService.java @@ -1,13 +1,32 @@ package com.youlai.boot.support.mail.service; +import com.youlai.boot.config.property.MailProperties; +import jakarta.mail.MessagingException; +import jakarta.mail.internet.MimeMessage; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.core.io.FileSystemResource; +import org.springframework.mail.SimpleMailMessage; +import org.springframework.mail.javamail.JavaMailSender; +import org.springframework.mail.javamail.MimeMessageHelper; +import org.springframework.stereotype.Service; + +import java.io.File; + /** - * 邮件服务接口层 + * 邮件服务 * * @author Ray * @since 2024/8/17 */ -public interface MailService { +@Service +@RequiredArgsConstructor +@Slf4j +public class MailService { + private final JavaMailSender mailSender; + + private final MailProperties mailProperties; /** * 发送简单文本邮件 @@ -16,16 +35,42 @@ public interface MailService { * @param subject 邮件主题 * @param text 邮件内容 */ - void sendMail(String to, String subject, String text) ; + public void sendMail(String to, String subject, String text) { + try { + SimpleMailMessage message = new SimpleMailMessage(); + message.setFrom(mailProperties.getFrom()); + message.setTo(to); + message.setSubject(subject); + message.setText(text); + mailSender.send(message); + } catch (Exception e) { + log.error("发送邮件失败{}", e.getMessage()); + } + } /** * 发送带附件的邮件 * - * @param to 收件人地址 - * @param subject 邮件主题 - * @param text 邮件内容 + * @param to 收件人地址 + * @param subject 邮件主题 + * @param text 邮件内容 * @param filePath 附件路径 */ - void sendMailWithAttachment(String to, String subject, String text, String filePath); + public void sendMailWithAttachment(String to, String subject, String text, String filePath) { + MimeMessage message = mailSender.createMimeMessage(); + try { + MimeMessageHelper helper = new MimeMessageHelper(message, true); + helper.setFrom(mailProperties.getFrom()); + helper.setTo(to); + helper.setSubject(subject); + helper.setText(text, true); // true 表示支持HTML内容 + FileSystemResource file = new FileSystemResource(new File(filePath)); + helper.addAttachment(file.getFilename(), file); + + mailSender.send(message); + } catch (MessagingException e) { + log.error("发送带附件的邮件失败{}", e.getMessage()); + } + } } diff --git a/src/main/java/com/youlai/boot/support/mail/service/impl/MailServiceImpl.java b/src/main/java/com/youlai/boot/support/mail/service/impl/MailServiceImpl.java deleted file mode 100644 index fd9bbe49..00000000 --- a/src/main/java/com/youlai/boot/support/mail/service/impl/MailServiceImpl.java +++ /dev/null @@ -1,79 +0,0 @@ -package com.youlai.boot.support.mail.service.impl; - -import com.youlai.boot.config.property.MailProperties; -import com.youlai.boot.support.mail.service.MailService; -import jakarta.mail.MessagingException; -import jakarta.mail.internet.MimeMessage; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.springframework.core.io.FileSystemResource; -import org.springframework.mail.SimpleMailMessage; -import org.springframework.mail.javamail.JavaMailSender; -import org.springframework.mail.javamail.MimeMessageHelper; -import org.springframework.stereotype.Service; - -import java.io.File; - -/** - * 邮件服务实现类 - * - * @author Ray - * @since 2024/8/17 - */ -@Service -@RequiredArgsConstructor -@Slf4j -public class MailServiceImpl implements MailService { - - private final JavaMailSender mailSender; - - private final MailProperties mailProperties; - - /** - * 发送简单文本邮件 - * - * @param to 收件人地址 - * @param subject 邮件主题 - * @param text 邮件内容 - */ - @Override - public void sendMail(String to, String subject, String text) { - try { - SimpleMailMessage message = new SimpleMailMessage(); - message.setFrom(mailProperties.getFrom()); - message.setTo(to); - message.setSubject(subject); - message.setText(text); - mailSender.send(message); - } catch (Exception e) { - log.error("发送邮件失败{}", e.getMessage()); - } - } - - /** - * 发送带附件的邮件 - * - * @param to 收件人地址 - * @param subject 邮件主题 - * @param text 邮件内容 - * @param filePath 附件路径 - */ - @Override - public void sendMailWithAttachment(String to, String subject, String text, String filePath) { - MimeMessage message = mailSender.createMimeMessage(); - try { - MimeMessageHelper helper = new MimeMessageHelper(message, true); - helper.setFrom(mailProperties.getFrom()); - helper.setTo(to); - helper.setSubject(subject); - helper.setText(text, true); // true 表示支持HTML内容 - - FileSystemResource file = new FileSystemResource(new File(filePath)); - helper.addAttachment(file.getFilename(), file); - - mailSender.send(message); - } catch (MessagingException e) { - log.error("发送带附件的邮件失败{}", e.getMessage()); - } - } -} diff --git a/src/main/java/com/youlai/boot/support/sse/OnlineUserCountJob.java b/src/main/java/com/youlai/boot/support/sse/OnlineUserCountJob.java new file mode 100644 index 00000000..b30e015d --- /dev/null +++ b/src/main/java/com/youlai/boot/support/sse/OnlineUserCountJob.java @@ -0,0 +1,36 @@ +package com.youlai.boot.support.sse; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +/** + * 在线用户数统计定时任务 + *

+ * 定时统计并广播当前在线用户数量到所有 SSE 客户端 + */ +@Component +@Slf4j +@RequiredArgsConstructor +public class OnlineUserCountJob { + + private final SseSessionRegistry sessionRegistry; + private final SseService sseService; + + /** + * 定时统计在线用户数并广播 + *

+ * 每3分钟执行一次,推送当前在线用户数量 + */ + @Scheduled(cron = "0 */3 * * * ?") + public void execute() { + int onlineCount = sessionRegistry.getOnlineUserCount(); + int connectionCount = sessionRegistry.getTotalConnectionCount(); + + log.debug("定时统计:在线用户数={}, 总连接数={}", onlineCount, connectionCount); + + // 发送在线用户数量 + sseService.sendOnlineCount(); + } +} diff --git a/src/main/java/com/youlai/boot/support/sse/SseController.java b/src/main/java/com/youlai/boot/support/sse/SseController.java new file mode 100644 index 00000000..ad07be4f --- /dev/null +++ b/src/main/java/com/youlai/boot/support/sse/SseController.java @@ -0,0 +1,44 @@ +package com.youlai.boot.support.sse; + +import com.youlai.boot.core.web.Result; +import com.youlai.boot.security.model.SysUserDetails; +import com.youlai.boot.security.util.SecurityUtils; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.tags.Tag; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.http.MediaType; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; + +/** + * SSE 控制器 + */ +@Tag(name = "14. SSE连接") +@Slf4j +@RestController +@RequestMapping("/api/v1/sse") +@RequiredArgsConstructor +public class SseController { + + private final SseService sseService; + + @Operation(summary = "建立SSE连接") + @GetMapping(value = "/connect", produces = MediaType.TEXT_EVENT_STREAM_VALUE) + public SseEmitter connect() { + SysUserDetails user = SecurityUtils.getUser().orElse(null); + if (user == null) { + log.warn("SSE连接失败:未获取到当前用户"); + return null; + } + return sseService.createConnection(user.getUsername()); + } + + @Operation(summary = "获取在线用户数") + @GetMapping("/online-count") + public Result getOnlineCount() { + return Result.success(sseService.getOnlineUserCount()); + } +} diff --git a/src/main/java/com/youlai/boot/support/sse/SseService.java b/src/main/java/com/youlai/boot/support/sse/SseService.java new file mode 100644 index 00000000..f9424ab8 --- /dev/null +++ b/src/main/java/com/youlai/boot/support/sse/SseService.java @@ -0,0 +1,138 @@ +package com.youlai.boot.support.sse; + +import com.youlai.boot.support.sse.dto.DictChangeEvent; +import com.youlai.boot.support.sse.dto.OnlineUserDTO; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +/** + * SSE 服务 + */ +@Slf4j +@Service +@RequiredArgsConstructor +public class SseService { + + /** SSE 连接超时时间:30 分钟 */ + private static final long TIMEOUT = 30 * 60 * 1000L; + + private final SseSessionRegistry sessionRegistry; + + /** + * 创建 SSE 连接 + * + * @param username 用户名 + * @return SseEmitter + */ + public SseEmitter createConnection(String username) { + if (username == null || username.isEmpty()) { + log.warn("创建SSE连接失败:用户名为空"); + return null; + } + + // 创建 SseEmitter,设置超时时间 + SseEmitter emitter = new SseEmitter(TIMEOUT); + + // 注册用户连接 + sessionRegistry.userConnected(username, emitter); + + // 连接建立后立即发送在线用户数 + try { + emitter.send(SseEmitter.event() + .name(SseTopics.ONLINE_COUNT) + .data(sessionRegistry.getOnlineUserCount())); + } catch (IOException e) { + log.warn("发送初始在线用户数失败: {}", e.getMessage()); + } + + log.info("用户[{}]SSE连接已建立,当前在线用户数: {}", username, sessionRegistry.getOnlineUserCount()); + + // 发送在线用户数变更 + sendOnlineCount(); + + return emitter; + } + + /** + * 发送字典变更事件 + * + * @param dictCode 字典编码 + */ + public void sendDictChange(String dictCode) { + if (dictCode == null || dictCode.isEmpty()) { + log.warn("字典编码为空,跳过发送"); + return; + } + + DictChangeEvent event = new DictChangeEvent(dictCode); + sessionRegistry.broadcast(SseTopics.DICT, event); + log.info("已发送字典变更通知: dictCode={}", dictCode); + } + + /** + * 发送在线用户数 + */ + public void sendOnlineCount() { + int count = sessionRegistry.getOnlineUserCount(); + sessionRegistry.broadcast(SseTopics.ONLINE_COUNT, count); + log.debug("已发送在线用户数: {}", count); + } + + /** + * 发送消息给指定用户 + * + * @param username 用户名 + * @param eventName 事件名称 + * @param data 数据 + */ + public void sendToUser(String username, String eventName, Object data) { + if (username == null || username.isEmpty()) { + log.warn("用户名为空,无法发送消息"); + return; + } + sessionRegistry.sendToUser(username, eventName, data); + log.info("已向用户[{}]发送事件[{}]", username, eventName); + } + + /** + * 获取在线用户列表 + * + * @return 在线用户列表 + */ + public List getOnlineUsers() { + return sessionRegistry.getOnlineUsers(); + } + + /** + * 获取在线用户数 + * + * @return 在线用户数 + */ + public int getOnlineUserCount() { + return sessionRegistry.getOnlineUserCount(); + } + + /** + * 发送系统消息 + * + * @param message 消息内容 + */ + public void sendSystemMessage(String message) { + if (message == null || message.isEmpty()) { + return; + } + Map systemMessage = Map.of( + "sender", "系统通知", + "content", message, + "timestamp", System.currentTimeMillis() + ); + sessionRegistry.broadcast(SseTopics.SYSTEM, systemMessage); + log.info("已发送系统消息: {}", message); + } +} diff --git a/src/main/java/com/youlai/boot/support/sse/SseSessionRegistry.java b/src/main/java/com/youlai/boot/support/sse/SseSessionRegistry.java new file mode 100644 index 00000000..95aedf1e --- /dev/null +++ b/src/main/java/com/youlai/boot/support/sse/SseSessionRegistry.java @@ -0,0 +1,196 @@ +package com.youlai.boot.support.sse; + +import com.youlai.boot.support.sse.dto.OnlineUserDTO; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +/** + * SSE 会话注册表 + *

+ * 维护 SSE 连接的用户会话信息,支持多设备同时登录 + */ +@Slf4j +@Component +public class SseSessionRegistry { + + /** 用户名 -> SseEmitter 集合(支持多设备) */ + private final Map> userEmittersMap = new ConcurrentHashMap<>(); + + /** SseEmitter -> 用户名(快速定位用户) */ + private final Map emitterUserMap = new ConcurrentHashMap<>(); + + /** SseEmitter -> 连接时间 */ + private final Map emitterTimeMap = new ConcurrentHashMap<>(); + + /** + * 用户上线(建立 SSE 连接) + * + * @param username 用户名 + * @param emitter SseEmitter + */ + public void userConnected(String username, SseEmitter emitter) { + userEmittersMap.computeIfAbsent(username, k -> ConcurrentHashMap.newKeySet()).add(emitter); + emitterUserMap.put(emitter, username); + emitterTimeMap.put(emitter, System.currentTimeMillis()); + log.debug("用户[{}]SSE连接已建立", username); + + // 设置连接超时和完成回调 + emitter.onCompletion(() -> { + removeEmitter(emitter); + log.debug("用户[{}]SSE连接已完成", username); + }); + emitter.onTimeout(() -> { + removeEmitter(emitter); + log.debug("用户[{}]SSE连接超时", username); + }); + emitter.onError(e -> { + removeEmitter(emitter); + log.debug("用户[{}]SSE连接错误: {}", username, e.getMessage()); + }); + } + + /** + * 移除指定 emitter + */ + private void removeEmitter(SseEmitter emitter) { + String username = emitterUserMap.remove(emitter); + if (username == null) { + return; + } + emitterTimeMap.remove(emitter); + + Set emitters = userEmittersMap.get(username); + if (emitters != null) { + emitters.remove(emitter); + if (emitters.isEmpty()) { + userEmittersMap.remove(username); + log.debug("用户[{}]所有SSE连接已断开", username); + } + } + } + + /** + * 用户下线(断开所有 SSE 连接) + * + * @param username 用户名 + */ + public void userDisconnected(String username) { + Set emitters = userEmittersMap.remove(username); + if (emitters == null) { + return; + } + emitters.forEach(emitter -> { + emitterUserMap.remove(emitter); + emitterTimeMap.remove(emitter); + try { + emitter.complete(); + } catch (Exception ignored) { + } + }); + log.debug("用户[{}]已下线,移除{}个SSE连接", username, emitters.size()); + } + + /** + * 获取在线用户数量 + */ + public int getOnlineUserCount() { + return userEmittersMap.size(); + } + + /** + * 获取总连接数量(包括多设备) + */ + public int getTotalConnectionCount() { + return emitterUserMap.size(); + } + + /** + * 获取指定用户的连接数量 + */ + public int getUserConnectionCount(String username) { + Set emitters = userEmittersMap.get(username); + return emitters != null ? emitters.size() : 0; + } + + /** + * 检查用户是否在线 + */ + public boolean isUserOnline(String username) { + Set emitters = userEmittersMap.get(username); + return emitters != null && !emitters.isEmpty(); + } + + /** + * 获取所有在线用户列表 + */ + public List getOnlineUsers() { + return userEmittersMap.entrySet().stream() + .map(entry -> { + String username = entry.getKey(); + Set emitters = entry.getValue(); + long earliestTime = emitters.stream() + .map(emitterTimeMap::get) + .filter(t -> t != null) + .mapToLong(Long::longValue) + .min() + .orElse(System.currentTimeMillis()); + return new OnlineUserDTO(username, emitters.size(), earliestTime); + }) + .collect(Collectors.toList()); + } + + /** + * 获取所有活跃的 SseEmitter + */ + public Set getAllEmitters() { + return emitterUserMap.keySet(); + } + + /** + * 获取指定用户的所有 SseEmitter + */ + public Set getUserEmitters(String username) { + return userEmittersMap.get(username); + } + + /** + * 向指定 emitter 发送事件 + */ + public boolean sendEvent(SseEmitter emitter, String eventName, Object data) { + try { + emitter.send(SseEmitter.event() + .name(eventName) + .data(data)); + return true; + } catch (IOException e) { + log.warn("发送SSE事件失败: {}", e.getMessage()); + removeEmitter(emitter); + return false; + } + } + + /** + * 向所有连接广播事件 + */ + public void broadcast(String eventName, Object data) { + getAllEmitters().forEach(emitter -> sendEvent(emitter, eventName, data)); + } + + /** + * 向指定用户发送事件 + */ + public void sendToUser(String username, String eventName, Object data) { + Set emitters = userEmittersMap.get(username); + if (emitters != null) { + emitters.forEach(emitter -> sendEvent(emitter, eventName, data)); + } + } +} diff --git a/src/main/java/com/youlai/boot/support/sse/SseTopics.java b/src/main/java/com/youlai/boot/support/sse/SseTopics.java new file mode 100644 index 00000000..c3dcdf6d --- /dev/null +++ b/src/main/java/com/youlai/boot/support/sse/SseTopics.java @@ -0,0 +1,19 @@ +package com.youlai.boot.support.sse; + +/** + * SSE 主题常量 + */ +public final class SseTopics { + + private SseTopics() { + } + + /** 字典变更事件 */ + public static final String DICT = "dict"; + + /** 在线用户数事件 */ + public static final String ONLINE_COUNT = "online-count"; + + /** 系统消息事件 */ + public static final String SYSTEM = "system"; +} diff --git a/src/main/java/com/youlai/boot/support/websocket/dto/DictChangeEvent.java b/src/main/java/com/youlai/boot/support/sse/dto/DictChangeEvent.java similarity index 85% rename from src/main/java/com/youlai/boot/support/websocket/dto/DictChangeEvent.java rename to src/main/java/com/youlai/boot/support/sse/dto/DictChangeEvent.java index a07843d6..6064e0b2 100644 --- a/src/main/java/com/youlai/boot/support/websocket/dto/DictChangeEvent.java +++ b/src/main/java/com/youlai/boot/support/sse/dto/DictChangeEvent.java @@ -1,4 +1,4 @@ -package com.youlai.boot.support.websocket.dto; +package com.youlai.boot.support.sse.dto; import lombok.AllArgsConstructor; import lombok.Data; @@ -10,7 +10,7 @@ import java.io.Serializable; /** * 字典变更事件 *

- * 当字典数据发生变更时,通过 WebSocket 广播此事件通知前端清除缓存。 + * 当字典数据发生变更时,通过 SSE 广播此事件通知前端清除缓存。 * 前端收到通知后清除对应字典的本地缓存,下次使用时重新从服务端加载。 * * @author Ray.Hao diff --git a/src/main/java/com/youlai/boot/support/websocket/dto/OnlineUserDTO.java b/src/main/java/com/youlai/boot/support/sse/dto/OnlineUserDTO.java similarity index 93% rename from src/main/java/com/youlai/boot/support/websocket/dto/OnlineUserDTO.java rename to src/main/java/com/youlai/boot/support/sse/dto/OnlineUserDTO.java index 820ad42a..26f5d129 100644 --- a/src/main/java/com/youlai/boot/support/websocket/dto/OnlineUserDTO.java +++ b/src/main/java/com/youlai/boot/support/sse/dto/OnlineUserDTO.java @@ -1,4 +1,4 @@ -package com.youlai.boot.support.websocket.dto; +package com.youlai.boot.support.sse.dto; import lombok.AllArgsConstructor; import lombok.Data; diff --git a/src/main/java/com/youlai/boot/support/websocket/job/OnlineUserCountJob.java b/src/main/java/com/youlai/boot/support/websocket/job/OnlineUserCountJob.java deleted file mode 100644 index 8e59bcf3..00000000 --- a/src/main/java/com/youlai/boot/support/websocket/job/OnlineUserCountJob.java +++ /dev/null @@ -1,47 +0,0 @@ -package com.youlai.boot.support.websocket.job; - -import com.youlai.boot.support.websocket.publisher.WebSocketPublisher; -import com.youlai.boot.support.websocket.session.UserSessionRegistry; -import com.youlai.boot.support.websocket.topic.WebSocketTopics; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.stereotype.Component; - -/** - * 在线用户数统计定时任务 - *

- * 定时统计并广播当前在线用户数量到所有WebSocket客户端。 - * 用于解决以下问题: - *

- * - * @author Ray.Hao - * @since 3.0.0 - */ -@Component -@Slf4j -@RequiredArgsConstructor -public class OnlineUserCountJob { - - private final UserSessionRegistry userSessionRegistry; - private final WebSocketPublisher webSocketPublisher; - - /** - * 定时统计在线用户数并广播 - *

- * 每3分钟执行一次,推送当前在线用户数量 - */ - @Scheduled(cron = "0 */3 * * * ?") - public void execute() { - int onlineCount = userSessionRegistry.getOnlineUserCount(); - int sessionCount = userSessionRegistry.getTotalSessionCount(); - - log.debug("定时统计:在线用户数={}, 总会话数={}", onlineCount, sessionCount); - - // 广播在线用户数量 - webSocketPublisher.publish(WebSocketTopics.TOPIC_ONLINE_COUNT, onlineCount); - } -} diff --git a/src/main/java/com/youlai/boot/support/websocket/publisher/WebSocketPublisher.java b/src/main/java/com/youlai/boot/support/websocket/publisher/WebSocketPublisher.java deleted file mode 100644 index e01f6241..00000000 --- a/src/main/java/com/youlai/boot/support/websocket/publisher/WebSocketPublisher.java +++ /dev/null @@ -1,61 +0,0 @@ -package com.youlai.boot.support.websocket.publisher; - -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.messaging.simp.SimpMessagingTemplate; -import org.springframework.stereotype.Service; -import tools.jackson.core.JacksonException; -import tools.jackson.databind.ObjectMapper; - -@Service -@RequiredArgsConstructor -@Slf4j -public class WebSocketPublisher { - - private SimpMessagingTemplate messagingTemplate; - private final ObjectMapper objectMapper; - - @Autowired(required = false) - public void setMessagingTemplate(SimpMessagingTemplate messagingTemplate) { - this.messagingTemplate = messagingTemplate; - } - - public void publish(String destination, Object payload) { - if (messagingTemplate == null) { - log.warn("消息模板尚未初始化,无法发送消息: destination={}", destination); - return; - } - - try { - Object body = serializeIfNeeded(payload); - messagingTemplate.convertAndSend(destination, body); - } catch (Exception e) { - log.error("发送消息失败: destination={}", destination, e); - } - } - - public void publishToUser(String username, String destination, Object payload) { - if (messagingTemplate == null) { - log.warn("消息模板尚未初始化,无法发送用户消息: username={}, destination={}", username, destination); - return; - } - - try { - Object body = serializeIfNeeded(payload); - messagingTemplate.convertAndSendToUser(username, destination, body); - } catch (Exception e) { - log.error("发送用户消息失败: username={}, destination={}", username, destination, e); - } - } - - private Object serializeIfNeeded(Object payload) throws JacksonException { - if (payload == null) { - return null; - } - if (payload instanceof String || payload instanceof Number || payload instanceof Boolean) { - return payload; - } - return objectMapper.writeValueAsString(payload); - } -} diff --git a/src/main/java/com/youlai/boot/support/websocket/service/WebSocketService.java b/src/main/java/com/youlai/boot/support/websocket/service/WebSocketService.java deleted file mode 100644 index 99fd050f..00000000 --- a/src/main/java/com/youlai/boot/support/websocket/service/WebSocketService.java +++ /dev/null @@ -1,57 +0,0 @@ -package com.youlai.boot.support.websocket.service; - -import com.youlai.boot.support.websocket.dto.OnlineUserDTO; - -import java.util.List; - -/** - * WebSocket服务接口 - *

- * 提供与WebSocket连接管理相关的功能,包括: - * - 用户连接/断开事件处理 - * - 字典数据变更通知 - * - 系统消息推送 - *

- * - * @author Ray.Hao - * @since 3.0.0 - */ -public interface WebSocketService { - - /** - * 处理用户连接事件 - * - * @param username 用户名 - * @param sessionId WebSocket会话ID - */ - void userConnected(String username, String sessionId); - - /** - * 处理用户断开连接事件 - * - * @param username 用户名 - */ - void userDisconnected(String username); - - /** - * 广播字典数据变更通知 - * - * @param dictCode 字典编码 - */ - void broadcastDictChange(String dictCode); - - /** - * 发送系统通知给特定用户 - * - * @param username 目标用户名 - * @param message 通知消息内容 - */ - void sendNotification(String username, Object message); - - /** - * 获取在线用户列表 - * - * @return 在线用户信息列表 - */ - List getOnlineUsers(); -} diff --git a/src/main/java/com/youlai/boot/support/websocket/service/impl/WebSocketServiceImpl.java b/src/main/java/com/youlai/boot/support/websocket/service/impl/WebSocketServiceImpl.java deleted file mode 100644 index 708ec9c0..00000000 --- a/src/main/java/com/youlai/boot/support/websocket/service/impl/WebSocketServiceImpl.java +++ /dev/null @@ -1,226 +0,0 @@ -package com.youlai.boot.support.websocket.service.impl; - -import com.youlai.boot.support.websocket.dto.DictChangeEvent; -import com.youlai.boot.support.websocket.dto.OnlineUserDTO; -import com.youlai.boot.support.websocket.publisher.WebSocketPublisher; -import com.youlai.boot.support.websocket.session.UserSessionRegistry; -import com.youlai.boot.support.websocket.service.WebSocketService; -import com.youlai.boot.support.websocket.topic.WebSocketTopics; -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Service; - -import java.util.List; -import java.util.Map; - -/** - * WebSocket 服务实现类 - * - * 核心功能: - * - 用户在线状态管理(支持多设备登录) - * - 消息推送(广播、点对点) - * - 字典变更通知 - * - * @author Ray.Hao - * @since 3.0.0 - */ -@Service -@Slf4j -public class WebSocketServiceImpl implements WebSocketService { - - private final UserSessionRegistry userSessionRegistry; - private final WebSocketPublisher webSocketPublisher; - - public WebSocketServiceImpl(UserSessionRegistry userSessionRegistry, WebSocketPublisher webSocketPublisher) { - this.userSessionRegistry = userSessionRegistry; - this.webSocketPublisher = webSocketPublisher; - } - - // ==================== 用户在线状态管理 ==================== - - /** - * 处理用户连接事件 - * - * @param username 用户名 - * @param sessionId WebSocket 会话 ID - */ - @Override - public void userConnected(String username, String sessionId) { - if (username == null || username.isEmpty()) { - log.warn("用户连接失败:用户名为空"); - return; - } - - if (sessionId == null || sessionId.isEmpty()) { - log.warn("用户[{}]连接失败:会话 ID 为空", username); - return; - } - - userSessionRegistry.userConnected(username, sessionId); - - int sessionCount = userSessionRegistry.getUserSessionCount(username); - int totalOnlineUsers = userSessionRegistry.getOnlineUserCount(); - - log.info("✓ 用户[{}]会话[{}]上线(该用户共 {} 个会话,系统总在线用户数:{})", - username, sessionId, sessionCount, totalOnlineUsers); - - // 广播在线用户数变更 - broadcastOnlineUserCount(); - } - - /** - * 处理用户断开连接事件 - * - * @param username 用户名 - */ - @Override - public void userDisconnected(String username) { - if (username == null || username.isEmpty()) { - return; - } - - userSessionRegistry.userDisconnected(username); - - int totalOnlineUsers = userSessionRegistry.getOnlineUserCount(); - log.info("✓ 用户[{}]下线(系统总在线用户数:{})", username, totalOnlineUsers); - - // 广播在线用户数变更 - broadcastOnlineUserCount(); - } - - /** - * 移除指定会话(单个设备下线) - * - * @param sessionId 会话 ID - */ - public void removeSession(String sessionId) { - userSessionRegistry.removeSession(sessionId); - broadcastOnlineUserCount(); - } - - /** - * 获取在线用户列表 - * - * @return 在线用户信息列表 - */ - public List getOnlineUsers() { - return userSessionRegistry.getOnlineUsers(); - } - - /** - * 获取在线用户数量 - * - * @return 在线用户数(不是会话数) - */ - public int getOnlineUserCount() { - return userSessionRegistry.getOnlineUserCount(); - } - - /** - * 获取在线会话总数 - * - * @return 所有在线会话的总数 - */ - public int getTotalSessionCount() { - return userSessionRegistry.getTotalSessionCount(); - } - - /** - * 检查用户是否在线 - * - * @param username 用户名 - * @return 是否在线 - */ - public boolean isUserOnline(String username) { - return userSessionRegistry.isUserOnline(username); - } - - /** - * 获取指定用户的会话数量 - * - * @param username 用户名 - * @return 会话数量 - */ - public int getUserSessionCount(String username) { - return userSessionRegistry.getUserSessionCount(username); - } - - /** - * 手动触发在线用户数量广播 - * - * 供外部服务(如定时任务)调用 - */ - public void notifyOnlineUsersChange() { - log.info("手动触发在线用户数量通知,当前在线用户数:{}", getOnlineUserCount()); - broadcastOnlineUserCount(); - } - - /** - * 广播在线用户数量变更(内部方法) - */ - private void broadcastOnlineUserCount() { - int count = getOnlineUserCount(); - webSocketPublisher.publish(WebSocketTopics.TOPIC_ONLINE_COUNT, count); - log.debug("✓ 已广播在线用户数量: {}", count); - } - - // ==================== 消息推送功能 ==================== - - /** - * 向所有客户端广播字典更新事件 - * - * @param dictCode 字典编码 - */ - @Override - public void broadcastDictChange(String dictCode) { - if (dictCode == null || dictCode.isEmpty()) { - log.warn("字典编码为空,跳过广播"); - return; - } - - DictChangeEvent event = new DictChangeEvent(dictCode); - webSocketPublisher.publish(WebSocketTopics.TOPIC_DICT, event); - log.info("✓ 已广播字典变更通知: dictCode={}", dictCode); - } - - /** - * 向特定用户发送通知消息 - * - * @param username 目标用户名 - * @param message 消息内容 - */ - @Override - public void sendNotification(String username, Object message) { - if (username == null || username.isEmpty()) { - log.warn("用户名为空,无法发送通知"); - return; - } - - if (message == null) { - log.warn("消息内容为空,无法发送给用户[{}]", username); - return; - } - - webSocketPublisher.publishToUser(username, WebSocketTopics.USER_QUEUE_MESSAGES, message); - log.info("✓ 已向用户[{}]发送通知", username); - } - - /** - * 广播系统消息给所有用户 - * - * @param message 消息内容 - */ - public void broadcastSystemMessage(String message) { - if (message == null || message.isEmpty()) { - log.warn("消息内容为空,无法广播"); - return; - } - - Map systemMessage = Map.of( - "sender", "系统通知", - "content", message, - "timestamp", System.currentTimeMillis() - ); - webSocketPublisher.publish(WebSocketTopics.TOPIC_PUBLIC, systemMessage); - log.info("✓ 已广播系统消息: {}", message); - } -} diff --git a/src/main/java/com/youlai/boot/support/websocket/session/UserSessionRegistry.java b/src/main/java/com/youlai/boot/support/websocket/session/UserSessionRegistry.java deleted file mode 100644 index 12090211..00000000 --- a/src/main/java/com/youlai/boot/support/websocket/session/UserSessionRegistry.java +++ /dev/null @@ -1,179 +0,0 @@ -package com.youlai.boot.support.websocket.session; - -import com.youlai.boot.support.websocket.dto.OnlineUserDTO; -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Component; - -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.stream.Collectors; - -/** - * WebSocket 用户会话注册表 - *

- * 维护WebSocket连接的用户会话信息,支持多设备同时登录。 - * 采用双Map结构实现高效查询: - *

    - *
  • userSessionsMap: 用户名 -> 会话ID集合(支持多设备)
  • - *
  • sessionDetailsMap: 会话ID -> 会话详情(快速定位用户)
  • - *
- * - * @author Ray.Hao - * @since 3.0.0 - */ -@Slf4j -@Component -public class UserSessionRegistry { - - /** - * 用户会话映射表 - *

- * Key: 用户名 - * Value: 该用户所有WebSocket会话ID集合(支持多设备登录) - */ - private final Map> userSessionsMap = new ConcurrentHashMap<>(); - - /** - * 会话详情映射表 - *

- * Key: WebSocket会话ID - * Value: 会话详情(包含用户名、连接时间等) - */ - private final Map sessionDetailsMap = new ConcurrentHashMap<>(); - - /** - * 用户上线(建立WebSocket连接) - * - * @param username 用户名 - * @param sessionId WebSocket会话ID - */ - public void userConnected(String username, String sessionId) { - userSessionsMap.computeIfAbsent(username, k -> ConcurrentHashMap.newKeySet()).add(sessionId); - sessionDetailsMap.put(sessionId, new SessionInfo(username, sessionId, System.currentTimeMillis())); - log.debug("用户[{}]会话[{}]已注册", username, sessionId); - } - - /** - * 用户下线(断开所有WebSocket连接) - *

- * 移除该用户的所有会话信息 - * - * @param username 用户名 - */ - public void userDisconnected(String username) { - Set sessions = userSessionsMap.remove(username); - if (sessions == null) { - return; - } - sessions.forEach(sessionDetailsMap::remove); - log.debug("用户[{}]已下线,移除{}个会话", username, sessions.size()); - } - - /** - * 移除指定会话(单设备下线) - *

- * 当用户某一设备断开连接时调用,保留其他设备的会话 - * - * @param sessionId WebSocket会话ID - */ - public void removeSession(String sessionId) { - SessionInfo sessionInfo = sessionDetailsMap.remove(sessionId); - if (sessionInfo == null) { - return; - } - - String username = sessionInfo.getUsername(); - Set sessions = userSessionsMap.get(username); - if (sessions == null) { - return; - } - - sessions.remove(sessionId); - if (sessions.isEmpty()) { - // 该用户没有任何会话了,移除用户记录 - userSessionsMap.remove(username); - log.debug("用户[{}]最后一个会话已移除", username); - } - } - - /** - * 获取在线用户数量 - * - * @return 当前在线用户数(非会话数) - */ - public int getOnlineUserCount() { - return userSessionsMap.size(); - } - - /** - * 获取指定用户的会话数量 - * - * @param username 用户名 - * @return 该用户的WebSocket会话数量(多设备登录时大于1) - */ - public int getUserSessionCount(String username) { - Set sessions = userSessionsMap.get(username); - return sessions != null ? sessions.size() : 0; - } - - /** - * 获取在线会话总数 - * - * @return 所有WebSocket会话的总数(包含多设备) - */ - public int getTotalSessionCount() { - return sessionDetailsMap.size(); - } - - /** - * 检查用户是否在线 - * - * @param username 用户名 - * @return 是否在线(至少有一个活跃会话) - */ - public boolean isUserOnline(String username) { - Set sessions = userSessionsMap.get(username); - return sessions != null && !sessions.isEmpty(); - } - - /** - * 获取所有在线用户列表 - * - * @return 在线用户信息列表 - */ - public List getOnlineUsers() { - return userSessionsMap.entrySet().stream() - .map(entry -> { - String username = entry.getKey(); - Set sessions = entry.getValue(); - // 取最早的连接时间作为登录时间 - long earliestLoginTime = sessions.stream() - .map(sessionDetailsMap::get) - .filter(info -> info != null) - .mapToLong(SessionInfo::getConnectTime) - .min() - .orElse(System.currentTimeMillis()); - - return new OnlineUserDTO(username, sessions.size(), earliestLoginTime); - }) - .collect(Collectors.toList()); - } - - /** - * WebSocket 会话详情(内部使用) - */ - @Data - @AllArgsConstructor - private static class SessionInfo { - /** 用户名 */ - private String username; - /** WebSocket会话ID */ - private String sessionId; - /** 连接时间戳 */ - private long connectTime; - } -} diff --git a/src/main/java/com/youlai/boot/support/websocket/topic/WebSocketTopics.java b/src/main/java/com/youlai/boot/support/websocket/topic/WebSocketTopics.java deleted file mode 100644 index 62a7741c..00000000 --- a/src/main/java/com/youlai/boot/support/websocket/topic/WebSocketTopics.java +++ /dev/null @@ -1,14 +0,0 @@ -package com.youlai.boot.support.websocket.topic; - -public final class WebSocketTopics { - - private WebSocketTopics() { - } - - public static final String TOPIC_DICT = "/topic/dict"; - public static final String TOPIC_ONLINE_COUNT = "/topic/online-count"; - public static final String TOPIC_PUBLIC = "/topic/public"; - - public static final String USER_QUEUE_MESSAGES = "/queue/messages"; - public static final String USER_QUEUE_MESSAGE = "/queue/message"; -} diff --git a/src/main/java/com/youlai/boot/system/controller/DictController.java b/src/main/java/com/youlai/boot/system/controller/DictController.java index b88b662c..700899f3 100644 --- a/src/main/java/com/youlai/boot/system/controller/DictController.java +++ b/src/main/java/com/youlai/boot/system/controller/DictController.java @@ -16,7 +16,7 @@ import com.youlai.boot.system.model.form.DictForm; import com.youlai.boot.common.annotation.Log; import com.youlai.boot.system.service.DictItemService; import com.youlai.boot.system.service.DictService; -import com.youlai.boot.support.websocket.service.WebSocketService; +import com.youlai.boot.support.sse.SseService; import io.swagger.v3.oas.annotations.Parameter; import io.swagger.v3.oas.annotations.tags.Tag; import io.swagger.v3.oas.annotations.Operation; @@ -42,7 +42,7 @@ public class DictController { private final DictService dictService; private final DictItemService dictItemService; - private final WebSocketService webSocketService; + private final SseService sseService; //--------------------------------------------------- // 字典相关接口 @@ -82,7 +82,7 @@ public class DictController { boolean result = dictService.saveDict(formData); // 发送字典更新通知 if (result) { - webSocketService.broadcastDictChange(formData.getDictCode()); + sseService.sendDictChange(formData.getDictCode()); } return Result.judge(result); } @@ -97,7 +97,7 @@ public class DictController { boolean status = dictService.updateDict(id, dictForm); // 发送字典更新通知 if (status && dictForm.getDictCode() != null) { - webSocketService.broadcastDictChange(dictForm.getDictCode()); + sseService.sendDictChange(dictForm.getDictCode()); } return Result.judge(status); } @@ -110,14 +110,14 @@ public class DictController { ) { // 获取字典编码列表,用于发送删除通知 List dictCodes = dictService.getDictCodesByIds(Arrays.stream(ids.split(",")).toList()); - + dictService.deleteDictByIds(Arrays.stream(ids.split(",")).toList()); - + // 发送字典删除通知 for (String dictCode : dictCodes) { - webSocketService.broadcastDictChange(dictCode); + sseService.sendDictChange(dictCode); } - + return Result.success(); } @@ -155,12 +155,12 @@ public class DictController { ) { formData.setDictCode(dictCode); boolean result = dictItemService.saveDictItem(formData); - + // 发送字典更新通知 if (result) { - webSocketService.broadcastDictChange(dictCode); + sseService.sendDictChange(dictCode); } - + return Result.judge(result); } @@ -186,12 +186,12 @@ public class DictController { formData.setId(itemId); formData.setDictCode(dictCode); boolean status = dictItemService.updateDictItem(formData); - + // 发送字典更新通知 if (status) { - webSocketService.broadcastDictChange(dictCode); + sseService.sendDictChange(dictCode); } - + return Result.judge(status); } @@ -203,10 +203,10 @@ public class DictController { @Parameter(description = "字典ID,多个以英文逗号(,)拼接") @PathVariable String itemIds ) { dictItemService.deleteDictItemByIds(itemIds); - + // 发送字典更新通知 - webSocketService.broadcastDictChange(dictCode); - + sseService.sendDictChange(dictCode); + return Result.success(); } diff --git a/src/main/java/com/youlai/boot/system/service/impl/NoticeServiceImpl.java b/src/main/java/com/youlai/boot/system/service/impl/NoticeServiceImpl.java index e9f0d75d..00bdf326 100644 --- a/src/main/java/com/youlai/boot/system/service/impl/NoticeServiceImpl.java +++ b/src/main/java/com/youlai/boot/system/service/impl/NoticeServiceImpl.java @@ -8,7 +8,7 @@ import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.youlai.boot.core.exception.BusinessException; -import com.youlai.boot.support.websocket.dto.OnlineUserDTO; +import com.youlai.boot.support.sse.dto.OnlineUserDTO; import com.youlai.boot.security.util.SecurityUtils; import com.youlai.boot.system.converter.NoticeConverter; import com.youlai.boot.system.enums.NoticePublishStatusEnum; @@ -27,7 +27,7 @@ import com.youlai.boot.system.model.vo.NoticeDetailVO; import com.youlai.boot.system.service.NoticeService; import com.youlai.boot.system.service.UserNoticeService; import com.youlai.boot.system.service.UserService; -import com.youlai.boot.support.websocket.service.WebSocketService; +import com.youlai.boot.support.sse.SseService; import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -52,7 +52,7 @@ public class NoticeServiceImpl extends ServiceImpl impleme private final NoticeConverter noticeConverter; private final UserNoticeService userNoticeService; private final UserService userService; - private final WebSocketService webSocketService; + private final SseService sseService; /** * 获取通知公告分页列表 @@ -213,7 +213,7 @@ public class NoticeServiceImpl extends ServiceImpl impleme Set receivers = targetUserList.stream().map(User::getUsername).collect(Collectors.toSet()); // 获取在线用户名集合 - Set allOnlineUsers = webSocketService.getOnlineUsers().stream() + Set allOnlineUsers = sseService.getOnlineUsers().stream() .map(OnlineUserDTO::getUsername) .collect(Collectors.toSet()); @@ -227,7 +227,7 @@ public class NoticeServiceImpl extends ServiceImpl impleme noticeDto.setPublishTime(notice.getPublishTime()); // 向在线接收者推送通知 - onlineReceivers.forEach(receiver -> webSocketService.sendNotification(receiver, noticeDto)); + onlineReceivers.forEach(receiver -> sseService.sendToUser(receiver, "notice", noticeDto)); } return publishResult; } @@ -261,6 +261,19 @@ public class NoticeServiceImpl extends ServiceImpl impleme userNoticeService.remove(new LambdaQueryWrapper() .eq(UserNotice::getNoticeId, id) ); + + // 通知前端移除该通知 + NoticeDTO noticeDto = new NoticeDTO(); + noticeDto.setId(id); + + // 获取所有在线用户 + Set allOnlineUsers = sseService.getOnlineUsers().stream() + .map(OnlineUserDTO::getUsername) + .collect(Collectors.toSet()); + + // 向所有在线用户推送撤回通知 + allOnlineUsers.forEach(username -> + sseService.sendToUser(username, "notice-revoke", noticeDto)); } return revokeResult; } diff --git a/src/main/resources/application-dev.yml b/src/main/resources/application-dev.yml index fdeb1a92..cdd5d77a 100644 --- a/src/main/resources/application-dev.yml +++ b/src/main/resources/application-dev.yml @@ -219,13 +219,3 @@ wx: miniapp: appid: Your_AppId secret: Your_AppSecret - -# Actuator 端点配置(监控端点暴露) -management: - endpoints: - web: - exposure: - include: health,info,metrics,env,loggers,heapdump,threaddump - endpoint: - health: - show-details: always