Kaynağa Gözat

websocket 实现,集群实现

gaoxiong 1 yıl önce
ebeveyn
işleme
939386778e

+ 1 - 0
soc-modules/soc-modules-mediator/src/main/java/com/xunmei/mediator/api/server/service/IotServerInfoService.java

@@ -19,4 +19,5 @@ public interface IotServerInfoService extends IService<IotServerInfo> {
 
     void updateByToken(IotServerInfo iotServerInfo);
 
+    IotServerInfo selectByToken(String token);
 }

+ 9 - 1
soc-modules/soc-modules-mediator/src/main/java/com/xunmei/mediator/api/server/service/impl/IotServerInfoServiceImpl.java

@@ -1,6 +1,7 @@
 package com.xunmei.mediator.api.server.service.impl;
 
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 import com.xunmei.common.core.constant.Constants;
 import com.xunmei.common.core.domain.iot.domain.IotServerInfo;
@@ -31,7 +32,7 @@ public class IotServerInfoServiceImpl extends ServiceImpl<IotServerInfoMapper, I
         queryWrapper.last(Constants.LIMIT1);
         IotServerInfo serverInfo = baseMapper.selectOne(queryWrapper);
         if (serverInfo == null) {
-            throw new RuntimeException("验证码信息不存在!");
+            return null;
         }
         serverInfo.setIotCode(UUID.randomUUID().toString());
 
@@ -49,4 +50,11 @@ public class IotServerInfoServiceImpl extends ServiceImpl<IotServerInfoMapper, I
         }
         updateById(iotServerInfo);
     }
+
+    @Override
+    public IotServerInfo selectByToken(String token) {
+        QueryWrapper<IotServerInfo> qw = new QueryWrapper<>();
+        qw.lambda().eq(IotServerInfo::getIotCode,token);
+        return baseMapper.selectOne(qw);
+    }
 }

+ 37 - 0
soc-modules/soc-modules-mediator/src/main/java/com/xunmei/mediator/websocket/enums/WebsocketStatus.java

@@ -0,0 +1,37 @@
+package com.xunmei.mediator.websocket.enums;
+
+/**
+ * @author gaoxiong
+ * @Title: Websocket状态
+ * @Package
+ * @Description:
+ * @date 2024/7/917:21
+ */
+public enum WebsocketStatus {
+
+    SUCCESS(200,"请求成功"),
+    ERROR(400,"错误请求"),
+    UNAUTHORIZED(401,"用户未登录"),
+    FORBIDDEN(403,"用户已登录,但无权限"),
+    NOT_FOUND(404,"设备未找到"),
+    CLOSED(444,"websocket连接被服务端主动关闭了"),
+    INTERNAL_ERROR(500,"内部错误"),
+    NOT_SUPPORT(501,"不支持"),
+    DEVICE_OFFLINE(604,"设备不在线");
+
+    private int statusCode;
+    private String statusDescription;
+
+    WebsocketStatus(int statusCode, String statusDescription) {
+        this.statusCode = statusCode;
+        this.statusDescription = statusDescription;
+    }
+
+    public int getStatusCode() {
+        return statusCode;
+    }
+
+    public String getStatusDescription() {
+        return statusDescription;
+    }
+}

+ 32 - 9
soc-modules/soc-modules-mediator/src/main/java/com/xunmei/mediator/websocket/handler/SocWebSocketHandler.java

@@ -2,19 +2,25 @@ package com.xunmei.mediator.websocket.handler;
 
 import cn.hutool.core.date.DateUtil;
 import com.alibaba.fastjson.JSON;
+import com.xunmei.common.core.domain.iot.domain.IotServerInfo;
+import com.xunmei.common.core.enums.iot.IotServerConnectStatus;
 import com.xunmei.common.core.utils.StringUtils;
 import com.xunmei.common.redis.utils.RedisUtils;
+import com.xunmei.mediator.api.server.service.IotServerInfoService;
 import com.xunmei.mediator.websocket.dto.WebsocketResult;
+import com.xunmei.mediator.websocket.enums.WebsocketStatus;
 import com.xunmei.mediator.websocket.holder.WebSocketSessionHolder;
 import com.xunmei.mediator.websocket.utils.WebSocketUtils;
 import lombok.extern.slf4j.Slf4j;
 import org.redisson.api.RedissonClient;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 import org.springframework.web.socket.*;
 import org.springframework.web.socket.handler.AbstractWebSocketHandler;
 
 import java.io.IOException;
 import java.time.Duration;
