package com.ttstd.signaling.handler; import com.fasterxml.jackson.databind.ObjectMapper; import com.ttstd.signaling.entity.RtcSignalMessage; import org.springframework.stereotype.Component; import org.springframework.web.socket.CloseStatus; import org.springframework.web.socket.TextMessage; import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.handler.TextWebSocketHandler; import java.io.IOException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; /** * WebRTC核心信令处理器 * 继承TextWebSocketHandler:处理文本类型的WebSocket消息(WebRTC信令都是JSON文本) */ @Component public class WebRtcSignalHandler extends TextWebSocketHandler { /** * 核心:维护在线用户的WebSocket会话映射 * key: 用户ID(userId) * value: 该用户的WebSocket连接会话 * ConcurrentHashMap:线程安全,解决多用户并发连接的线程安全问题 */ public static final Map ONLINE_USER_SESSIONS = new ConcurrentHashMap<>(); // JSON序列化工具,用于将消息实体与JSON字符串互转 private final ObjectMapper objectMapper = new ObjectMapper(); /** * 客户端成功建立WebSocket连接后触发 * 约定:客户端连接时,通过请求参数传递userId,例如:ws://localhost:8080/ws/rtc?userId=user1001 */ @Override public void afterConnectionEstablished(WebSocketSession session) { // 获取连接的用户ID String userId = getUserIdFromSession(session); if (userId != null && !userId.isEmpty()) { // 将用户会话存入在线列表 ONLINE_USER_SESSIONS.put(userId, session); System.out.println("用户[" + userId + "] 上线,当前在线人数:" + ONLINE_USER_SESSIONS.size()); } } /** * 核心方法:接收客户端发送的信令消息,并完成【精准中转】 */ @Override protected void handleTextMessage(WebSocketSession session, TextMessage textMessage) throws Exception { // 1. 获取发送方用户ID String fromUserId = getUserIdFromSession(session); if (fromUserId == null || !ONLINE_USER_SESSIONS.containsKey(fromUserId)) { sendErrorMessage(session, "当前用户未登录或连接已失效"); return; } // 2. 解析前端发送的信令消息JSON -> 转为实体类 RtcSignalMessage signalMessage = objectMapper.readValue(textMessage.getPayload(), RtcSignalMessage.class); String toUserId = signalMessage.getToUserId(); // 目标用户ID String signalType = signalMessage.getSignalType(); // 信令类型:offer/answer/ice-candidate // 3. 核心逻辑:查询目标用户的连接会话,存在则中转消息,不存在则返回错误 WebSocketSession targetSession = ONLINE_USER_SESSIONS.get(toUserId); if (targetSession != null && targetSession.isOpen()) { // 将信令消息原样转发给目标用户 targetSession.sendMessage(new TextMessage(objectMapper.writeValueAsString(signalMessage))); System.out.println("信令中转成功:[" + fromUserId + "] -> [" + toUserId + "] 类型:" + signalType); } else { // 目标用户不在线/连接已关闭 sendErrorMessage(session, "目标用户[" + toUserId + "] 不在线或连接已断开"); } } /** * 客户端断开WebSocket连接后触发 */ @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus status) { String userId = getUserIdFromSession(session); if (userId != null) { ONLINE_USER_SESSIONS.remove(userId); System.out.println("用户[" + userId + "] 下线,当前在线人数:" + ONLINE_USER_SESSIONS.size()); } } /** * 处理WebSocket连接异常 */ @Override public void handleTransportError(WebSocketSession session, Throwable exception) { String userId = getUserIdFromSession(session); if (userId != null) { ONLINE_USER_SESSIONS.remove(userId); System.out.println("用户[" + userId + "] 连接异常,已移除会话,异常信息:" + exception.getMessage()); } } // ========== 私有工具方法 ========== /** * 从WebSocket会话中获取请求参数里的userId */ private String getUserIdFromSession(WebSocketSession session) { try { return session.getUri().getQuery().split("=")[1]; } catch (Exception e) { return null; } } /** * 向客户端发送错误提示消息 */ private void sendErrorMessage(WebSocketSession session, String errorMsg) throws IOException { RtcSignalMessage errorMessage = new RtcSignalMessage(null, null, "error", errorMsg); session.sendMessage(new TextMessage(objectMapper.writeValueAsString(errorMessage))); } }