Kaynağa Gözat

转发服务代码提交

jingyuanchao 1 yıl önce
ebeveyn
işleme
8ae2a28d2d

+ 47 - 41
soc-modules/soc-modules-mediator/src/main/java/com/xunmei/mediator/websocket/handler/SocWebSocketHandler.java

@@ -42,6 +42,7 @@ public class SocWebSocketHandler extends AbstractWebSocketHandler {
 
     @Autowired
     private IotServerInfoService iotServerInfoService;
+
     /**
      * 连接成功后
      */
@@ -50,16 +51,16 @@ public class SocWebSocketHandler extends AbstractWebSocketHandler {
 
         String registerCode = (String) session.getAttributes().get("registerCode");
         if (registerCode != null) {
-            Map<String,Object> map = new HashMap<>();
+            Map<String, Object> map = new HashMap<>();
             IotServerInfo serverInfo = iotServerInfoService.getToken(registerCode);
-            if(serverInfo == null){
+            if (serverInfo == null) {
                 map.put("statusDescription", "验证码不存在!");
                 map.put("statusCode", WebsocketStatus.ERROR.getStatusCode());
-            }else{
-                map.put("statusDescription",serverInfo.getIotCode());
-                map.put("statusCode",WebsocketStatus.SUCCESS.getStatusCode());
+            } else {
+                map.put("statusDescription", serverInfo.getIotCode());
+                map.put("statusCode", WebsocketStatus.SUCCESS.getStatusCode());
             }
-            WebsocketResult register = createWebsocketResult(null, "register" , map);
+            WebsocketResult register = createWebsocketResult(null, "register", map);
             WebSocketUtils.sendMessage(session, JacksonUtils.toJSONString(register));
             log.info("[建立注册连接] sessionId: {},registerCode:{}", session.getId(), registerCode);
             session.close();
@@ -67,16 +68,16 @@ public class SocWebSocketHandler extends AbstractWebSocketHandler {
         }
 
         String token = (String) session.getAttributes().get("token");
-        if(StringUtils.isNotEmpty(token)){
+        if (StringUtils.isNotEmpty(token)) {
             IotServerInfo serverInfo = iotServerInfoService.selectByToken(token);
             serverInfo.setRegisterCodeStatus(1);
             serverInfo.setLastConnectTime(LocalDateTime.now());
             serverInfo.setIotStatus(IotServerConnectStatus.CONNECTED.getIdx());
             iotServerInfoService.updateByToken(serverInfo);
             WebSocketSessionHolder.addSession(token, session);
-            Map<String,Object> map = new HashMap<>();
-            map.put("statusCode",WebsocketStatus.SUCCESS.getStatusCode());
-            WebsocketResult result = createWebsocketResult(null, "login" , map);
+            Map<String, Object> map = new HashMap<>();
+            map.put("statusCode", WebsocketStatus.SUCCESS.getStatusCode());
+            WebsocketResult result = createWebsocketResult(null, "login", map);
             WebSocketUtils.sendMessage(session, JacksonUtils.toJSONString(result));
             log.info("[建立连接] sessionId: {},token:{}", session.getId(), token);
             return;
@@ -84,7 +85,7 @@ public class SocWebSocketHandler extends AbstractWebSocketHandler {
         session.close();
     }
 
-    private WebsocketResult createWebsocketResult(String id,String topic,Object object){
+    private WebsocketResult createWebsocketResult(String id, String topic, Object object) {
         WebsocketResult result = new WebsocketResult();
         result.setId(id);
         result.setTopic(topic);
@@ -142,55 +143,60 @@ public class SocWebSocketHandler extends AbstractWebSocketHandler {
         // 从WebSocket会话中获取登录用户信息
         WebSocketSessionHolder.updateToken(session);
         String payload = message.getPayload();
-        log.info("接收到消息:{}",message.getPayload());
+        log.info("接收到消息:{}", message.getPayload());
 
-        if (payload.isEmpty()){
+        if (payload.isEmpty()) {
             return;
         }
         try {
             WebsocketResult websocketResult = JSON.parseObject(payload, WebsocketResult.class);
             Object obj = websocketResult.getPayload();
             String topic = websocketResult.getTopic();
-                if (ObjectUtil.isNotEmpty(obj)){
+            if (ObjectUtil.isNotEmpty(obj)) {
 
-                    TopicTypeEnums typeEnums = TopicTypeEnums.matcherTopicTypeEnums(topic);
-                    if (typeEnums == null){
-                        log.error("消息topic错误:{}",topic);
-                        return;
-                    }
+                TopicTypeEnums typeEnums = TopicTypeEnums.matcherTopicTypeEnums(topic);
+                if (typeEnums == null) {
+                    log.error("消息topic错误:{}", topic);
+                    return;
+                }
                 Map map = JSON.parseObject(obj.toString(), Map.class);
                 //上报事件
                 String event = (String) map.get(WebSocketConstants.EVENT);
                 String service = (String) map.get(WebSocketConstants.SERVICE);
                 String trigger = (String) map.get(WebSocketConstants.TRIGGER);
                 String token = WebSocketSessionHolder.getSessionKey(session);
-                String routingKey=ObjectUtil.isNotEmpty(event) ? event : service;
+                String routingKey = ObjectUtil.isNotEmpty(event) ? event : service;
                 //上报消息内容
                 Object args = map.get(WebSocketConstants.ARGS);
-                if (ObjectUtil.isEmpty(event)||args==null){
-                    log.error("消息内容为空:{}",message.getPayload());
+                if (ObjectUtil.isEmpty(event) || args == null) {
+                    log.error("消息内容为空:{}", message.getPayload());
                     return;
                 }
-                    switch (typeEnums) {
-                        case SYS_NOTICE:
-                            break;
-                        case DEVICE_STATUS:
-                            break;
-                        case PRODUCT_EVENT_NOTICE:
-                            break;
-                        case PRODUCT_SERVICE_REPLY:
-                            break;
-                        case PRODUCT_PROPERTY_REPLY:
-                            break;
-                        default:
-                            break;
-                    }
-                RouterService routeService = RouterServiceHandler.getRouteService(typeEnums.getProductName(),routingKey);
-                routeService.execute(new WebsocketExecuteReq(event, args,token));
+                switch (typeEnums) {
+                    //系统通知
+                    case SYS_NOTICE:
+                        break;
+                    //设备状态通知
+                    case DEVICE_STATUS:
+                        break;
+                    //产品事件通知消息
+                    case PRODUCT_EVENT_NOTICE:
+                        //IoT返回服务调用消息
+                    case PRODUCT_SERVICE_REPLY:
+                        RouterService routeService = RouterServiceHandler.getRouteService(typeEnums.getProductName(), routingKey);
+                        routeService.execute(new WebsocketExecuteReq(event, args, token));
+                        break;
+                    //IoT返回属性
+                    case PRODUCT_PROPERTY_REPLY:
+                        break;
+                    default:
+                        break;
+                }
+
                 //todo 返回iot消息
             }
         } catch (Exception e) {
-            log.error("转换消息内容时出错:{}",e);
+            log.error("转换消息内容时出错:{}", e);
         }
 
 
@@ -245,12 +251,12 @@ public class SocWebSocketHandler extends AbstractWebSocketHandler {
     public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
         String token = (String) session.getAttributes().get("token");
         WebSocketSessionHolder.removeSession(token);
-        if(StringUtils.isNotEmpty(token)){
+        if (StringUtils.isNotEmpty(token)) {
             IotServerInfo serverInfo = iotServerInfoService.selectByToken(token);
             serverInfo.setIotStatus(IotServerConnectStatus.OFFLINE.getIdx());
             iotServerInfoService.updateByToken(serverInfo);
         }
-        log.info("[断开连接] sessionId: {},token:{}",session.getId(),token);
+        log.info("[断开连接] sessionId: {},token:{}", session.getId(), token);
     }
 
     /**