瀏覽代碼

websocket消息解析代码提交

jingyuanchao 1 年之前
父節點
當前提交
883464eb7f

+ 5 - 0
soc-modules/soc-modules-mediator/src/main/java/com/xunmei/mediator/websocket/constant/WebSocketConstants.java

@@ -72,6 +72,11 @@ public interface WebSocketConstants {
     String DOWN_LINK_SERVICE_PASS_THROUGH = "downlinkServicePassthrough";
 
     /**
+     * 上行服务调用透传
+     */
+    String UP_LINK_SERVICE_PASS_THROUGH = "uplinkServicePassthrough";
+
+    /**
      * 获取Dvs状态
      */
     String GET_DVS_STATUS_SERVICES = "getDvsStatus";

+ 26 - 44
soc-modules/soc-modules-mediator/src/main/java/com/xunmei/mediator/websocket/dto/WebsocketPayloadResolve.java

@@ -1,16 +1,11 @@
 package com.xunmei.mediator.websocket.dto;
 
 import cn.hutool.core.util.ObjectUtil;
-import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
 import com.xunmei.mediator.websocket.constant.WebSocketConstants;
-import com.xunmei.mediator.websocket.enums.TopicTypeEnums;
-import io.netty.util.internal.StringUtil;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 
-import java.util.Map;
-
 /**
  * @author jingyuanchao
  * @date 2024/7/18 16:56
@@ -24,47 +19,34 @@ public class WebsocketPayloadResolve {
     private Object data;
 
 
-    public WebsocketPayloadResolve(WebsocketResult result, TopicTypeEnums typeEnums) {
-        boolean productEquals = WebSocketConstants.IOT_SERVER.equals(typeEnums.getProductName());
-        boolean deviceEquals = WebSocketConstants.IOT_SERVER_DEVICE.equals(typeEnums.getDeviceName());
-
-        if (productEquals && deviceEquals) {
+    public WebsocketPayloadResolve(WebsocketResult result) {
+        JSONObject parentPayload = (JSONObject) result.getPayload();
+        String serviceName = (String) parentPayload.get(WebSocketConstants.SERVICE);
+        if (WebSocketConstants.UP_LINK_SERVICE_PASS_THROUGH.equals(serviceName) ||
+                WebSocketConstants.DOWN_LINK_SERVICE_PASS_THROUGH.equals(serviceName)) {
             // 说明是透穿的
-            JSONObject jsb = (JSONObject) result.getPayload();
-            JSONObject payloadObj = (JSONObject) jsb.get(WebSocketConstants.ARGS);
-            WebsocketResult javaObject = payloadObj.toJavaObject(WebsocketResult.class);
-            if (javaObject.getPayload() instanceof JSONObject) {
-                processJSONObjectPayload(result.getPayload());
-            } else if (javaObject.getPayload() instanceof JSONArray) {
-                processJSONArrayPayload(result.getPayload());
+            JSONObject payloadObj = (JSONObject) parentPayload.get(WebSocketConstants.ARGS);
+            WebsocketResult childResult = payloadObj.toJavaObject(WebsocketResult.class);
+            JSONObject childValue = (JSONObject) childResult.getPayload();
+            Object object = childValue.get(WebSocketConstants.ARGS);
+            if (object instanceof JSONObject) {
+                JSONObject object1 = (JSONObject) object;
+                String event = (String) childValue.get(WebSocketConstants.EVENT);
+                String service = (String) childValue.get(WebSocketConstants.SERVICE);
+                JSONObject header = (JSONObject) object1.get(WebSocketConstants.HEADER);
+                String routingKey = ObjectUtil.isNotEmpty(event) ? event : service;
+                this.routingKey = routingKey;
+                this.header = header;
+                this.data = object1;
             }
-            return;
-        }
-
-        if (result.getPayload() instanceof JSONObject) {
-            processJSONObjectPayload(result.getPayload());
-        } else if (result.getPayload() instanceof JSONArray) {
-            processJSONArrayPayload(result.getPayload());
+        } else {
+            String event = (String) parentPayload.get(WebSocketConstants.EVENT);
+            String service = (String) parentPayload.get(WebSocketConstants.SERVICE);
+            JSONObject header = (JSONObject) parentPayload.get(WebSocketConstants.HEADER);
+            String routingKey = ObjectUtil.isNotEmpty(event) ? event : service;
+            this.routingKey = routingKey;
+            this.header = header;
+            this.data = parentPayload.get(WebSocketConstants.ARGS);
         }
     }
-
-    private void processJSONObjectPayload(Object payload) {
-        Map map = ((JSONObject) payload).toJavaObject(Map.class);
-        // 上报事件
-        String event = (String) map.get(WebSocketConstants.EVENT);
-        String service = (String) map.get(WebSocketConstants.SERVICE);
-        JSONObject header = (JSONObject) map.get(WebSocketConstants.HEADER);
-        String routingKey = ObjectUtil.isNotEmpty(event) ? event : service;
-        JSONObject args = (JSONObject) (ObjectUtil.isNotEmpty(map.get(WebSocketConstants.DATA)) ? map.get(WebSocketConstants.DATA) : map.get(WebSocketConstants.ARGS));
-        this.routingKey = routingKey;
-        this.header = header;
-        this.data = args;
-    }
-
-    private void processJSONArrayPayload(Object payload) {
-        final JSONArray jsonArray = (JSONArray) payload;
-        this.routingKey = StringUtil.EMPTY_STRING;
-        this.header = new JSONObject();
-        this.data = jsonArray;
-    }
 }

+ 5 - 37
soc-modules/soc-modules-mediator/src/main/java/com/xunmei/mediator/websocket/handler/SocWebSocketHandler.java

@@ -25,10 +25,12 @@ import org.springframework.web.socket.*;
 import org.springframework.web.socket.handler.AbstractWebSocketHandler;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.time.LocalDateTime;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 
 /**
  * @author gaoxiong
@@ -74,6 +76,8 @@ public class SocWebSocketHandler extends AbstractWebSocketHandler {
         String token = (String) session.getAttributes().get("token");
         if (StringUtils.isNotEmpty(token)) {
             IotServerInfo serverInfo = iotServerInfoService.selectByToken(token);
+            InetSocketAddress remoteAddress = session.getRemoteAddress();
+            Optional.ofNullable(remoteAddress).ifPresent(address -> serverInfo.setIotIp(address.getAddress().getHostAddress()));
             serverInfo.setRegisterCodeStatus(1);
             serverInfo.setLastConnectTime(LocalDateTime.now());
             serverInfo.setIotStatus(IotServerConnectStatus.CONNECTED.getIdx());
@@ -107,42 +111,6 @@ public class SocWebSocketHandler extends AbstractWebSocketHandler {
      * @throws Exception 处理消息过程中可能抛出的异常
      */
     @Override
-   /* protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
-        // 从WebSocket会话中获取登录用户信息
-        WebSocketSessionHolder.updateToken(session);
-        String payload = message.getPayload();
-        log.info("接收到消息:{}",message.getPayload());
-        if (payload.isEmpty()){
-            return;
-        }
-        try {
-            WebsocketResult websocketResult = JSON.parseObject(payload, WebsocketResult.class);
-            Object obj = websocketResult.getPayload();
-            if (ObjectUtil.isNotEmpty(obj)){
-                Map map = JSON.parseObject(obj.toString(), Map.class);
-                //上报事件
-                String event = (String) map.get(WebSocketConstants.EVENT);
-                String service = (String) map.get(WebSocketConstants.SERVICE);
-                String trigger = (String) map.get(WebSocketConstants.TRIGGER);
-                String token = WebSocketSessionHolder.getSessionKey(session);
-                String routingKey=ObjectUtil.isNotEmpty(event) ? event : service;
-                //上报消息内容
-                JSONObject args = (JSONObject) map.get(WebSocketConstants.ARGS);
-                if (ObjectUtil.isEmpty(event)||args.isEmpty()){
-                    log.error("消息内容为空:{}",message.getPayload());
-                    return;
-                }
-                RouterService routeService = RouterServiceHandler.getRouteService(routingKey);
-                routeService.execute(new WebsocketExecuteReq(event, args,token));
-                //todo 返回iot消息
-            }
-        } catch (Exception e) {
-           log.error("转换消息内容时出错:{}",e);
-        }
-
-
-    }*/
-
     protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
         // 从WebSocket会话中获取登录用户信息
         String token = WebSocketSessionHolder.updateToken(session);
@@ -165,7 +133,7 @@ public class SocWebSocketHandler extends AbstractWebSocketHandler {
                 LogUtils.WEBSOCKET_MSG.error("消息topic错误,topic:{}", topic);
                 return;
             }
-            WebsocketPayloadResolve payloadResolve = new WebsocketPayloadResolve(websocketResult, typeEnums);
+            WebsocketPayloadResolve payloadResolve = new WebsocketPayloadResolve(websocketResult);
             //上报消息内容
             final Object args = payloadResolve.getData();
             WebsocketExecuteReq executeReq = new WebsocketExecuteReq(payloadResolve.getRoutingKey(), args, token, websocketResult.getId(), topic, payloadResolve.getHeader(), typeEnums.getProductName(), typeEnums.getDeviceName());

+ 3 - 2
soc-modules/soc-modules-mediator/src/main/java/com/xunmei/mediator/websocket/utils/IotServerUtils.java

@@ -32,10 +32,11 @@ public class IotServerUtils {
 
     public static WebsocketResult invokeDownLinkServer(TopicTypeEnums topicTypeEnums,String produceName,String deviceName,String invokeMethod,JSONObject object){
         Date date = new Date();
+        String id = UUID.randomUUID().toString();
         WebsocketResult iotWebsocketResult = new WebsocketResult();
         //Iot消息透穿至主机 固定Topic
         iotWebsocketResult.setTopic(TopicTypeEnums.formatUrl(TopicTypeEnums.PRODUCT_SERVICE_INVOKE, WebSocketConstants.IOT_SERVER,WebSocketConstants.IOT_SERVER_DEVICE));
-        iotWebsocketResult.setId(UUID.randomUUID().toString());
+        iotWebsocketResult.setId(id);
         iotWebsocketResult.setTimestamp(date);
         //iot消息头
         JSONObject iotHeaderObj = new JSONObject();
@@ -49,7 +50,7 @@ public class IotServerUtils {
         //以下是主机消息内容
         WebsocketResult hostData = new WebsocketResult();
         iotPayloadObj.put(WebSocketConstants.ARGS,hostData );
-        hostData.setId(UUID.randomUUID().toString());
+        hostData.setId(id);
         hostData.setTimestamp(date);
         hostData.setTopic(TopicTypeEnums.formatUrl(topicTypeEnums, produceName,deviceName));
         JSONObject hostHeaderObj = new JSONObject();