From 599c24181d8d708837dc6011abcf0a53769e677d Mon Sep 17 00:00:00 2001 From: tongtongstudio Date: Wed, 21 Jan 2026 21:52:14 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E7=99=BB=E5=BD=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- DockerFile | 10 + .../config/HttpSessionConfigurator.java | 2 +- .../ttstd/signaling/model/SignalMessage.java | 12 + .../service/WebRTCSignalingServer.java | 211 ++++-------------- 4 files changed, 62 insertions(+), 173 deletions(-) create mode 100644 DockerFile create mode 100644 src/main/java/com/ttstd/signaling/model/SignalMessage.java diff --git a/DockerFile b/DockerFile new file mode 100644 index 0000000..7fa85f8 --- /dev/null +++ b/DockerFile @@ -0,0 +1,10 @@ +FROM eclipse-temurin:21-jdk-jammy +MAINTAINER TongTongStudio +RUN mv /etc/apt/sources.list /etc/apt/sources.list.bak + +# RUN mkdir -p /data/uploads/ + +VOLUME /tmp +ADD target/*.jar app.jar +EXPOSE 2310 +ENTRYPOINT ["java", "-jar", "/app.jar"] \ No newline at end of file diff --git a/src/main/java/com/ttstd/signaling/config/HttpSessionConfigurator.java b/src/main/java/com/ttstd/signaling/config/HttpSessionConfigurator.java index 4200d64..7f98de1 100644 --- a/src/main/java/com/ttstd/signaling/config/HttpSessionConfigurator.java +++ b/src/main/java/com/ttstd/signaling/config/HttpSessionConfigurator.java @@ -12,4 +12,4 @@ public class HttpSessionConfigurator extends ServerEndpointConfig.Configurator { HttpSession httpSession = (HttpSession) request.getHttpSession(); config.getUserProperties().put(HttpSession.class.getName(), httpSession); } -} \ No newline at end of file +} diff --git a/src/main/java/com/ttstd/signaling/model/SignalMessage.java b/src/main/java/com/ttstd/signaling/model/SignalMessage.java new file mode 100644 index 0000000..541899b --- /dev/null +++ b/src/main/java/com/ttstd/signaling/model/SignalMessage.java @@ -0,0 +1,12 @@ +package com.ttstd.signaling.model; + +import lombok.Data; + +@Data +public class SignalMessage { + private String type; // 类型: "offer", "answer", "ice", "reject", "accept" + private String fromUser; // 发送者ID + private String toUser; // 接收者ID + private Object data; // 数据载体 (SDP 描述或 ICE 候选信息) + private String reason; // 用于拒绝时的原因说明 +} \ No newline at end of file diff --git a/src/main/java/com/ttstd/signaling/service/WebRTCSignalingServer.java b/src/main/java/com/ttstd/signaling/service/WebRTCSignalingServer.java index 6cc6b75..9b424a1 100644 --- a/src/main/java/com/ttstd/signaling/service/WebRTCSignalingServer.java +++ b/src/main/java/com/ttstd/signaling/service/WebRTCSignalingServer.java @@ -2,6 +2,7 @@ package com.ttstd.signaling.service; import com.fasterxml.jackson.databind.ObjectMapper; import com.ttstd.signaling.config.HttpSessionConfigurator; +import com.ttstd.signaling.model.SignalMessage; import com.ttstd.signaling.model.SignalingMessage; import jakarta.servlet.http.HttpSession; import jakarta.websocket.*; @@ -13,197 +14,63 @@ import org.springframework.stereotype.Component; import java.io.IOException; import java.util.concurrent.ConcurrentHashMap; +import jakarta.websocket.*; +import jakarta.websocket.server.PathParam; +import jakarta.websocket.server.ServerEndpoint; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.springframework.stereotype.Component; +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + @Component -@ServerEndpoint(value = "/signaling", configurator = HttpSessionConfigurator.class) +@ServerEndpoint("/signaling/{userId}") public class WebRTCSignalingServer { - private static final Logger logger = LoggerFactory.getLogger(WebRTCSignalingServer.class); + Logger logger = LoggerFactory.getLogger(WebRTCSignalingServer.class); - private final ObjectMapper objectMapper = new ObjectMapper(); - - // 存储在线用户会话:key=userId, value=WebSocket会话 - private static ConcurrentHashMap onlineUsers = new ConcurrentHashMap<>(); - // 存储用户房间关系:key=userId, value=roomId - private static ConcurrentHashMap userRooms = new ConcurrentHashMap<>(); - // 存储房间用户:key=roomId, value=用户ID集合 - private static ConcurrentHashMap> roomUsers = new ConcurrentHashMap<>(); - - private Session session; - private String userId; - private HttpSession httpSession; + // 用于存储在线用户的 Session + private static final Map clients = new ConcurrentHashMap<>(); + private static final ObjectMapper objectMapper = new ObjectMapper(); @OnOpen - public void onOpen(Session session, EndpointConfig config) { - this.session = session; - this.httpSession = (HttpSession) config.getUserProperties() - .get(HttpSession.class.getName()); - - // 从HTTP会话获取用户ID(实际项目中应从认证信息获取) - this.userId = (String) this.httpSession.getAttribute("userId"); - if (this.userId == null) { - this.userId = "user_" + session.getId().substring(0, 8); - } - - onlineUsers.put(this.userId, session); - logger.info("用户 {} 连接成功, 当前在线用户数: {}", this.userId, onlineUsers.size()); - - // 发送连接成功消息 - sendMessage(createSignalingMessage("connected", this.userId, null, null, null, null)); + public void onOpen(Session session, @PathParam("userId") String userId) { + clients.put(userId, session); + System.out.println("用户连接: " + userId); } @OnMessage - public void onMessage(String message, Session session) { + public void onMessage(String message, Session session, @PathParam("userId") String userId) { try { - SignalingMessage signalingMessage = objectMapper.readValue(message, SignalingMessage.class); - signalingMessage.setFrom(this.userId); - processSignalingMessage(signalingMessage); + // 解析收到的 JSON 消息 + SignalMessage signal = objectMapper.readValue(message, SignalMessage.class); + String toUser = signal.getToUser(); + Session targetSession = clients.get(toUser); + + if (targetSession != null && targetSession.isOpen()) { + // 转发信令(SDP, ICE, 拒绝/接受) + targetSession.getAsyncRemote().sendText(message); + System.out.println("信令类型 [" + signal.getType() + "] 从 " + userId + " 发往 " + toUser); + } else { + sendErrorMessage(session, "目标用户不在线: " + toUser); + } } catch (Exception e) { - logger.error("消息处理错误: {}", e.getMessage()); - sendError("消息格式错误"); + e.printStackTrace(); } } @OnClose - public void onClose(Session session) { - leaveRoom(); - onlineUsers.remove(this.userId); - logger.info("用户 {} 断开连接, 剩余在线用户数: {}", this.userId, onlineUsers.size()); + public void onClose(@PathParam("userId") String userId) { + clients.remove(userId); + System.out.println("用户离开: " + userId); } @OnError public void onError(Session session, Throwable error) { - logger.error("WebSocket错误: {}", error.getMessage()); - onlineUsers.remove(this.userId); - leaveRoom(); + error.printStackTrace(); } - private void processSignalingMessage(SignalingMessage message) { - switch (message.getType()) { - case "join": - handleJoinRoom(message); - break; - case "offer": - case "answer": - case "candidate": - forwardMessageToTarget(message); - break; - case "leave": - handleLeaveRoom(message); - break; - default: - sendError("不支持的消息类型: " + message.getType()); - } + private void sendErrorMessage(Session session, String errorMsg) { + session.getAsyncRemote().sendText("{\"type\":\"error\", \"message\":\"" + errorMsg + "\"}"); } - - private void handleJoinRoom(SignalingMessage message) { - String roomId = message.getRoomId(); - if (roomId == null || roomId.trim().isEmpty()) { - sendError("房间ID不能为空"); - return; - } - - // 离开之前的房间 - leaveRoom(); - - // 加入新房间 - userRooms.put(this.userId, roomId); - roomUsers.computeIfAbsent(roomId, k -> new ConcurrentHashMap<>()) - .put(this.userId, this.userId); - - logger.info("用户 {} 加入房间 {}", this.userId, roomId); - - // 通知房间内其他用户 - broadcastToRoom(roomId, createSignalingMessage("user-joined", this.userId, null, roomId, null, null), this.userId); - - // 发送加入成功消息 - sendMessage(createSignalingMessage("joined", this.userId, null, roomId, null, null)); - } - - private void handleLeaveRoom(SignalingMessage message) { - leaveRoom(); - sendMessage(createSignalingMessage("left", this.userId, null, null, null, null)); - } - - private void leaveRoom() { - String roomId = userRooms.get(this.userId); - if (roomId != null) { - userRooms.remove(this.userId); - ConcurrentHashMap users = roomUsers.get(roomId); - if (users != null) { - users.remove(this.userId); - if (users.isEmpty()) { - roomUsers.remove(roomId); - } else { - // 通知房间内其他用户 - broadcastToRoom(roomId, createSignalingMessage("user-left", this.userId, null, roomId, null, null), this.userId); - } - } - logger.info("用户 {} 离开房间 {}", this.userId, roomId); - } - } - - private void forwardMessageToTarget(SignalingMessage message) { - String targetUserId = message.getTo(); - if (targetUserId == null) { - sendError("目标用户ID不能为空"); - return; - } - - Session targetSession = onlineUsers.get(targetUserId); - if (targetSession != null && targetSession.isOpen()) { - try { - targetSession.getBasicRemote().sendText(objectMapper.writeValueAsString(message)); - logger.debug("消息从 {} 转发到 {}", this.userId, targetUserId); - } catch (IOException e) { - logger.error("消息转发失败: {}", e.getMessage()); - onlineUsers.remove(targetUserId); - } - } else { - sendError("目标用户不在线"); - } - } - - private void broadcastToRoom(String roomId, SignalingMessage message, String excludeUserId) { - ConcurrentHashMap users = roomUsers.get(roomId); - if (users != null) { - users.keySet().forEach(userId -> { - if (!userId.equals(excludeUserId)) { - Session userSession = onlineUsers.get(userId); - if (userSession != null && userSession.isOpen()) { - try { - userSession.getBasicRemote().sendText(objectMapper.writeValueAsString(message)); - } catch (IOException e) { - logger.error("广播消息失败: {}", e.getMessage()); - } - } - } - }); - } - } - - private void sendMessage(SignalingMessage message) { - try { - this.session.getBasicRemote().sendText(objectMapper.writeValueAsString(message)); - } catch (IOException e) { - logger.error("发送消息失败: {}", e.getMessage()); - } - } - - private void sendError(String errorMessage) { - SignalingMessage errorMsg = createSignalingMessage("error", "system", this.userId, null, null, null); - errorMsg.setPayload(java.util.Map.of("message", errorMessage)); - sendMessage(errorMsg); - } - - private SignalingMessage createSignalingMessage(String type, String from, String to, - String roomId, Object sdp, Object candidate) { - SignalingMessage message = new SignalingMessage(); - message.setType(type); - message.setFrom(from); - message.setTo(to); - message.setRoomId(roomId); - message.setSdp(sdp); - message.setCandidate(candidate); - return message; - } -} +} \ No newline at end of file