diff --git a/pom.xml b/pom.xml index c1b6f71..5e96709 100644 --- a/pom.xml +++ b/pom.xml @@ -57,6 +57,19 @@ gson 2.10.1 + + + dev.onvoid.webrtc + webrtc-java + 0.8.0 + + + + + org.springframework.boot + spring-boot-starter-data-redis + + diff --git a/src/main/java/com/ttstd/signaling/bean/SignalingMessage.java b/src/main/java/com/ttstd/signaling/bean/SignalingMessage.java new file mode 100644 index 0000000..b7c1d9f --- /dev/null +++ b/src/main/java/com/ttstd/signaling/bean/SignalingMessage.java @@ -0,0 +1,53 @@ +package com.ttstd.signaling.bean; + +import java.io.Serializable; + +public class SignalingMessage implements Serializable { + private String type; // "call", "offer", "answer", "iceCandidate", etc. + private String senderId; // 发送者ID + private String targetId; // 接收者ID + private Object payload; // 具体携带的数据(可以是String, SessionDescription, IceCandidate等) + + // Default constructor for JSON deserialization + public SignalingMessage() { + } + + public SignalingMessage(String type, String senderId, String targetId, Object payload) { + this.type = type; + this.senderId = senderId; + this.targetId = targetId; + this.payload = payload; + } + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + public String getSenderId() { + return senderId; + } + + public void setSenderId(String senderId) { + this.senderId = senderId; + } + + public String getTargetId() { + return targetId; + } + + public void setTargetId(String targetId) { + this.targetId = targetId; + } + + public Object getPayload() { + return payload; + } + + public void setPayload(Object payload) { + this.payload = payload; + } +} \ No newline at end of file diff --git a/src/main/java/com/ttstd/signaling/config/RedisConfig.java b/src/main/java/com/ttstd/signaling/config/RedisConfig.java new file mode 100644 index 0000000..8212328 --- /dev/null +++ b/src/main/java/com/ttstd/signaling/config/RedisConfig.java @@ -0,0 +1,34 @@ +package com.ttstd.signaling.config; + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.PropertyAccessor; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer; +import org.springframework.data.redis.serializer.StringRedisSerializer; + +@Configuration +public class RedisConfig { + + @Bean + public RedisTemplate redisTemplate(RedisConnectionFactory redisConnectionFactory) { + RedisTemplate template = new RedisTemplate<>(); + template.setConnectionFactory(redisConnectionFactory); + + // String serializer for keys + StringRedisSerializer stringRedisSerializer = new StringRedisSerializer(); + template.setKeySerializer(stringRedisSerializer); + template.setHashKeySerializer(stringRedisSerializer); + + // JSON serializer for values + GenericJackson2JsonRedisSerializer genericJackson2JsonRedisSerializer = new GenericJackson2JsonRedisSerializer(); + template.setValueSerializer(genericJackson2JsonRedisSerializer); + template.setHashValueSerializer(genericJackson2JsonRedisSerializer); + + template.afterPropertiesSet(); + return template; + } +} diff --git a/src/main/java/com/ttstd/signaling/model/WebRTCMessage.java b/src/main/java/com/ttstd/signaling/model/WebRTCMessage.java new file mode 100644 index 0000000..fe116d2 --- /dev/null +++ b/src/main/java/com/ttstd/signaling/model/WebRTCMessage.java @@ -0,0 +1,10 @@ +package com.ttstd.signaling.model; + +import lombok.Data; + +@Data +public class WebRTCMessage { + private String type; // "offer", "answer", "iceCandidate", "call", "hangup", "reject", "accept" + private String target; + private Object data; +} diff --git a/src/main/java/com/ttstd/signaling/service/WebRTCSignalingServer.java b/src/main/java/com/ttstd/signaling/service/WebRTCSignalingServer.java index 3ebcfb3..e30a5ba 100644 --- a/src/main/java/com/ttstd/signaling/service/WebRTCSignalingServer.java +++ b/src/main/java/com/ttstd/signaling/service/WebRTCSignalingServer.java @@ -1,24 +1,19 @@ package com.ttstd.signaling.service; import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ArrayNode; -import com.fasterxml.jackson.databind.node.JsonNodeFactory; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.gson.Gson; -import com.google.gson.JsonObject; -import com.ttstd.signaling.bean.IceCandidate; -import com.ttstd.signaling.bean.SessionDescription; -import com.ttstd.signaling.model.*; +import com.ttstd.signaling.bean.SignalingMessage; +import com.ttstd.signaling.model.WebRTCMessage; import jakarta.websocket.*; import jakarta.websocket.server.PathParam; import jakarta.websocket.server.ServerEndpoint; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; -import java.util.HashMap; +import java.io.IOException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -28,158 +23,200 @@ public class WebRTCSignalingServer { Logger logger = LoggerFactory.getLogger(WebRTCSignalingServer.class); - private Gson gson = new Gson(); // 用于存储在线用户的 Session private static final Map clients = new ConcurrentHashMap<>(); private static final ObjectMapper objectMapper = new ObjectMapper(); - private static final Map serviceSessionDescriptionMap = new ConcurrentHashMap<>(); - private static final Map serviceIceCandidateMap = new ConcurrentHashMap<>(); + // RedisTemplate for storing and retrieving messages for offline users + private static RedisTemplate redisTemplate; - private static final Map clientSessionDescriptionMap = new ConcurrentHashMap<>(); - private static final Map clientIceCandidateMap = new ConcurrentHashMap<>(); + @Autowired + public void setRedisTemplate(RedisTemplate redisTemplate) { + WebRTCSignalingServer.redisTemplate = redisTemplate; + } @OnOpen public void onOpen(Session session, @PathParam("userId") String userId) { clients.put(userId, session); logger.info("onOpen: 用户连接: " + userId); + + // Check for pending messages in Redis for the newly connected user + String redisKey = "webrtc:pending_messages:" + userId; + if (redisTemplate != null) { + Long size = redisTemplate.opsForList().size(redisKey); + if (size != null && size > 0) { + logger.info("onOpen: 用户 {} 上线,发现 {} 条待发送消息。", userId, size); + // Retrieve and send all pending messages + for (int i = 0; i < size; i++) { + WebRTCMessage pendingMessage = (WebRTCMessage) redisTemplate.opsForList().leftPop(redisKey); + if (pendingMessage != null) { + try { + session.getAsyncRemote().sendText(objectMapper.writeValueAsString(pendingMessage)); + logger.info("onOpen: 已将待发送消息发送给用户 {}: {}", userId, pendingMessage.getType()); + } catch (JsonProcessingException e) { + logger.error("onOpen: 序列化待发送消息失败: {}", e.getMessage()); + } catch (IOException e) { + logger.error("onOpen: 发送待发送消息给用户 {} 失败: {}", userId, e.getMessage()); + // If sending fails (e.g., network issue), push it back to Redis + redisTemplate.opsForList().rightPush(redisKey, pendingMessage); + } + } + } + } + } else { + logger.warn("RedisTemplate is not initialized. Cannot check for pending messages."); + } } @OnMessage public void onMessage(String message, Session session, @PathParam("userId") String userId) { - logger.info("onMessage: message = " + message); + logger.info("onMessage: 收到来自用户 {} 的消息: {}", userId, message); try { - JsonNode jsonNode = objectMapper.readTree(message); - int msgType = jsonNode.get("msg_type").asInt(); - logger.info("onMessage: msgType = " + msgType); - String content = jsonNode.get("content").toString(); - logger.info("onMessage: content = " + content); - switch (msgType) { - case 1: - receiverServiceSdp(content); - break; - case 2: - receiverServiceIce(content); - break; - case 3: - receiverClientOfferMessage(content); - break; - case 4: - receiverClientSdpMessage(content); - break; - case 5: - receiverClientIceMessage(content); - break; - case -99: - // 解析收到的 JSON 消息 - SignalMessage signal = objectMapper.readValue(message, SignalMessage.class); - String toUser = signal.getToUser(); - Session targetSession = clients.get(toUser); - - if (targetSession != null && targetSession.isOpen()) { - // 转发信令(SDP, ICE, 拒绝/接受) - targetSession.getAsyncRemote().sendText(message); - logger.info("onMessage: 信令类型 [" + signal.getType() + "] 从 " + userId + " 发往 " + toUser); - } else { - sendErrorMessage(session, "目标用户不在线: " + toUser); - } - break; + WebRTCMessage webRTCMessage = objectMapper.readValue(message, WebRTCMessage.class); + if (webRTCMessage == null) { + logger.warn("onMessage: 无法解析消息,请检查消息格式。"); + return; } + String type = webRTCMessage.getType(); + switch (type) { + case "init": + + break; + case "ask": + // Handle ask message + case "offer": + // Handle offer message + case "answer": + // Handle answer messages + case "reject": + // Handle reject message + case "accept": + // Handle accept message + handleWebRTCMessage(session, webRTCMessage, userId); + break; + case "sdp": + // Handle SessionDescription messages + case "iceCandidate": + // Handle iceCandidate messages + handleSdpMessage(session, webRTCMessage, userId); + break; + + case "call": + // Handle call message + break; + case "hangup": + // Handle hangup message + break; + case "ping": + // Handle ping message + break; + case "unknown": + default: + // Handle unknown message type + break; + } + } catch (JsonProcessingException e) { + logger.error("onMessage: JSON 解析失败: {}", e.getMessage()); + sendErrorMessage(session, userId, "消息格式错误,无法解析。"); } catch (Exception e) { - e.printStackTrace(); + logger.error("onMessage: 处理消息时发生未知错误: {}", e.getMessage(), e); + sendErrorMessage(session, userId, "服务器内部错误。"); } } - /** - * 接收并存储被控端sdp信息 - * @param jsonString - * @throws JsonProcessingException - */ - private void receiverServiceSdp(String jsonString) throws JsonProcessingException { - // 解析收到的 JSON 消息 - ServiceSdpMessage serviceSdpMessage = objectMapper.readValue(jsonString, ServiceSdpMessage.class); - String fromUser = serviceSdpMessage.getFromUser(); - Session targetSession = clients.get(fromUser); - logger.info("receiverServiceSdp: fromUser = " + fromUser); - logger.info("receiverServiceSdp: targetSession = " + targetSession); - SessionDescription sessionDescription = new SessionDescription(serviceSdpMessage.getType(), serviceSdpMessage.getSdp()); - serviceSessionDescriptionMap.put(fromUser, sessionDescription); - logger.info("receiverServiceSdp: serviceSessionDescriptionMap size = " + serviceSessionDescriptionMap.size()); + private void handleWebRTCMessage(Session session, WebRTCMessage webRTCMessage, String userId) throws JsonProcessingException { + logger.info("handleWebRTCMessage: 收到来自用户 {} 的消息: {}", userId, webRTCMessage); + String type = webRTCMessage.getType(); + + SignalingMessage signalingMessage; + if (webRTCMessage.getData() instanceof String) { + signalingMessage = objectMapper.readValue((String) webRTCMessage.getData(), SignalingMessage.class); + } else { + signalingMessage = objectMapper.convertValue(webRTCMessage.getData(), SignalingMessage.class); + } + + String targetId = signalingMessage.getTargetId(); + + if (targetId != null && !targetId.isEmpty()) { + if (clients.containsKey(targetId)) { + // Target user is online, send message directly + Session targetSession = clients.get(targetId); + if (targetSession != null && targetSession.isOpen()) { +// targetSession.getAsyncRemote().sendText(objectMapper.writeValueAsString(webRTCMessage)); + logger.info("onMessage: 消息从 {} 发送给在线用户 {}", userId, targetId); + switch (type) { + case "ask":// Handle ask message + targetSession.getAsyncRemote().sendText(objectMapper.writeValueAsString(webRTCMessage)); + break; + case "offer":// Handle offer message + targetSession.getAsyncRemote().sendText(objectMapper.writeValueAsString(webRTCMessage)); + break; + case "answer":// Handle answer messages + + break; + case "reject":// Handle reject message + + break; + case "accept":// Handle accept message + + break; + } + } else { + clients.remove(targetId); + logger.warn("onMessage: 目标用户 {} 的会话已关闭", targetId); + } + } else { + sendErrorMessage(session, userId, "onMessage: 目标用户 {" + targetId + "} 不在线"); + logger.info("onMessage: 目标用户 {} 不在线", targetId); + } + } else { + logger.warn("onMessage: 收到消息但缺少 targetId 或 targetId 为空,无法转发或存储。消息内容: {}", webRTCMessage); + sendErrorMessage(session, userId, "消息格式错误,缺少目标用户ID。"); + } } - /** - * 接收并存储被控端ice信息 - * @param jsonString - * @throws JsonProcessingException - */ - private void receiverServiceIce(String jsonString) throws JsonProcessingException { - ServiceIceMessage serviceIceMessage = objectMapper.readValue(jsonString, ServiceIceMessage.class); - String fromUser = serviceIceMessage.getFromUser(); - Session targetSession = clients.get(fromUser); - logger.info("receiverServiceIce: fromUser = " + fromUser); - logger.info("receiverServiceIce: targetSession = " + targetSession); - IceCandidate iceCandidate = new IceCandidate(serviceIceMessage.getSdpMid(), serviceIceMessage.getSdpMLineIndex(), serviceIceMessage.getSdp()); - serviceIceCandidateMap.put(fromUser, iceCandidate); - logger.info("receiverServiceIce: serviceIceCandidateMap size = " + serviceIceCandidateMap.size()); - } + private void handleSdpMessage(Session session, WebRTCMessage webRTCMessage, String userId) throws JsonProcessingException { + logger.info("handleSdpMessage: 收到来自用户 {} 的消息: {}", userId, webRTCMessage); + String type = webRTCMessage.getType(); - /** - * 接收并存储控制端sdp信息 - * @param jsonString - * @throws JsonProcessingException - */ - private void receiverClientOfferMessage(String jsonString) throws JsonProcessingException { - ClientOfferMessage clientOfferMessage = objectMapper.readValue(jsonString, ClientOfferMessage.class); - String fromUser = clientOfferMessage.getFromUser(); - logger.info("receiverClientOfferMessage: fromUser = " + fromUser); - String toUser = clientOfferMessage.getToUser(); - logger.info("receiverClientOfferMessage: toUser = " + toUser); + SignalingMessage signalingMessage; + if (webRTCMessage.getData() instanceof String) { + signalingMessage = objectMapper.readValue((String) webRTCMessage.getData(), SignalingMessage.class); + } else { + signalingMessage = objectMapper.convertValue(webRTCMessage.getData(), SignalingMessage.class); + } - logger.info("receiverClientOfferMessage: serviceSessionDescriptionMap size = " + serviceSessionDescriptionMap.size()); - logger.info("receiverClientOfferMessage: serviceIceCandidateMap size = " + serviceIceCandidateMap.size()); + String targetId = signalingMessage.getTargetId(); - Session session = clients.get(fromUser); - - SessionDescription sessionDescription = serviceSessionDescriptionMap.get(toUser); - JsonObject sdpJsonObject = new JsonObject(); - sdpJsonObject.addProperty("msg_type", "11"); - sdpJsonObject.addProperty("sdp", sessionDescription.toString()); - String sdpJsonString = gson.toJson(sdpJsonObject); - logger.info("receiverClientOfferMessage: sdpJsonString = " + sdpJsonString); - session.getAsyncRemote().sendText(gson.toJson(sdpJsonObject)); - - IceCandidate iceCandidate = serviceIceCandidateMap.get(toUser); - JsonObject iceJsonObject = new JsonObject(); - iceJsonObject.addProperty("msg_type", "22"); - iceJsonObject.addProperty("ice", iceCandidate.toString()); - String iceJsonString = gson.toJson(iceJsonObject); - logger.info("receiverClientOfferMessage: iceJsonString = " + iceJsonString); - - session.getAsyncRemote().sendText(gson.toJson(iceJsonObject)); - - } - - /** - * 接收并存储控制端ice信息 - * @param jsonString - * @throws JsonProcessingException - */ - private void receiverClientSdpMessage(String jsonString) throws JsonProcessingException { - ClientSdpMessage clientSdpMessage = objectMapper.readValue(jsonString, ClientSdpMessage.class); - String fromUser = clientSdpMessage.getFromUser(); - String toUser = clientSdpMessage.getToUser(); - SessionDescription description = new SessionDescription(clientSdpMessage.getType(), clientSdpMessage.getSdp()); - clientSessionDescriptionMap.put(fromUser, description); - } - - private void receiverClientIceMessage(String jsonString) throws JsonProcessingException { - ClientIceMessage clientIceMessage = objectMapper.readValue(jsonString, ClientIceMessage.class); - String fromUser = clientIceMessage.getFromUser(); - String toUser = clientIceMessage.getToUser(); - IceCandidate iceCandidate = new IceCandidate(clientIceMessage.getSdpMid(), clientIceMessage.getSdpMLineIndex(), clientIceMessage.getSdp()); - clientIceCandidateMap.put(fromUser, iceCandidate); + if (targetId != null && !targetId.isEmpty()) { + if (clients.containsKey(targetId)) { + // Target user is online, send message directly + Session targetSession = clients.get(targetId); + if (targetSession != null && targetSession.isOpen()) { +// targetSession.getAsyncRemote().sendText(objectMapper.writeValueAsString(webRTCMessage)); + logger.info("onMessage: 消息从 {} 发送给在线用户 {}", userId, targetId); + switch (type) { + case "sdp":// Handle SessionDescription message + targetSession.getAsyncRemote().sendText(objectMapper.writeValueAsString(webRTCMessage)); + break; + case "iceCandidate":// Handle iceCandidate message + targetSession.getAsyncRemote().sendText(objectMapper.writeValueAsString(webRTCMessage)); + break; + } + } else { + clients.remove(targetId); + logger.warn("onMessage: 目标用户 {} 的会话已关闭", targetId); + } + } else { + sendErrorMessage(session, userId, "onMessage: 目标用户 {" + targetId + "} 不在线"); + logger.info("onMessage: 目标用户 {} 不在线", targetId); + } + } else { + logger.warn("onMessage: 收到消息但缺少 targetId 或 targetId 为空,无法转发或存储。消息内容: {}", webRTCMessage); + sendErrorMessage(session, userId, "消息格式错误,缺少目标用户ID。"); + } } @OnClose @@ -190,11 +227,33 @@ public class WebRTCSignalingServer { @OnError public void onError(Session session, Throwable error) { - error.printStackTrace(); - logger.info("onError: " + error.getMessage()); + logger.error("onError: 会话 {} 发生错误: {}", session.getId(), error.getMessage(), error); } - private void sendErrorMessage(Session session, String errorMsg) { - session.getAsyncRemote().sendText("{\"type\":\"error\", \"message\":\"" + errorMsg + "\"}"); + private void sendErrorMessage(Session session, String userId, String errorMsg) { + try { + WebRTCMessage errorMessage = new WebRTCMessage(); + errorMessage.setType("error"); + errorMessage.setTarget(userId); + errorMessage.setData(errorMsg); // Include the error message in data + session.getAsyncRemote().sendText(objectMapper.writeValueAsString(errorMessage)); + logger.info("sendErrorMessage: 发送错误消息: {}", errorMsg); + } catch (JsonProcessingException e) { + logger.error("sendErrorMessage: 无法序列化错误消息: {}", e.getMessage()); + } } -} \ No newline at end of file + + private void storeMessageInRedis(String targetId, WebRTCMessage message) { + if (redisTemplate != null) { + try { + String redisKey = "webrtc:pending_messages:" + targetId; + redisTemplate.opsForList().rightPush(redisKey, message); + logger.info("storeMessageInRedis: 消息已存储到 Redis,key: {}", redisKey); + } catch (Exception e) { + logger.error("storeMessageInRedis: 存储消息到 Redis 失败: {}", e.getMessage(), e); + } + } else { + logger.warn("RedisTemplate is not initialized. Cannot store message for user {}.", targetId); + } + } +} diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index f2a6b44..e7f0179 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -1,2 +1,12 @@ spring.application.name=OneKeyCallWebRTCSignaling -server.port=2310 \ No newline at end of file +server.port=2310 + +spring.data.redis.database=5 +spring.data.redis.host=175.178.213.60 +spring.data.redis.port=26379 +spring.data.redis.password=fanhuitong +spring.data.redis.timeout=10s +spring.data.redis.lettuce.pool.max-active=8 +spring.data.redis.lettuce.pool.max-wait=-1 +spring.data.redis.lettuce.pool.max-idle=8 +spring.data.redis.lettuce.pool.min-idle=0 \ No newline at end of file