feat: 开始管理用户是否读取了消息

目前出现了BUG。我修改代码后。前端websocket收不到消息了。
This commit is contained in:
Kylin
2024-08-29 17:39:10 +08:00
parent a1143a48a6
commit 936fc24b2e
4 changed files with 93 additions and 21 deletions

View File

@@ -1,7 +1,11 @@
package com.youlai.system.model.entity; package com.youlai.system.model.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
import java.io.Serializable;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import com.baomidou.mybatisplus.annotation.TableName; import com.baomidou.mybatisplus.annotation.TableName;
import com.youlai.system.common.base.BaseEntity; import com.youlai.system.common.base.BaseEntity;
@@ -15,10 +19,16 @@ import com.youlai.system.common.base.BaseEntity;
@Getter @Getter
@Setter @Setter
@TableName("sys_notice_status") @TableName("sys_notice_status")
public class NoticeStatus extends BaseEntity { public class NoticeStatus implements Serializable {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
/**
* 主键ID
*/
@TableId(type = IdType.AUTO)
private Long id;
/** /**
* 公共通知id * 公共通知id
*/ */
@@ -26,7 +36,7 @@ public class NoticeStatus extends BaseEntity {
/** /**
* 用户id * 用户id
*/ */
private Integer userId; private Long userId;
/** /**
* 读取状态0未读1已读取 * 读取状态0未读1已读取
*/ */

View File

@@ -1,14 +1,17 @@
package com.youlai.system.service; package com.youlai.system.service;
import java.util.Set;
public interface WebsocketService { public interface WebsocketService {
void addUser(String username); void addUser(String username);
void removeUser(String username) ; void removeUser(String username) ;
Set<String> getUsers();
/** /**
* 发送消息到前端 * 发送消息到前端
* @param message * @param message
*/ */
void sendStringToFrontend(String message); void sendStringToFrontend(String sender,String message);
} }

View File

@@ -1,9 +1,15 @@
package com.youlai.system.service.impl; package com.youlai.system.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.google.gson.*; import com.google.gson.*;
import com.youlai.system.model.entity.NoticeStatus;
import com.youlai.system.model.entity.SysUser;
import com.youlai.system.security.util.SecurityUtils; import com.youlai.system.security.util.SecurityUtils;
import com.youlai.system.service.NoticeStatusService;
import com.youlai.system.service.SysUserService;
import com.youlai.system.service.WebsocketService; import com.youlai.system.service.WebsocketService;
import jodd.util.StringUtil;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.core.metadata.IPage;
@@ -39,6 +45,11 @@ public class NoticeServiceImpl extends ServiceImpl<NoticeMapper, Notice> impleme
private final NoticeConverter noticeConverter; private final NoticeConverter noticeConverter;
private final WebsocketService webSocketServer; private final WebsocketService webSocketServer;
private final NoticeStatusService noticeStatusService;
private final SysUserService sysUserService;
private final Gson gson = new GsonBuilder() private final Gson gson = new GsonBuilder()
.registerTypeAdapter(LocalDateTime.class, (JsonSerializer<LocalDateTime>) (localDateTime, type, jsonSerializationContext) -> .registerTypeAdapter(LocalDateTime.class, (JsonSerializer<LocalDateTime>) (localDateTime, type, jsonSerializationContext) ->
new JsonPrimitive(localDateTime.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME))) new JsonPrimitive(localDateTime.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)))
@@ -47,9 +58,21 @@ public class NoticeServiceImpl extends ServiceImpl<NoticeMapper, Notice> impleme
.create(); .create();
private void sendWebSocketMsg(Notice notice) { private void sendWebSocketMsg(Notice notice) {
if(notice.getSendStatus() > 0){ if (notice.getSendStatus() > 0) {
String jsonNotice = gson.toJson(noticeConverter.toVO(notice)); String jsonNotice = gson.toJson(noticeConverter.toVO(notice));
webSocketServer.sendStringToFrontend(jsonNotice); webSocketServer.sendStringToFrontend(SecurityUtils.getUsername(), jsonNotice);
List<SysUser> list = sysUserService.list();
for (SysUser sysUser : list) {
NoticeStatus noticeStatus = noticeStatusService.getOne(new LambdaQueryWrapper<NoticeStatus>().eq(NoticeStatus::getUserId, sysUser.getId()).eq(NoticeStatus::getNoticeId, notice.getId()));
if (noticeStatus == null) {
noticeStatus = new NoticeStatus();
noticeStatus.setUserId(sysUser.getId());
noticeStatus.setNoticeId(notice.getId());
noticeStatus.setReadStatus(0L);
noticeStatusService.save(noticeStatus);
}
}
} }
} }
@@ -94,7 +117,7 @@ public class NoticeServiceImpl extends ServiceImpl<NoticeMapper, Notice> impleme
entity.setUpdateBy(SecurityUtils.getUserId()); entity.setUpdateBy(SecurityUtils.getUserId());
entity.setIsDelete(0); entity.setIsDelete(0);
boolean result = this.save(entity); boolean result = this.save(entity);
if(result){ if (result) {
sendWebSocketMsg(entity); sendWebSocketMsg(entity);
} }
return result; return result;
@@ -108,12 +131,12 @@ public class NoticeServiceImpl extends ServiceImpl<NoticeMapper, Notice> impleme
* @return * @return
*/ */
@Override @Override
public boolean updateNotice(Long id,NoticeForm formData) { public boolean updateNotice(Long id, NoticeForm formData) {
Notice entity = noticeConverter.toEntity(formData); Notice entity = noticeConverter.toEntity(formData);
entity.setUpdateBy(SecurityUtils.getUserId()); entity.setUpdateBy(SecurityUtils.getUserId());
entity.setIsDelete(0); entity.setIsDelete(0);
boolean result = this.updateById(entity); boolean result = this.updateById(entity);
if(result) { if (result) {
sendWebSocketMsg(entity); sendWebSocketMsg(entity);
} }
return result; return result;

View File

@@ -1,6 +1,7 @@
package com.youlai.system.service.impl; package com.youlai.system.service.impl;
import com.youlai.system.event.UserConnectionEvent; import com.youlai.system.event.UserConnectionEvent;
import com.youlai.system.model.dto.ChatMessage;
import com.youlai.system.service.WebsocketService; import com.youlai.system.service.WebsocketService;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@@ -8,7 +9,9 @@ import org.springframework.context.event.EventListener;
import org.springframework.messaging.simp.SimpMessagingTemplate; import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.web.socket.messaging.SessionDisconnectEvent;
import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@@ -19,8 +22,14 @@ public class WebsocketServiceImpl implements WebsocketService {
private final SimpMessagingTemplate messagingTemplate; private final SimpMessagingTemplate messagingTemplate;
// 在线用户
private final Set<String> onlineUsers = ConcurrentHashMap.newKeySet(); private final Set<String> onlineUsers = ConcurrentHashMap.newKeySet();
// 离线消息
private final Set<ChatMessage> offlineMessages = ConcurrentHashMap.newKeySet();
private final Map<ChatMessage,Set<String>> messageReceiptStatus = new ConcurrentHashMap<>();
@Override @Override
public void addUser(String username) { public void addUser(String username) {
onlineUsers.add(username); onlineUsers.add(username);
@@ -31,12 +40,22 @@ public class WebsocketServiceImpl implements WebsocketService {
onlineUsers.remove(username); onlineUsers.remove(username);
} }
@Override
public Set<String> getUsers() {
return onlineUsers;
}
@EventListener @EventListener
public void handleUserConnectionEvent(UserConnectionEvent event) { public void handleUserConnectionEvent(UserConnectionEvent event) {
String username = event.getUsername(); String username = event.getUsername();
if (event.isConnected()) { if (event.isConnected()) {
onlineUsers.add(username); onlineUsers.add(username);
log.info("User connected: {}", username); log.info("User connected: {}", username);
// 发送离线消息
offlineMessages.forEach(message -> {
messagingTemplate.convertAndSendToUser(username, "/topic/chat", message);
messageReceiptStatus.computeIfAbsent(message, k -> ConcurrentHashMap.newKeySet()).add(username);
});
} else { } else {
onlineUsers.remove(username); onlineUsers.remove(username);
log.info("User disconnected: {}", username); log.info("User disconnected: {}", username);
@@ -45,14 +64,31 @@ public class WebsocketServiceImpl implements WebsocketService {
messagingTemplate.convertAndSend("/topic/onlineUserCount", onlineUsers.size()); messagingTemplate.convertAndSend("/topic/onlineUserCount", onlineUsers.size());
} }
@EventListener
public void handleSessionDisconnect(SessionDisconnectEvent event) {
String username = event.getUser().getName();
onlineUsers.remove(username);
}
@Scheduled(fixedRate = 5000) @Scheduled(fixedRate = 5000)
public void sendOnlineUserCount() { public void sendOnlineUserCount() {
messagingTemplate.convertAndSend("/topic/onlineUserCount", onlineUsers.size()); messagingTemplate.convertAndSend("/topic/onlineUserCount", onlineUsers.size());
} }
@Override @Override
public void sendStringToFrontend(String message) { public void sendStringToFrontend(String sender, String message) {
messagingTemplate.convertAndSend("/topic/chat", message); ChatMessage chatMessage = new ChatMessage(sender, message);
} offlineMessages.add(chatMessage);
messageReceiptStatus.putIfAbsent(chatMessage, ConcurrentHashMap.newKeySet());
onlineUsers.forEach(receiver -> {
messagingTemplate.convertAndSendToUser(receiver, "/topic/chat", chatMessage);
messageReceiptStatus.get(chatMessage).add(receiver);
});
if(messageReceiptStatus.get(chatMessage).size() == onlineUsers.size()) {
//记录完成状态
offlineMessages.remove(chatMessage);//从离线消息中移除已发送的消息
messageReceiptStatus.remove(chatMessage);//从消息接收状态集合总移除已发送的消息
}
}
} }