+import java.time.LocalDateTime;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
@@ -29,6 +35,10 @@ import java.util.Map;
 @Slf4j
 @Component
 public class SocWebSocketHandler extends AbstractWebSocketHandler {
+
+
+    @Autowired
+    private IotServerInfoService iotServerInfoService;
     /**
      * 连接成功后
      */
@@ -38,8 +48,14 @@ public class SocWebSocketHandler extends AbstractWebSocketHandler {
         String registerCode = (String) session.getAttributes().get("registerCode");
         if (registerCode != null) {
             Map<String,Object> map = new HashMap<>();
-            map.put("token","kkjkkjjiikkjkjkj");
-            map.put("statusCode",200);
+            IotServerInfo serverInfo = iotServerInfoService.getToken(registerCode);
+            if(serverInfo == null){
+                map.put("statusDescription", WebsocketStatus.CLOSED.getStatusDescription());
+                map.put("statusCode", WebsocketStatus.CLOSED.getStatusCode());
+            }else{
+                map.put("statusDescription",serverInfo.getIotCode());
+                map.put("statusCode",WebsocketStatus.SUCCESS.getStatusCode());
+            }
             WebsocketResult register = createWebsocketResult(null, "register" , map);
             WebSocketUtils.sendMessage(session, JSON.toJSONString(register));
             log.info("[建立注册连接] sessionId: {},registerCode:{}", session.getId(), registerCode);
@@ -49,9 +65,14 @@ public class SocWebSocketHandler extends AbstractWebSocketHandler {
 
         String token = (String) session.getAttributes().get("token");
         if(StringUtils.isNotEmpty(token)){
+            IotServerInfo serverInfo = iotServerInfoService.selectByToken(token);
+            serverInfo.setRegisterCodeStatus(1);
+            serverInfo.setLastConnectTime(LocalDateTime.now());
+            serverInfo.setIotStatus(IotServerConnectStatus.CONNECTED.getIdx());
+            iotServerInfoService.updateByToken(serverInfo);
             WebSocketSessionHolder.addSession(token, session);
             Map<String,Object> map = new HashMap<>();
-            map.put("statusCode",200);
+            map.put("statusCode",WebsocketStatus.SUCCESS.getStatusCode());
             WebsocketResult result = createWebsocketResult(null, "login" , map);
             WebSocketUtils.sendMessage(session, JSON.toJSONString(result));
             log.info("[建立连接] sessionId: {},token:{}", session.getId(), token);
@@ -65,7 +86,7 @@ public class SocWebSocketHandler extends AbstractWebSocketHandler {
         result.setId(id);
         result.setTopic(topic);
         //当前时间转换为 格式:2024-07-02T14:17:32
-        result.setTimestamp(DateUtil.format(new Date(),"yyyy-MM-dd HH:mm:ss"));
+        result.setTimestamp(DateUtil.format(new Date(),"yyyy-MM-dd'T'HH:mm:ss"));
         result.setPayload(object);
         return result;
     }
@@ -93,8 +114,7 @@ public class SocWebSocketHandler extends AbstractWebSocketHandler {
      */
     @Override
     protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) throws Exception {
-        WebSocketSessionHolder.updateToken(session);
-
+        String token = WebSocketSessionHolder.updateToken(session);
         super.handleBinaryMessage(session, message);
     }
 
@@ -107,8 +127,7 @@ public class SocWebSocketHandler extends AbstractWebSocketHandler {
      */
     @Override
     protected void handlePongMessage(WebSocketSession session, PongMessage message) throws Exception {
-        WebSocketSessionHolder.updateToken(session);
-
+        String token = WebSocketSessionHolder.updateToken(session);
         WebSocketUtils.sendPongMessage(session);
     }
 
@@ -122,7 +141,6 @@ public class SocWebSocketHandler extends AbstractWebSocketHandler {
     @Override
     public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
         WebSocketSessionHolder.updateToken(session);
-
         log.error("[传输错误] sessionId: {} , exception:{}", session.getId(), exception.getMessage());
     }
 
@@ -136,6 +154,11 @@ public class SocWebSocketHandler extends AbstractWebSocketHandler {
     public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
         String token = (String) session.getAttributes().get("token");
         WebSocketSessionHolder.removeSession(token);
+        if(StringUtils.isNotEmpty(token)){
+            IotServerInfo serverInfo = iotServerInfoService.selectByToken(token);
+            serverInfo.setIotStatus(IotServerConnectStatus.OFFLINE.getIdx());
+            iotServerInfoService.updateByToken(serverInfo);
+        }
         log.info("[断开连接] sessionId: {},token:{}",session.getId(),token);
     }
 

+ 4 - 3
soc-modules/soc-modules-mediator/src/main/java/com/xunmei/mediator/websocket/holder/WebSocketSessionHolder.java

@@ -28,7 +28,7 @@ public class WebSocketSessionHolder {
 
     public static final String REDIS_TOPIC_WEBSOCKET_TOKEN = "websocket:token_";
 
-    public static void updateToken(WebSocketSession session){
+    public static String updateToken(WebSocketSession session){
         /**
          * 根据session,获取token
          */
@@ -40,9 +40,10 @@ public class WebSocketSessionHolder {
                 RedissonClient client = RedisUtils.getClient();
                 RBucket<Object> bucket = client.getBucket(WebSocketSessionHolder.REDIS_TOPIC_WEBSOCKET_TOKEN + entry.getKey());
                 bucket.expire(Duration.ofMillis(1000 * 60 * 35));
-
+                return entry.getKey();
             }
         }
+        return null;
     }
 
     /**
@@ -65,7 +66,7 @@ public class WebSocketSessionHolder {
      * @param sessionKey 要移除的会话键
      */
     public static void removeSession(String sessionKey) {
-        if (USER_SESSION_MAP.containsKey(sessionKey)) {
+        if (StringUtils.isNotEmpty(sessionKey) &&  USER_SESSION_MAP.containsKey(sessionKey)) {
             USER_SESSION_MAP.remove(sessionKey);
             RedisUtils.deleteObject(REDIS_TOPIC_WEBSOCKET_TOKEN + sessionKey);
         }