From 465e63c99d75dfc8f1176de917de2230cc590b8a Mon Sep 17 00:00:00 2001
From: "Ray.Hao" <1490493387@qq.com>
Date: Wed, 18 Mar 2026 17:41:05 +0800
Subject: [PATCH] =?UTF-8?q?feat:=20WebSocket=20=E8=BF=81=E7=A7=BB=E5=88=B0?=
=?UTF-8?q?=20SSE=20=E5=AE=9E=E7=8E=B0=E5=AE=9E=E6=97=B6=E6=8E=A8=E9=80=81?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
CHANGELOG.md | 60 ----
README.md | 17 +-
.../youlai/boot/config/WebSocketConfig.java | 293 ------------------
.../file/controller/FileController.java | 6 +-
.../filter/TokenAuthenticationFilter.java | 30 +-
.../support/mail/service/MailService.java | 59 +++-
.../mail/service/impl/MailServiceImpl.java | 79 -----
.../boot/support/sse/OnlineUserCountJob.java | 36 +++
.../boot/support/sse/SseController.java | 44 +++
.../youlai/boot/support/sse/SseService.java | 138 +++++++++
.../boot/support/sse/SseSessionRegistry.java | 196 ++++++++++++
.../youlai/boot/support/sse/SseTopics.java | 19 ++
.../dto/DictChangeEvent.java | 4 +-
.../{websocket => sse}/dto/OnlineUserDTO.java | 2 +-
.../websocket/job/OnlineUserCountJob.java | 47 ---
.../publisher/WebSocketPublisher.java | 61 ----
.../websocket/service/WebSocketService.java | 57 ----
.../service/impl/WebSocketServiceImpl.java | 226 --------------
.../session/UserSessionRegistry.java | 179 -----------
.../websocket/topic/WebSocketTopics.java | 14 -
.../system/controller/DictController.java | 34 +-
.../service/impl/NoticeServiceImpl.java | 23 +-
src/main/resources/application-dev.yml | 10 -
23 files changed, 551 insertions(+), 1083 deletions(-)
delete mode 100644 CHANGELOG.md
delete mode 100644 src/main/java/com/youlai/boot/config/WebSocketConfig.java
delete mode 100644 src/main/java/com/youlai/boot/support/mail/service/impl/MailServiceImpl.java
create mode 100644 src/main/java/com/youlai/boot/support/sse/OnlineUserCountJob.java
create mode 100644 src/main/java/com/youlai/boot/support/sse/SseController.java
create mode 100644 src/main/java/com/youlai/boot/support/sse/SseService.java
create mode 100644 src/main/java/com/youlai/boot/support/sse/SseSessionRegistry.java
create mode 100644 src/main/java/com/youlai/boot/support/sse/SseTopics.java
rename src/main/java/com/youlai/boot/support/{websocket => sse}/dto/DictChangeEvent.java (85%)
rename src/main/java/com/youlai/boot/support/{websocket => sse}/dto/OnlineUserDTO.java (93%)
delete mode 100644 src/main/java/com/youlai/boot/support/websocket/job/OnlineUserCountJob.java
delete mode 100644 src/main/java/com/youlai/boot/support/websocket/publisher/WebSocketPublisher.java
delete mode 100644 src/main/java/com/youlai/boot/support/websocket/service/WebSocketService.java
delete mode 100644 src/main/java/com/youlai/boot/support/websocket/service/impl/WebSocketServiceImpl.java
delete mode 100644 src/main/java/com/youlai/boot/support/websocket/session/UserSessionRegistry.java
delete mode 100644 src/main/java/com/youlai/boot/support/websocket/topic/WebSocketTopics.java
diff --git a/CHANGELOG.md b/CHANGELOG.md
deleted file mode 100644
index 85c30ab5..00000000
--- a/CHANGELOG.md
+++ /dev/null
@@ -1,60 +0,0 @@
-# 2.7.1 (2024/4/18)
-### 🐛 fix
-- 修复用户名或者密码错误时,返回的错误信息不正确问题
-### 🛠️ refactor
-- JWT 解析和验证代码优化重构
-- 优化代码结构和完善注释,提高代码可读性
-
-# 2.7.0 (2024/4/13)
-### ✨ feat
-- 集成 Mybatis-Plus generator 代码生成器
-
-# 2.6.0 (2024/3/6)
-
-### ✨ feat
-- 黑名单方式实现 JWT 主动注销过期
-### 🛠️ refactor
-- 角色权限重构
-
-
-# 2.5.0 (2023/12/6)
-### ✨ feat
-- [集成 Spring Cache 和 Redis 缓存,路由缓存](https://blog.csdn.net/u013737132/article/details/134789862)
-### 🛠️ refactor
-- 权限判断逻辑调整,用户绑定权限调整为角色绑定权限
-### fix
-- [接口无请求权限,Spring Security 自定义异常无效问题修复](https://youlai.blog.csdn.net/article/details/134718249)
-
-
-# 2.4.1 (2023/11/7)
-### ✂️ refactor
-- 项目目录结构优化
-### ⬆️ chore
-- 升级 SpringBoot 版本 `3.1.4` → `3.1.5`
-
-
-# 2.2.1 (2023/5/25)
-
-### 🐛 fix
-
-- 修复多级路由的组件路径错误导致页面404问题
-
-# 2.2.0 (2023/5/21)
-
-### ✨ feat
-- 菜单、角色、字典、部门添加接口权限控制
-
-### 🐛 fix
-
-- 用户登录权限缓存键值不一致导致获取用户数据权限错误问题修复
-
-### ✂️ refactor
-
-- 递归获取菜单、部门属性列表代码重构优化
-
-### ⬆️ chore
-- 升级 SpringBoot 版本 `3.0.6` → `3.1.0`
-
-### 📝 docs
-- SQL 脚本更新,sys_menu 新增 `tree_path` 字段 (升级需更新SQL脚本)
-
diff --git a/README.md b/README.md
index 87dad56e..45db9948 100644
--- a/README.md
+++ b/README.md
@@ -49,7 +49,7 @@
| [vue3-element-template](https://gitee.com/youlaiorg/vue3-element-template) | Vue 3 + Element Plus | 前端精简模板 |
| [youlai-boot-tenant](https://gitee.com/youlaiorg/youlai-boot-tenant) | Spring Boot 4 | 多租户 SaaS 版 |
| [youlai-boot-flex](https://gitee.com/youlaiorg/youlai-boot-flex) | Spring Boot 3 + MyBatis-Flex | MyBatis-Flex 版 |
-| [youlai-uniapp](https://gitee.com/youlaiorg/youlai-uniapp) | Vue 3 + uni-app | 移动端应用 |
+| [youlai-app](https://gitee.com/youlaiorg/youlai-app) | Vue 3 + uni-app | 移动端应用 |
---
@@ -167,17 +167,4 @@ docker-compose up -d
-**微信交流**:添加 **`haoxianrui`**,备注「前端/后端/全栈」
-
----
-
-如果项目对你有帮助,欢迎 ⭐️ Star 支持!
-
-
uploadFile(
@Parameter(
- name = "file1",
+ name = "file",
description = "表单文件对象",
required = true,
in = ParameterIn.DEFAULT,
- schema = @Schema(name = "file1", format = "binary")
+ schema = @Schema(name = "file", format = "binary")
)
- @RequestPart(value = "file1") MultipartFile file
+ @RequestPart(value = "file") MultipartFile file
) {
FileInfo fileInfo = fileService.uploadFile(file);
return Result.success(fileInfo);
diff --git a/src/main/java/com/youlai/boot/security/filter/TokenAuthenticationFilter.java b/src/main/java/com/youlai/boot/security/filter/TokenAuthenticationFilter.java
index e1b79407..f26a86ef 100644
--- a/src/main/java/com/youlai/boot/security/filter/TokenAuthenticationFilter.java
+++ b/src/main/java/com/youlai/boot/security/filter/TokenAuthenticationFilter.java
@@ -40,15 +40,10 @@ public class TokenAuthenticationFilter extends OncePerRequestFilter {
@Override
protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain) throws ServletException, IOException {
- String authorizationHeader = request.getHeader(HttpHeaders.AUTHORIZATION);
+ String rawToken = resolveToken(request);
try {
- if (StrUtil.isNotBlank(authorizationHeader)
- && authorizationHeader.startsWith(SecurityConstants.BEARER_TOKEN_PREFIX)) {
-
- // 剥离Bearer前缀获取原始令牌
- String rawToken = authorizationHeader.substring(SecurityConstants.BEARER_TOKEN_PREFIX.length());
-
+ if (StrUtil.isNotBlank(rawToken)) {
// 执行令牌有效性检查(包含密码学验签和过期时间验证)
boolean isValidToken = tokenManager.validateToken(rawToken);
if (!isValidToken) {
@@ -70,4 +65,25 @@ public class TokenAuthenticationFilter extends OncePerRequestFilter {
// 继续后续过滤器链执行
filterChain.doFilter(request, response);
}
+
+ /**
+ * 从请求中解析 Token
+ * 优先从 Authorization Header 获取,其次从 URL 参数获取(支持 SSE)
+ */
+ private String resolveToken(HttpServletRequest request) {
+ // 1. 从 Authorization Header 获取
+ String authorizationHeader = request.getHeader(HttpHeaders.AUTHORIZATION);
+ if (StrUtil.isNotBlank(authorizationHeader)
+ && authorizationHeader.startsWith(SecurityConstants.BEARER_TOKEN_PREFIX)) {
+ return authorizationHeader.substring(SecurityConstants.BEARER_TOKEN_PREFIX.length());
+ }
+
+ // 2. 从 URL 参数获取(支持 SSE EventSource)
+ String tokenParam = request.getParameter("token");
+ if (StrUtil.isNotBlank(tokenParam)) {
+ return tokenParam;
+ }
+
+ return null;
+ }
}
diff --git a/src/main/java/com/youlai/boot/support/mail/service/MailService.java b/src/main/java/com/youlai/boot/support/mail/service/MailService.java
index e0b0b704..0e558ce6 100644
--- a/src/main/java/com/youlai/boot/support/mail/service/MailService.java
+++ b/src/main/java/com/youlai/boot/support/mail/service/MailService.java
@@ -1,13 +1,32 @@
package com.youlai.boot.support.mail.service;
+import com.youlai.boot.config.property.MailProperties;
+import jakarta.mail.MessagingException;
+import jakarta.mail.internet.MimeMessage;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.core.io.FileSystemResource;
+import org.springframework.mail.SimpleMailMessage;
+import org.springframework.mail.javamail.JavaMailSender;
+import org.springframework.mail.javamail.MimeMessageHelper;
+import org.springframework.stereotype.Service;
+
+import java.io.File;
+
/**
- * 邮件服务接口层
+ * 邮件服务
*
* @author Ray
* @since 2024/8/17
*/
-public interface MailService {
+@Service
+@RequiredArgsConstructor
+@Slf4j
+public class MailService {
+ private final JavaMailSender mailSender;
+
+ private final MailProperties mailProperties;
/**
* 发送简单文本邮件
@@ -16,16 +35,42 @@ public interface MailService {
* @param subject 邮件主题
* @param text 邮件内容
*/
- void sendMail(String to, String subject, String text) ;
+ public void sendMail(String to, String subject, String text) {
+ try {
+ SimpleMailMessage message = new SimpleMailMessage();
+ message.setFrom(mailProperties.getFrom());
+ message.setTo(to);
+ message.setSubject(subject);
+ message.setText(text);
+ mailSender.send(message);
+ } catch (Exception e) {
+ log.error("发送邮件失败{}", e.getMessage());
+ }
+ }
/**
* 发送带附件的邮件
*
- * @param to 收件人地址
- * @param subject 邮件主题
- * @param text 邮件内容
+ * @param to 收件人地址
+ * @param subject 邮件主题
+ * @param text 邮件内容
* @param filePath 附件路径
*/
- void sendMailWithAttachment(String to, String subject, String text, String filePath);
+ public void sendMailWithAttachment(String to, String subject, String text, String filePath) {
+ MimeMessage message = mailSender.createMimeMessage();
+ try {
+ MimeMessageHelper helper = new MimeMessageHelper(message, true);
+ helper.setFrom(mailProperties.getFrom());
+ helper.setTo(to);
+ helper.setSubject(subject);
+ helper.setText(text, true); // true 表示支持HTML内容
+ FileSystemResource file = new FileSystemResource(new File(filePath));
+ helper.addAttachment(file.getFilename(), file);
+
+ mailSender.send(message);
+ } catch (MessagingException e) {
+ log.error("发送带附件的邮件失败{}", e.getMessage());
+ }
+ }
}
diff --git a/src/main/java/com/youlai/boot/support/mail/service/impl/MailServiceImpl.java b/src/main/java/com/youlai/boot/support/mail/service/impl/MailServiceImpl.java
deleted file mode 100644
index fd9bbe49..00000000
--- a/src/main/java/com/youlai/boot/support/mail/service/impl/MailServiceImpl.java
+++ /dev/null
@@ -1,79 +0,0 @@
-package com.youlai.boot.support.mail.service.impl;
-
-import com.youlai.boot.config.property.MailProperties;
-import com.youlai.boot.support.mail.service.MailService;
-import jakarta.mail.MessagingException;
-import jakarta.mail.internet.MimeMessage;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.core.io.FileSystemResource;
-import org.springframework.mail.SimpleMailMessage;
-import org.springframework.mail.javamail.JavaMailSender;
-import org.springframework.mail.javamail.MimeMessageHelper;
-import org.springframework.stereotype.Service;
-
-import java.io.File;
-
-/**
- * 邮件服务实现类
- *
- * @author Ray
- * @since 2024/8/17
- */
-@Service
-@RequiredArgsConstructor
-@Slf4j
-public class MailServiceImpl implements MailService {
-
- private final JavaMailSender mailSender;
-
- private final MailProperties mailProperties;
-
- /**
- * 发送简单文本邮件
- *
- * @param to 收件人地址
- * @param subject 邮件主题
- * @param text 邮件内容
- */
- @Override
- public void sendMail(String to, String subject, String text) {
- try {
- SimpleMailMessage message = new SimpleMailMessage();
- message.setFrom(mailProperties.getFrom());
- message.setTo(to);
- message.setSubject(subject);
- message.setText(text);
- mailSender.send(message);
- } catch (Exception e) {
- log.error("发送邮件失败{}", e.getMessage());
- }
- }
-
- /**
- * 发送带附件的邮件
- *
- * @param to 收件人地址
- * @param subject 邮件主题
- * @param text 邮件内容
- * @param filePath 附件路径
- */
- @Override
- public void sendMailWithAttachment(String to, String subject, String text, String filePath) {
- MimeMessage message = mailSender.createMimeMessage();
- try {
- MimeMessageHelper helper = new MimeMessageHelper(message, true);
- helper.setFrom(mailProperties.getFrom());
- helper.setTo(to);
- helper.setSubject(subject);
- helper.setText(text, true); // true 表示支持HTML内容
-
- FileSystemResource file = new FileSystemResource(new File(filePath));
- helper.addAttachment(file.getFilename(), file);
-
- mailSender.send(message);
- } catch (MessagingException e) {
- log.error("发送带附件的邮件失败{}", e.getMessage());
- }
- }
-}
diff --git a/src/main/java/com/youlai/boot/support/sse/OnlineUserCountJob.java b/src/main/java/com/youlai/boot/support/sse/OnlineUserCountJob.java
new file mode 100644
index 00000000..b30e015d
--- /dev/null
+++ b/src/main/java/com/youlai/boot/support/sse/OnlineUserCountJob.java
@@ -0,0 +1,36 @@
+package com.youlai.boot.support.sse;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+/**
+ * 在线用户数统计定时任务
+ *
+ * 定时统计并广播当前在线用户数量到所有 SSE 客户端
+ */
+@Component
+@Slf4j
+@RequiredArgsConstructor
+public class OnlineUserCountJob {
+
+ private final SseSessionRegistry sessionRegistry;
+ private final SseService sseService;
+
+ /**
+ * 定时统计在线用户数并广播
+ *
+ * 每3分钟执行一次,推送当前在线用户数量
+ */
+ @Scheduled(cron = "0 */3 * * * ?")
+ public void execute() {
+ int onlineCount = sessionRegistry.getOnlineUserCount();
+ int connectionCount = sessionRegistry.getTotalConnectionCount();
+
+ log.debug("定时统计:在线用户数={}, 总连接数={}", onlineCount, connectionCount);
+
+ // 发送在线用户数量
+ sseService.sendOnlineCount();
+ }
+}
diff --git a/src/main/java/com/youlai/boot/support/sse/SseController.java b/src/main/java/com/youlai/boot/support/sse/SseController.java
new file mode 100644
index 00000000..ad07be4f
--- /dev/null
+++ b/src/main/java/com/youlai/boot/support/sse/SseController.java
@@ -0,0 +1,44 @@
+package com.youlai.boot.support.sse;
+
+import com.youlai.boot.core.web.Result;
+import com.youlai.boot.security.model.SysUserDetails;
+import com.youlai.boot.security.util.SecurityUtils;
+import io.swagger.v3.oas.annotations.Operation;
+import io.swagger.v3.oas.annotations.tags.Tag;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.http.MediaType;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
+
+/**
+ * SSE 控制器
+ */
+@Tag(name = "14. SSE连接")
+@Slf4j
+@RestController
+@RequestMapping("/api/v1/sse")
+@RequiredArgsConstructor
+public class SseController {
+
+ private final SseService sseService;
+
+ @Operation(summary = "建立SSE连接")
+ @GetMapping(value = "/connect", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
+ public SseEmitter connect() {
+ SysUserDetails user = SecurityUtils.getUser().orElse(null);
+ if (user == null) {
+ log.warn("SSE连接失败:未获取到当前用户");
+ return null;
+ }
+ return sseService.createConnection(user.getUsername());
+ }
+
+ @Operation(summary = "获取在线用户数")
+ @GetMapping("/online-count")
+ public Result getOnlineCount() {
+ return Result.success(sseService.getOnlineUserCount());
+ }
+}
diff --git a/src/main/java/com/youlai/boot/support/sse/SseService.java b/src/main/java/com/youlai/boot/support/sse/SseService.java
new file mode 100644
index 00000000..f9424ab8
--- /dev/null
+++ b/src/main/java/com/youlai/boot/support/sse/SseService.java
@@ -0,0 +1,138 @@
+package com.youlai.boot.support.sse;
+
+import com.youlai.boot.support.sse.dto.DictChangeEvent;
+import com.youlai.boot.support.sse.dto.OnlineUserDTO;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * SSE 服务
+ */
+@Slf4j
+@Service
+@RequiredArgsConstructor
+public class SseService {
+
+ /** SSE 连接超时时间:30 分钟 */
+ private static final long TIMEOUT = 30 * 60 * 1000L;
+
+ private final SseSessionRegistry sessionRegistry;
+
+ /**
+ * 创建 SSE 连接
+ *
+ * @param username 用户名
+ * @return SseEmitter
+ */
+ public SseEmitter createConnection(String username) {
+ if (username == null || username.isEmpty()) {
+ log.warn("创建SSE连接失败:用户名为空");
+ return null;
+ }
+
+ // 创建 SseEmitter,设置超时时间
+ SseEmitter emitter = new SseEmitter(TIMEOUT);
+
+ // 注册用户连接
+ sessionRegistry.userConnected(username, emitter);
+
+ // 连接建立后立即发送在线用户数
+ try {
+ emitter.send(SseEmitter.event()
+ .name(SseTopics.ONLINE_COUNT)
+ .data(sessionRegistry.getOnlineUserCount()));
+ } catch (IOException e) {
+ log.warn("发送初始在线用户数失败: {}", e.getMessage());
+ }
+
+ log.info("用户[{}]SSE连接已建立,当前在线用户数: {}", username, sessionRegistry.getOnlineUserCount());
+
+ // 发送在线用户数变更
+ sendOnlineCount();
+
+ return emitter;
+ }
+
+ /**
+ * 发送字典变更事件
+ *
+ * @param dictCode 字典编码
+ */
+ public void sendDictChange(String dictCode) {
+ if (dictCode == null || dictCode.isEmpty()) {
+ log.warn("字典编码为空,跳过发送");
+ return;
+ }
+
+ DictChangeEvent event = new DictChangeEvent(dictCode);
+ sessionRegistry.broadcast(SseTopics.DICT, event);
+ log.info("已发送字典变更通知: dictCode={}", dictCode);
+ }
+
+ /**
+ * 发送在线用户数
+ */
+ public void sendOnlineCount() {
+ int count = sessionRegistry.getOnlineUserCount();
+ sessionRegistry.broadcast(SseTopics.ONLINE_COUNT, count);
+ log.debug("已发送在线用户数: {}", count);
+ }
+
+ /**
+ * 发送消息给指定用户
+ *
+ * @param username 用户名
+ * @param eventName 事件名称
+ * @param data 数据
+ */
+ public void sendToUser(String username, String eventName, Object data) {
+ if (username == null || username.isEmpty()) {
+ log.warn("用户名为空,无法发送消息");
+ return;
+ }
+ sessionRegistry.sendToUser(username, eventName, data);
+ log.info("已向用户[{}]发送事件[{}]", username, eventName);
+ }
+
+ /**
+ * 获取在线用户列表
+ *
+ * @return 在线用户列表
+ */
+ public List getOnlineUsers() {
+ return sessionRegistry.getOnlineUsers();
+ }
+
+ /**
+ * 获取在线用户数
+ *
+ * @return 在线用户数
+ */
+ public int getOnlineUserCount() {
+ return sessionRegistry.getOnlineUserCount();
+ }
+
+ /**
+ * 发送系统消息
+ *
+ * @param message 消息内容
+ */
+ public void sendSystemMessage(String message) {
+ if (message == null || message.isEmpty()) {
+ return;
+ }
+ Map systemMessage = Map.of(
+ "sender", "系统通知",
+ "content", message,
+ "timestamp", System.currentTimeMillis()
+ );
+ sessionRegistry.broadcast(SseTopics.SYSTEM, systemMessage);
+ log.info("已发送系统消息: {}", message);
+ }
+}
diff --git a/src/main/java/com/youlai/boot/support/sse/SseSessionRegistry.java b/src/main/java/com/youlai/boot/support/sse/SseSessionRegistry.java
new file mode 100644
index 00000000..95aedf1e
--- /dev/null
+++ b/src/main/java/com/youlai/boot/support/sse/SseSessionRegistry.java
@@ -0,0 +1,196 @@
+package com.youlai.boot.support.sse;
+
+import com.youlai.boot.support.sse.dto.OnlineUserDTO;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+/**
+ * SSE 会话注册表
+ *
+ * 维护 SSE 连接的用户会话信息,支持多设备同时登录
+ */
+@Slf4j
+@Component
+public class SseSessionRegistry {
+
+ /** 用户名 -> SseEmitter 集合(支持多设备) */
+ private final Map> userEmittersMap = new ConcurrentHashMap<>();
+
+ /** SseEmitter -> 用户名(快速定位用户) */
+ private final Map emitterUserMap = new ConcurrentHashMap<>();
+
+ /** SseEmitter -> 连接时间 */
+ private final Map emitterTimeMap = new ConcurrentHashMap<>();
+
+ /**
+ * 用户上线(建立 SSE 连接)
+ *
+ * @param username 用户名
+ * @param emitter SseEmitter
+ */
+ public void userConnected(String username, SseEmitter emitter) {
+ userEmittersMap.computeIfAbsent(username, k -> ConcurrentHashMap.newKeySet()).add(emitter);
+ emitterUserMap.put(emitter, username);
+ emitterTimeMap.put(emitter, System.currentTimeMillis());
+ log.debug("用户[{}]SSE连接已建立", username);
+
+ // 设置连接超时和完成回调
+ emitter.onCompletion(() -> {
+ removeEmitter(emitter);
+ log.debug("用户[{}]SSE连接已完成", username);
+ });
+ emitter.onTimeout(() -> {
+ removeEmitter(emitter);
+ log.debug("用户[{}]SSE连接超时", username);
+ });
+ emitter.onError(e -> {
+ removeEmitter(emitter);
+ log.debug("用户[{}]SSE连接错误: {}", username, e.getMessage());
+ });
+ }
+
+ /**
+ * 移除指定 emitter
+ */
+ private void removeEmitter(SseEmitter emitter) {
+ String username = emitterUserMap.remove(emitter);
+ if (username == null) {
+ return;
+ }
+ emitterTimeMap.remove(emitter);
+
+ Set emitters = userEmittersMap.get(username);
+ if (emitters != null) {
+ emitters.remove(emitter);
+ if (emitters.isEmpty()) {
+ userEmittersMap.remove(username);
+ log.debug("用户[{}]所有SSE连接已断开", username);
+ }
+ }
+ }
+
+ /**
+ * 用户下线(断开所有 SSE 连接)
+ *
+ * @param username 用户名
+ */
+ public void userDisconnected(String username) {
+ Set emitters = userEmittersMap.remove(username);
+ if (emitters == null) {
+ return;
+ }
+ emitters.forEach(emitter -> {
+ emitterUserMap.remove(emitter);
+ emitterTimeMap.remove(emitter);
+ try {
+ emitter.complete();
+ } catch (Exception ignored) {
+ }
+ });
+ log.debug("用户[{}]已下线,移除{}个SSE连接", username, emitters.size());
+ }
+
+ /**
+ * 获取在线用户数量
+ */
+ public int getOnlineUserCount() {
+ return userEmittersMap.size();
+ }
+
+ /**
+ * 获取总连接数量(包括多设备)
+ */
+ public int getTotalConnectionCount() {
+ return emitterUserMap.size();
+ }
+
+ /**
+ * 获取指定用户的连接数量
+ */
+ public int getUserConnectionCount(String username) {
+ Set emitters = userEmittersMap.get(username);
+ return emitters != null ? emitters.size() : 0;
+ }
+
+ /**
+ * 检查用户是否在线
+ */
+ public boolean isUserOnline(String username) {
+ Set emitters = userEmittersMap.get(username);
+ return emitters != null && !emitters.isEmpty();
+ }
+
+ /**
+ * 获取所有在线用户列表
+ */
+ public List getOnlineUsers() {
+ return userEmittersMap.entrySet().stream()
+ .map(entry -> {
+ String username = entry.getKey();
+ Set emitters = entry.getValue();
+ long earliestTime = emitters.stream()
+ .map(emitterTimeMap::get)
+ .filter(t -> t != null)
+ .mapToLong(Long::longValue)
+ .min()
+ .orElse(System.currentTimeMillis());
+ return new OnlineUserDTO(username, emitters.size(), earliestTime);
+ })
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * 获取所有活跃的 SseEmitter
+ */
+ public Set getAllEmitters() {
+ return emitterUserMap.keySet();
+ }
+
+ /**
+ * 获取指定用户的所有 SseEmitter
+ */
+ public Set getUserEmitters(String username) {
+ return userEmittersMap.get(username);
+ }
+
+ /**
+ * 向指定 emitter 发送事件
+ */
+ public boolean sendEvent(SseEmitter emitter, String eventName, Object data) {
+ try {
+ emitter.send(SseEmitter.event()
+ .name(eventName)
+ .data(data));
+ return true;
+ } catch (IOException e) {
+ log.warn("发送SSE事件失败: {}", e.getMessage());
+ removeEmitter(emitter);
+ return false;
+ }
+ }
+
+ /**
+ * 向所有连接广播事件
+ */
+ public void broadcast(String eventName, Object data) {
+ getAllEmitters().forEach(emitter -> sendEvent(emitter, eventName, data));
+ }
+
+ /**
+ * 向指定用户发送事件
+ */
+ public void sendToUser(String username, String eventName, Object data) {
+ Set emitters = userEmittersMap.get(username);
+ if (emitters != null) {
+ emitters.forEach(emitter -> sendEvent(emitter, eventName, data));
+ }
+ }
+}
diff --git a/src/main/java/com/youlai/boot/support/sse/SseTopics.java b/src/main/java/com/youlai/boot/support/sse/SseTopics.java
new file mode 100644
index 00000000..c3dcdf6d
--- /dev/null
+++ b/src/main/java/com/youlai/boot/support/sse/SseTopics.java
@@ -0,0 +1,19 @@
+package com.youlai.boot.support.sse;
+
+/**
+ * SSE 主题常量
+ */
+public final class SseTopics {
+
+ private SseTopics() {
+ }
+
+ /** 字典变更事件 */
+ public static final String DICT = "dict";
+
+ /** 在线用户数事件 */
+ public static final String ONLINE_COUNT = "online-count";
+
+ /** 系统消息事件 */
+ public static final String SYSTEM = "system";
+}
diff --git a/src/main/java/com/youlai/boot/support/websocket/dto/DictChangeEvent.java b/src/main/java/com/youlai/boot/support/sse/dto/DictChangeEvent.java
similarity index 85%
rename from src/main/java/com/youlai/boot/support/websocket/dto/DictChangeEvent.java
rename to src/main/java/com/youlai/boot/support/sse/dto/DictChangeEvent.java
index a07843d6..6064e0b2 100644
--- a/src/main/java/com/youlai/boot/support/websocket/dto/DictChangeEvent.java
+++ b/src/main/java/com/youlai/boot/support/sse/dto/DictChangeEvent.java
@@ -1,4 +1,4 @@
-package com.youlai.boot.support.websocket.dto;
+package com.youlai.boot.support.sse.dto;
import lombok.AllArgsConstructor;
import lombok.Data;
@@ -10,7 +10,7 @@ import java.io.Serializable;
/**
* 字典变更事件
*
- * 当字典数据发生变更时,通过 WebSocket 广播此事件通知前端清除缓存。
+ * 当字典数据发生变更时,通过 SSE 广播此事件通知前端清除缓存。
* 前端收到通知后清除对应字典的本地缓存,下次使用时重新从服务端加载。
*
* @author Ray.Hao
diff --git a/src/main/java/com/youlai/boot/support/websocket/dto/OnlineUserDTO.java b/src/main/java/com/youlai/boot/support/sse/dto/OnlineUserDTO.java
similarity index 93%
rename from src/main/java/com/youlai/boot/support/websocket/dto/OnlineUserDTO.java
rename to src/main/java/com/youlai/boot/support/sse/dto/OnlineUserDTO.java
index 820ad42a..26f5d129 100644
--- a/src/main/java/com/youlai/boot/support/websocket/dto/OnlineUserDTO.java
+++ b/src/main/java/com/youlai/boot/support/sse/dto/OnlineUserDTO.java
@@ -1,4 +1,4 @@
-package com.youlai.boot.support.websocket.dto;
+package com.youlai.boot.support.sse.dto;
import lombok.AllArgsConstructor;
import lombok.Data;
diff --git a/src/main/java/com/youlai/boot/support/websocket/job/OnlineUserCountJob.java b/src/main/java/com/youlai/boot/support/websocket/job/OnlineUserCountJob.java
deleted file mode 100644
index 8e59bcf3..00000000
--- a/src/main/java/com/youlai/boot/support/websocket/job/OnlineUserCountJob.java
+++ /dev/null
@@ -1,47 +0,0 @@
-package com.youlai.boot.support.websocket.job;
-
-import com.youlai.boot.support.websocket.publisher.WebSocketPublisher;
-import com.youlai.boot.support.websocket.session.UserSessionRegistry;
-import com.youlai.boot.support.websocket.topic.WebSocketTopics;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.scheduling.annotation.Scheduled;
-import org.springframework.stereotype.Component;
-
-/**
- * 在线用户数统计定时任务
- *
- * 定时统计并广播当前在线用户数量到所有WebSocket客户端。
- * 用于解决以下问题:
- *
- * - 客户端页面刷新后可快速同步最新在线人数
- * - 减少服务端主动推送频率,降低资源消耗
- *
- *
- * @author Ray.Hao
- * @since 3.0.0
- */
-@Component
-@Slf4j
-@RequiredArgsConstructor
-public class OnlineUserCountJob {
-
- private final UserSessionRegistry userSessionRegistry;
- private final WebSocketPublisher webSocketPublisher;
-
- /**
- * 定时统计在线用户数并广播
- *
- * 每3分钟执行一次,推送当前在线用户数量
- */
- @Scheduled(cron = "0 */3 * * * ?")
- public void execute() {
- int onlineCount = userSessionRegistry.getOnlineUserCount();
- int sessionCount = userSessionRegistry.getTotalSessionCount();
-
- log.debug("定时统计:在线用户数={}, 总会话数={}", onlineCount, sessionCount);
-
- // 广播在线用户数量
- webSocketPublisher.publish(WebSocketTopics.TOPIC_ONLINE_COUNT, onlineCount);
- }
-}
diff --git a/src/main/java/com/youlai/boot/support/websocket/publisher/WebSocketPublisher.java b/src/main/java/com/youlai/boot/support/websocket/publisher/WebSocketPublisher.java
deleted file mode 100644
index e01f6241..00000000
--- a/src/main/java/com/youlai/boot/support/websocket/publisher/WebSocketPublisher.java
+++ /dev/null
@@ -1,61 +0,0 @@
-package com.youlai.boot.support.websocket.publisher;
-
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.messaging.simp.SimpMessagingTemplate;
-import org.springframework.stereotype.Service;
-import tools.jackson.core.JacksonException;
-import tools.jackson.databind.ObjectMapper;
-
-@Service
-@RequiredArgsConstructor
-@Slf4j
-public class WebSocketPublisher {
-
- private SimpMessagingTemplate messagingTemplate;
- private final ObjectMapper objectMapper;
-
- @Autowired(required = false)
- public void setMessagingTemplate(SimpMessagingTemplate messagingTemplate) {
- this.messagingTemplate = messagingTemplate;
- }
-
- public void publish(String destination, Object payload) {
- if (messagingTemplate == null) {
- log.warn("消息模板尚未初始化,无法发送消息: destination={}", destination);
- return;
- }
-
- try {
- Object body = serializeIfNeeded(payload);
- messagingTemplate.convertAndSend(destination, body);
- } catch (Exception e) {
- log.error("发送消息失败: destination={}", destination, e);
- }
- }
-
- public void publishToUser(String username, String destination, Object payload) {
- if (messagingTemplate == null) {
- log.warn("消息模板尚未初始化,无法发送用户消息: username={}, destination={}", username, destination);
- return;
- }
-
- try {
- Object body = serializeIfNeeded(payload);
- messagingTemplate.convertAndSendToUser(username, destination, body);
- } catch (Exception e) {
- log.error("发送用户消息失败: username={}, destination={}", username, destination, e);
- }
- }
-
- private Object serializeIfNeeded(Object payload) throws JacksonException {
- if (payload == null) {
- return null;
- }
- if (payload instanceof String || payload instanceof Number || payload instanceof Boolean) {
- return payload;
- }
- return objectMapper.writeValueAsString(payload);
- }
-}
diff --git a/src/main/java/com/youlai/boot/support/websocket/service/WebSocketService.java b/src/main/java/com/youlai/boot/support/websocket/service/WebSocketService.java
deleted file mode 100644
index 99fd050f..00000000
--- a/src/main/java/com/youlai/boot/support/websocket/service/WebSocketService.java
+++ /dev/null
@@ -1,57 +0,0 @@
-package com.youlai.boot.support.websocket.service;
-
-import com.youlai.boot.support.websocket.dto.OnlineUserDTO;
-
-import java.util.List;
-
-/**
- * WebSocket服务接口
- *
- * 提供与WebSocket连接管理相关的功能,包括:
- * - 用户连接/断开事件处理
- * - 字典数据变更通知
- * - 系统消息推送
- *
- *
- * @author Ray.Hao
- * @since 3.0.0
- */
-public interface WebSocketService {
-
- /**
- * 处理用户连接事件
- *
- * @param username 用户名
- * @param sessionId WebSocket会话ID
- */
- void userConnected(String username, String sessionId);
-
- /**
- * 处理用户断开连接事件
- *
- * @param username 用户名
- */
- void userDisconnected(String username);
-
- /**
- * 广播字典数据变更通知
- *
- * @param dictCode 字典编码
- */
- void broadcastDictChange(String dictCode);
-
- /**
- * 发送系统通知给特定用户
- *
- * @param username 目标用户名
- * @param message 通知消息内容
- */
- void sendNotification(String username, Object message);
-
- /**
- * 获取在线用户列表
- *
- * @return 在线用户信息列表
- */
- List getOnlineUsers();
-}
diff --git a/src/main/java/com/youlai/boot/support/websocket/service/impl/WebSocketServiceImpl.java b/src/main/java/com/youlai/boot/support/websocket/service/impl/WebSocketServiceImpl.java
deleted file mode 100644
index 708ec9c0..00000000
--- a/src/main/java/com/youlai/boot/support/websocket/service/impl/WebSocketServiceImpl.java
+++ /dev/null
@@ -1,226 +0,0 @@
-package com.youlai.boot.support.websocket.service.impl;
-
-import com.youlai.boot.support.websocket.dto.DictChangeEvent;
-import com.youlai.boot.support.websocket.dto.OnlineUserDTO;
-import com.youlai.boot.support.websocket.publisher.WebSocketPublisher;
-import com.youlai.boot.support.websocket.session.UserSessionRegistry;
-import com.youlai.boot.support.websocket.service.WebSocketService;
-import com.youlai.boot.support.websocket.topic.WebSocketTopics;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.stereotype.Service;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * WebSocket 服务实现类
- *
- * 核心功能:
- * - 用户在线状态管理(支持多设备登录)
- * - 消息推送(广播、点对点)
- * - 字典变更通知
- *
- * @author Ray.Hao
- * @since 3.0.0
- */
-@Service
-@Slf4j
-public class WebSocketServiceImpl implements WebSocketService {
-
- private final UserSessionRegistry userSessionRegistry;
- private final WebSocketPublisher webSocketPublisher;
-
- public WebSocketServiceImpl(UserSessionRegistry userSessionRegistry, WebSocketPublisher webSocketPublisher) {
- this.userSessionRegistry = userSessionRegistry;
- this.webSocketPublisher = webSocketPublisher;
- }
-
- // ==================== 用户在线状态管理 ====================
-
- /**
- * 处理用户连接事件
- *
- * @param username 用户名
- * @param sessionId WebSocket 会话 ID
- */
- @Override
- public void userConnected(String username, String sessionId) {
- if (username == null || username.isEmpty()) {
- log.warn("用户连接失败:用户名为空");
- return;
- }
-
- if (sessionId == null || sessionId.isEmpty()) {
- log.warn("用户[{}]连接失败:会话 ID 为空", username);
- return;
- }
-
- userSessionRegistry.userConnected(username, sessionId);
-
- int sessionCount = userSessionRegistry.getUserSessionCount(username);
- int totalOnlineUsers = userSessionRegistry.getOnlineUserCount();
-
- log.info("✓ 用户[{}]会话[{}]上线(该用户共 {} 个会话,系统总在线用户数:{})",
- username, sessionId, sessionCount, totalOnlineUsers);
-
- // 广播在线用户数变更
- broadcastOnlineUserCount();
- }
-
- /**
- * 处理用户断开连接事件
- *
- * @param username 用户名
- */
- @Override
- public void userDisconnected(String username) {
- if (username == null || username.isEmpty()) {
- return;
- }
-
- userSessionRegistry.userDisconnected(username);
-
- int totalOnlineUsers = userSessionRegistry.getOnlineUserCount();
- log.info("✓ 用户[{}]下线(系统总在线用户数:{})", username, totalOnlineUsers);
-
- // 广播在线用户数变更
- broadcastOnlineUserCount();
- }
-
- /**
- * 移除指定会话(单个设备下线)
- *
- * @param sessionId 会话 ID
- */
- public void removeSession(String sessionId) {
- userSessionRegistry.removeSession(sessionId);
- broadcastOnlineUserCount();
- }
-
- /**
- * 获取在线用户列表
- *
- * @return 在线用户信息列表
- */
- public List getOnlineUsers() {
- return userSessionRegistry.getOnlineUsers();
- }
-
- /**
- * 获取在线用户数量
- *
- * @return 在线用户数(不是会话数)
- */
- public int getOnlineUserCount() {
- return userSessionRegistry.getOnlineUserCount();
- }
-
- /**
- * 获取在线会话总数
- *
- * @return 所有在线会话的总数
- */
- public int getTotalSessionCount() {
- return userSessionRegistry.getTotalSessionCount();
- }
-
- /**
- * 检查用户是否在线
- *
- * @param username 用户名
- * @return 是否在线
- */
- public boolean isUserOnline(String username) {
- return userSessionRegistry.isUserOnline(username);
- }
-
- /**
- * 获取指定用户的会话数量
- *
- * @param username 用户名
- * @return 会话数量
- */
- public int getUserSessionCount(String username) {
- return userSessionRegistry.getUserSessionCount(username);
- }
-
- /**
- * 手动触发在线用户数量广播
- *
- * 供外部服务(如定时任务)调用
- */
- public void notifyOnlineUsersChange() {
- log.info("手动触发在线用户数量通知,当前在线用户数:{}", getOnlineUserCount());
- broadcastOnlineUserCount();
- }
-
- /**
- * 广播在线用户数量变更(内部方法)
- */
- private void broadcastOnlineUserCount() {
- int count = getOnlineUserCount();
- webSocketPublisher.publish(WebSocketTopics.TOPIC_ONLINE_COUNT, count);
- log.debug("✓ 已广播在线用户数量: {}", count);
- }
-
- // ==================== 消息推送功能 ====================
-
- /**
- * 向所有客户端广播字典更新事件
- *
- * @param dictCode 字典编码
- */
- @Override
- public void broadcastDictChange(String dictCode) {
- if (dictCode == null || dictCode.isEmpty()) {
- log.warn("字典编码为空,跳过广播");
- return;
- }
-
- DictChangeEvent event = new DictChangeEvent(dictCode);
- webSocketPublisher.publish(WebSocketTopics.TOPIC_DICT, event);
- log.info("✓ 已广播字典变更通知: dictCode={}", dictCode);
- }
-
- /**
- * 向特定用户发送通知消息
- *
- * @param username 目标用户名
- * @param message 消息内容
- */
- @Override
- public void sendNotification(String username, Object message) {
- if (username == null || username.isEmpty()) {
- log.warn("用户名为空,无法发送通知");
- return;
- }
-
- if (message == null) {
- log.warn("消息内容为空,无法发送给用户[{}]", username);
- return;
- }
-
- webSocketPublisher.publishToUser(username, WebSocketTopics.USER_QUEUE_MESSAGES, message);
- log.info("✓ 已向用户[{}]发送通知", username);
- }
-
- /**
- * 广播系统消息给所有用户
- *
- * @param message 消息内容
- */
- public void broadcastSystemMessage(String message) {
- if (message == null || message.isEmpty()) {
- log.warn("消息内容为空,无法广播");
- return;
- }
-
- Map systemMessage = Map.of(
- "sender", "系统通知",
- "content", message,
- "timestamp", System.currentTimeMillis()
- );
- webSocketPublisher.publish(WebSocketTopics.TOPIC_PUBLIC, systemMessage);
- log.info("✓ 已广播系统消息: {}", message);
- }
-}
diff --git a/src/main/java/com/youlai/boot/support/websocket/session/UserSessionRegistry.java b/src/main/java/com/youlai/boot/support/websocket/session/UserSessionRegistry.java
deleted file mode 100644
index 12090211..00000000
--- a/src/main/java/com/youlai/boot/support/websocket/session/UserSessionRegistry.java
+++ /dev/null
@@ -1,179 +0,0 @@
-package com.youlai.boot.support.websocket.session;
-
-import com.youlai.boot.support.websocket.dto.OnlineUserDTO;
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.stereotype.Component;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Collectors;
-
-/**
- * WebSocket 用户会话注册表
- *
- * 维护WebSocket连接的用户会话信息,支持多设备同时登录。
- * 采用双Map结构实现高效查询:
- *
- * - userSessionsMap: 用户名 -> 会话ID集合(支持多设备)
- * - sessionDetailsMap: 会话ID -> 会话详情(快速定位用户)
- *
- *
- * @author Ray.Hao
- * @since 3.0.0
- */
-@Slf4j
-@Component
-public class UserSessionRegistry {
-
- /**
- * 用户会话映射表
- *
- * Key: 用户名
- * Value: 该用户所有WebSocket会话ID集合(支持多设备登录)
- */
- private final Map> userSessionsMap = new ConcurrentHashMap<>();
-
- /**
- * 会话详情映射表
- *
- * Key: WebSocket会话ID
- * Value: 会话详情(包含用户名、连接时间等)
- */
- private final Map sessionDetailsMap = new ConcurrentHashMap<>();
-
- /**
- * 用户上线(建立WebSocket连接)
- *
- * @param username 用户名
- * @param sessionId WebSocket会话ID
- */
- public void userConnected(String username, String sessionId) {
- userSessionsMap.computeIfAbsent(username, k -> ConcurrentHashMap.newKeySet()).add(sessionId);
- sessionDetailsMap.put(sessionId, new SessionInfo(username, sessionId, System.currentTimeMillis()));
- log.debug("用户[{}]会话[{}]已注册", username, sessionId);
- }
-
- /**
- * 用户下线(断开所有WebSocket连接)
- *
- * 移除该用户的所有会话信息
- *
- * @param username 用户名
- */
- public void userDisconnected(String username) {
- Set sessions = userSessionsMap.remove(username);
- if (sessions == null) {
- return;
- }
- sessions.forEach(sessionDetailsMap::remove);
- log.debug("用户[{}]已下线,移除{}个会话", username, sessions.size());
- }
-
- /**
- * 移除指定会话(单设备下线)
- *
- * 当用户某一设备断开连接时调用,保留其他设备的会话
- *
- * @param sessionId WebSocket会话ID
- */
- public void removeSession(String sessionId) {
- SessionInfo sessionInfo = sessionDetailsMap.remove(sessionId);
- if (sessionInfo == null) {
- return;
- }
-
- String username = sessionInfo.getUsername();
- Set sessions = userSessionsMap.get(username);
- if (sessions == null) {
- return;
- }
-
- sessions.remove(sessionId);
- if (sessions.isEmpty()) {
- // 该用户没有任何会话了,移除用户记录
- userSessionsMap.remove(username);
- log.debug("用户[{}]最后一个会话已移除", username);
- }
- }
-
- /**
- * 获取在线用户数量
- *
- * @return 当前在线用户数(非会话数)
- */
- public int getOnlineUserCount() {
- return userSessionsMap.size();
- }
-
- /**
- * 获取指定用户的会话数量
- *
- * @param username 用户名
- * @return 该用户的WebSocket会话数量(多设备登录时大于1)
- */
- public int getUserSessionCount(String username) {
- Set sessions = userSessionsMap.get(username);
- return sessions != null ? sessions.size() : 0;
- }
-
- /**
- * 获取在线会话总数
- *
- * @return 所有WebSocket会话的总数(包含多设备)
- */
- public int getTotalSessionCount() {
- return sessionDetailsMap.size();
- }
-
- /**
- * 检查用户是否在线
- *
- * @param username 用户名
- * @return 是否在线(至少有一个活跃会话)
- */
- public boolean isUserOnline(String username) {
- Set sessions = userSessionsMap.get(username);
- return sessions != null && !sessions.isEmpty();
- }
-
- /**
- * 获取所有在线用户列表
- *
- * @return 在线用户信息列表
- */
- public List getOnlineUsers() {
- return userSessionsMap.entrySet().stream()
- .map(entry -> {
- String username = entry.getKey();
- Set sessions = entry.getValue();
- // 取最早的连接时间作为登录时间
- long earliestLoginTime = sessions.stream()
- .map(sessionDetailsMap::get)
- .filter(info -> info != null)
- .mapToLong(SessionInfo::getConnectTime)
- .min()
- .orElse(System.currentTimeMillis());
-
- return new OnlineUserDTO(username, sessions.size(), earliestLoginTime);
- })
- .collect(Collectors.toList());
- }
-
- /**
- * WebSocket 会话详情(内部使用)
- */
- @Data
- @AllArgsConstructor
- private static class SessionInfo {
- /** 用户名 */
- private String username;
- /** WebSocket会话ID */
- private String sessionId;
- /** 连接时间戳 */
- private long connectTime;
- }
-}
diff --git a/src/main/java/com/youlai/boot/support/websocket/topic/WebSocketTopics.java b/src/main/java/com/youlai/boot/support/websocket/topic/WebSocketTopics.java
deleted file mode 100644
index 62a7741c..00000000
--- a/src/main/java/com/youlai/boot/support/websocket/topic/WebSocketTopics.java
+++ /dev/null
@@ -1,14 +0,0 @@
-package com.youlai.boot.support.websocket.topic;
-
-public final class WebSocketTopics {
-
- private WebSocketTopics() {
- }
-
- public static final String TOPIC_DICT = "/topic/dict";
- public static final String TOPIC_ONLINE_COUNT = "/topic/online-count";
- public static final String TOPIC_PUBLIC = "/topic/public";
-
- public static final String USER_QUEUE_MESSAGES = "/queue/messages";
- public static final String USER_QUEUE_MESSAGE = "/queue/message";
-}
diff --git a/src/main/java/com/youlai/boot/system/controller/DictController.java b/src/main/java/com/youlai/boot/system/controller/DictController.java
index b88b662c..700899f3 100644
--- a/src/main/java/com/youlai/boot/system/controller/DictController.java
+++ b/src/main/java/com/youlai/boot/system/controller/DictController.java
@@ -16,7 +16,7 @@ import com.youlai.boot.system.model.form.DictForm;
import com.youlai.boot.common.annotation.Log;
import com.youlai.boot.system.service.DictItemService;
import com.youlai.boot.system.service.DictService;
-import com.youlai.boot.support.websocket.service.WebSocketService;
+import com.youlai.boot.support.sse.SseService;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import io.swagger.v3.oas.annotations.Operation;
@@ -42,7 +42,7 @@ public class DictController {
private final DictService dictService;
private final DictItemService dictItemService;
- private final WebSocketService webSocketService;
+ private final SseService sseService;
//---------------------------------------------------
// 字典相关接口
@@ -82,7 +82,7 @@ public class DictController {
boolean result = dictService.saveDict(formData);
// 发送字典更新通知
if (result) {
- webSocketService.broadcastDictChange(formData.getDictCode());
+ sseService.sendDictChange(formData.getDictCode());
}
return Result.judge(result);
}
@@ -97,7 +97,7 @@ public class DictController {
boolean status = dictService.updateDict(id, dictForm);
// 发送字典更新通知
if (status && dictForm.getDictCode() != null) {
- webSocketService.broadcastDictChange(dictForm.getDictCode());
+ sseService.sendDictChange(dictForm.getDictCode());
}
return Result.judge(status);
}
@@ -110,14 +110,14 @@ public class DictController {
) {
// 获取字典编码列表,用于发送删除通知
List dictCodes = dictService.getDictCodesByIds(Arrays.stream(ids.split(",")).toList());
-
+
dictService.deleteDictByIds(Arrays.stream(ids.split(",")).toList());
-
+
// 发送字典删除通知
for (String dictCode : dictCodes) {
- webSocketService.broadcastDictChange(dictCode);
+ sseService.sendDictChange(dictCode);
}
-
+
return Result.success();
}
@@ -155,12 +155,12 @@ public class DictController {
) {
formData.setDictCode(dictCode);
boolean result = dictItemService.saveDictItem(formData);
-
+
// 发送字典更新通知
if (result) {
- webSocketService.broadcastDictChange(dictCode);
+ sseService.sendDictChange(dictCode);
}
-
+
return Result.judge(result);
}
@@ -186,12 +186,12 @@ public class DictController {
formData.setId(itemId);
formData.setDictCode(dictCode);
boolean status = dictItemService.updateDictItem(formData);
-
+
// 发送字典更新通知
if (status) {
- webSocketService.broadcastDictChange(dictCode);
+ sseService.sendDictChange(dictCode);
}
-
+
return Result.judge(status);
}
@@ -203,10 +203,10 @@ public class DictController {
@Parameter(description = "字典ID,多个以英文逗号(,)拼接") @PathVariable String itemIds
) {
dictItemService.deleteDictItemByIds(itemIds);
-
+
// 发送字典更新通知
- webSocketService.broadcastDictChange(dictCode);
-
+ sseService.sendDictChange(dictCode);
+
return Result.success();
}
diff --git a/src/main/java/com/youlai/boot/system/service/impl/NoticeServiceImpl.java b/src/main/java/com/youlai/boot/system/service/impl/NoticeServiceImpl.java
index e9f0d75d..00bdf326 100644
--- a/src/main/java/com/youlai/boot/system/service/impl/NoticeServiceImpl.java
+++ b/src/main/java/com/youlai/boot/system/service/impl/NoticeServiceImpl.java
@@ -8,7 +8,7 @@ import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.youlai.boot.core.exception.BusinessException;
-import com.youlai.boot.support.websocket.dto.OnlineUserDTO;
+import com.youlai.boot.support.sse.dto.OnlineUserDTO;
import com.youlai.boot.security.util.SecurityUtils;
import com.youlai.boot.system.converter.NoticeConverter;
import com.youlai.boot.system.enums.NoticePublishStatusEnum;
@@ -27,7 +27,7 @@ import com.youlai.boot.system.model.vo.NoticeDetailVO;
import com.youlai.boot.system.service.NoticeService;
import com.youlai.boot.system.service.UserNoticeService;
import com.youlai.boot.system.service.UserService;
-import com.youlai.boot.support.websocket.service.WebSocketService;
+import com.youlai.boot.support.sse.SseService;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@@ -52,7 +52,7 @@ public class NoticeServiceImpl extends ServiceImpl impleme
private final NoticeConverter noticeConverter;
private final UserNoticeService userNoticeService;
private final UserService userService;
- private final WebSocketService webSocketService;
+ private final SseService sseService;
/**
* 获取通知公告分页列表
@@ -213,7 +213,7 @@ public class NoticeServiceImpl extends ServiceImpl impleme
Set receivers = targetUserList.stream().map(User::getUsername).collect(Collectors.toSet());
// 获取在线用户名集合
- Set allOnlineUsers = webSocketService.getOnlineUsers().stream()
+ Set allOnlineUsers = sseService.getOnlineUsers().stream()
.map(OnlineUserDTO::getUsername)
.collect(Collectors.toSet());
@@ -227,7 +227,7 @@ public class NoticeServiceImpl extends ServiceImpl impleme
noticeDto.setPublishTime(notice.getPublishTime());
// 向在线接收者推送通知
- onlineReceivers.forEach(receiver -> webSocketService.sendNotification(receiver, noticeDto));
+ onlineReceivers.forEach(receiver -> sseService.sendToUser(receiver, "notice", noticeDto));
}
return publishResult;
}
@@ -261,6 +261,19 @@ public class NoticeServiceImpl extends ServiceImpl impleme
userNoticeService.remove(new LambdaQueryWrapper()
.eq(UserNotice::getNoticeId, id)
);
+
+ // 通知前端移除该通知
+ NoticeDTO noticeDto = new NoticeDTO();
+ noticeDto.setId(id);
+
+ // 获取所有在线用户
+ Set allOnlineUsers = sseService.getOnlineUsers().stream()
+ .map(OnlineUserDTO::getUsername)
+ .collect(Collectors.toSet());
+
+ // 向所有在线用户推送撤回通知
+ allOnlineUsers.forEach(username ->
+ sseService.sendToUser(username, "notice-revoke", noticeDto));
}
return revokeResult;
}
diff --git a/src/main/resources/application-dev.yml b/src/main/resources/application-dev.yml
index fdeb1a92..cdd5d77a 100644
--- a/src/main/resources/application-dev.yml
+++ b/src/main/resources/application-dev.yml
@@ -219,13 +219,3 @@ wx:
miniapp:
appid: Your_AppId
secret: Your_AppSecret
-
-# Actuator 端点配置(监控端点暴露)
-management:
- endpoints:
- web:
- exposure:
- include: health,info,metrics,env,loggers,heapdump,threaddump
- endpoint:
- health:
- show-details: always