Răsfoiți Sursa

录像索引主动获取代码提交,消息解析代码修改提交

jingyuanchao 1 an în urmă
părinte
comite
6d0cf8ccf0

+ 5 - 0
soc-api/soc-api-system/src/main/java/com/xunmei/system/api/util/LogUtils.java

@@ -132,4 +132,9 @@ public class LogUtils {
      * 设备控制日志(子系统布撤防、防区旁路/取消旁路、用电类控制、空调类控制)
      */
     public  static  final Logger DEVICE_CONTROL_LOG=LoggerFactory.getLogger("deviceControlLog");
+
+    /**
+     * websocket日志
+     */
+    public  static  final Logger WEBSOCKET_MSG=LoggerFactory.getLogger("websocketMsgLog");
 }

+ 5 - 28
soc-modules/soc-modules-mediator/src/main/java/com/xunmei/mediator/api/video/service/impl/VideoIntegrityCheckServiceImpl.java

@@ -32,6 +32,7 @@ import com.xunmei.mediator.websocket.enums.ProductEnums;
 import com.xunmei.mediator.websocket.enums.TopicTypeEnums;
 import com.xunmei.mediator.websocket.holder.WebSocketSessionHolder;
 import com.xunmei.mediator.websocket.service.RouterService;
+import com.xunmei.mediator.websocket.utils.IotServerUtils;
 import com.xunmei.system.api.RemoteOrgService;
 import com.xunmei.system.api.domain.SysOrg;
 import com.xunmei.system.api.domain.iot.IotDeviceInfo;
@@ -110,37 +111,13 @@ public class VideoIntegrityCheckServiceImpl extends ServiceImpl<VideoIntegrityCh
             return;
         }
         for (IotDeviceInfo iotDeviceInfo : deviceInfoList) {
-            WebsocketResult iotWebsocketResult = new WebsocketResult();
-            iotWebsocketResult.setTopic(TopicTypeEnums.formatUrl(TopicTypeEnums.PRODUCT_SERVICE_INVOKE, WebSocketConstants.IOT_SERVER,WebSocketConstants.IOT_SERVER_DEVICE));
-            iotWebsocketResult.setId(UUID.randomUUID().toString());
-            iotWebsocketResult.setTimestamp(new Date());
-            JSONObject iotHeaderObj = new JSONObject();
-            iotHeaderObj.put(WebSocketConstants.PRODUCT_NAME,WebSocketConstants.IOT_SERVER);
-            iotHeaderObj.put(WebSocketConstants.DEVICE_NAME, WebSocketConstants.IOT_SERVER_DEVICE);
-            iotWebsocketResult.setHeaders(iotHeaderObj);
-            JSONObject iotPayloadObj = new JSONObject();
-            iotWebsocketResult.setPayload(iotPayloadObj);
-            iotPayloadObj.put(WebSocketConstants.SERVICE, WebSocketConstants.DOWN_LINK_SERVICE_PASS_THROUGH);
-
-            WebsocketResult hostData = new WebsocketResult();
-            iotPayloadObj.put(WebSocketConstants.ARGS,hostData );
-            hostData.setId(UUID.randomUUID().toString());
-            hostData.setTopic(TopicTypeEnums.formatUrl(TopicTypeEnums.PRODUCT_SERVICE_INVOKE, iotDeviceInfo.getDeviceProduct(),iotDeviceInfo.getDeviceCode()));
-            JSONObject hostHeaderObj = new JSONObject();
-            hostHeaderObj.put(WebSocketConstants.PRODUCT_NAME,iotDeviceInfo.getDeviceProduct());
-            hostHeaderObj.put(WebSocketConstants.DEVICE_NAME, iotDeviceInfo.getDeviceCode());
-            hostData.setHeaders(hostHeaderObj);
-
-            JSONObject hostPayloadObj = new JSONObject();
-            hostPayloadObj.put(WebSocketConstants.SERVICE, WebSocketConstants.GET_RECORD_INFOS_SERVICES);
             JSONObject object = new JSONObject();
-            hostPayloadObj.put(WebSocketConstants.ARGS, object);
             object.put("dvsCode", iotDeviceInfo.getHostCode());
             object.put("index", iotDeviceInfo.getDeviceCode());
-            object.put("recordDate", Arrays.asList(DateUtil.format(DateUtil.offsetDay(new Date(),-1),Constants.DAILY_FORMAT)));
-            hostData.setPayload(hostPayloadObj);
-            log.info("发送录像机录像信息请求:{}", JacksonUtils.toJSONString(iotWebsocketResult));
-            WebSocketSessionHolder.sendAll(JacksonUtils.toJSONString(iotWebsocketResult));
+            object.put("recordDate", Arrays.asList(DateUtil.format(DateUtil.offsetDay(new Date(),-1), Constants.DAILY_FORMAT)));
+            WebsocketResult websocketResult = IotServerUtils.invokeDownLinkServer(TopicTypeEnums.PRODUCT_SERVICE_INVOKE, iotDeviceInfo.getDeviceProduct(), iotDeviceInfo.getDeviceCode(), WebSocketConstants.GET_RECORD_INFOS_SERVICES, object);
+            log.info("调用录像机获取录像信息:{}", JacksonUtils.toJSONString(websocketResult));
+            WebSocketSessionHolder.sendAll(JacksonUtils.toJSONString(websocketResult));
         }
 
     }

+ 39 - 18
soc-modules/soc-modules-mediator/src/main/java/com/xunmei/mediator/websocket/dto/WebsocketPayloadResolve.java

@@ -4,6 +4,7 @@ import cn.hutool.core.util.ObjectUtil;
 import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
 import com.xunmei.mediator.websocket.constant.WebSocketConstants;
+import com.xunmei.mediator.websocket.enums.TopicTypeEnums;
 import io.netty.util.internal.StringUtil;
 import lombok.Data;
 import lombok.NoArgsConstructor;
@@ -23,27 +24,47 @@ public class WebsocketPayloadResolve {
     private Object data;
 
 
-    public WebsocketPayloadResolve(WebsocketResult result) {
-        WebsocketPayloadResolve resolve = new WebsocketPayloadResolve();
-        if (result.getPayload() instanceof JSONObject) {
-            Map map = ((JSONObject) result.getPayload()).toJavaObject(Map.class);
-            //上报事件
-            String event = (String) map.get(WebSocketConstants.EVENT);
-            String service = (String) map.get(WebSocketConstants.SERVICE);
-            JSONObject header = (JSONObject) map.get(WebSocketConstants.HEADER);
-            String routingKey = ObjectUtil.isNotEmpty(event) ? event : service;
-            JSONObject args = (JSONObject) (ObjectUtil.isNotEmpty(map.get(WebSocketConstants.DATA)) ? map.get(WebSocketConstants.DATA) : map.get(WebSocketConstants.ARGS));
-            this.routingKey=routingKey;
-            this.header=header;
-            this.data=args;
+    public WebsocketPayloadResolve(WebsocketResult result, TopicTypeEnums typeEnums) {
+        boolean productEquals = WebSocketConstants.IOT_SERVER.equals(typeEnums.getProductName());
+        boolean deviceEquals = WebSocketConstants.IOT_SERVER_DEVICE.equals(typeEnums.getDeviceName());
+
+        if (productEquals && deviceEquals) {
+            // 说明是透穿的
+            JSONObject jsb = (JSONObject) result.getPayload();
+            JSONObject payloadObj = (JSONObject) jsb.get(WebSocketConstants.ARGS);
+            WebsocketResult javaObject = payloadObj.toJavaObject(WebsocketResult.class);
+            if (javaObject.getPayload() instanceof JSONObject) {
+                processJSONObjectPayload(result.getPayload());
+            } else if (javaObject.getPayload() instanceof JSONArray) {
+                processJSONArrayPayload(result.getPayload());
+            }
+            return;
         }
-        if (result.getPayload() instanceof JSONArray) {
-            final JSONArray jsonArray = (JSONArray) result.getPayload();
-            this.routingKey=StringUtil.EMPTY_STRING;
-            this.header=new JSONObject();
-            this.data=jsonArray;
+
+        if (result.getPayload() instanceof JSONObject) {
+            processJSONObjectPayload(result.getPayload());
+        } else if (result.getPayload() instanceof JSONArray) {
+            processJSONArrayPayload(result.getPayload());
         }
+    }
 
+    private void processJSONObjectPayload(Object payload) {
+        Map map = ((JSONObject) payload).toJavaObject(Map.class);
+        // 上报事件
+        String event = (String) map.get(WebSocketConstants.EVENT);
+        String service = (String) map.get(WebSocketConstants.SERVICE);
+        JSONObject header = (JSONObject) map.get(WebSocketConstants.HEADER);
+        String routingKey = ObjectUtil.isNotEmpty(event) ? event : service;
+        JSONObject args = (JSONObject) (ObjectUtil.isNotEmpty(map.get(WebSocketConstants.DATA)) ? map.get(WebSocketConstants.DATA) : map.get(WebSocketConstants.ARGS));
+        this.routingKey = routingKey;
+        this.header = header;
+        this.data = args;
+    }
 
+    private void processJSONArrayPayload(Object payload) {
+        final JSONArray jsonArray = (JSONArray) payload;
+        this.routingKey = StringUtil.EMPTY_STRING;
+        this.header = new JSONObject();
+        this.data = jsonArray;
     }
 }

+ 8 - 7
soc-modules/soc-modules-mediator/src/main/java/com/xunmei/mediator/websocket/handler/SocWebSocketHandler.java

@@ -2,7 +2,6 @@ package com.xunmei.mediator.websocket.handler;
 
 import cn.hutool.core.util.ObjectUtil;
 import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONObject;
 import com.xunmei.common.core.domain.iot.domain.IotServerInfo;
 import com.xunmei.common.core.enums.iot.IotServerConnectStatus;
 import com.xunmei.common.core.utils.JacksonUtils;
@@ -18,6 +17,7 @@ import com.xunmei.mediator.websocket.holder.WebSocketSessionHolder;
 import com.xunmei.mediator.websocket.service.RouterService;
 import com.xunmei.mediator.websocket.service.RouterServiceHandler;
 import com.xunmei.mediator.websocket.utils.WebSocketUtils;
+import com.xunmei.system.api.util.LogUtils;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
@@ -147,9 +147,9 @@ public class SocWebSocketHandler extends AbstractWebSocketHandler {
         // 从WebSocket会话中获取登录用户信息
         String token = WebSocketSessionHolder.updateToken(session);
         String payload = message.getPayload();
-        log.info("接收到消息:{}", message.getPayload());
+        LogUtils.WEBSOCKET_MSG.info("接收到消息:{}", message.getPayload());
         if (payload.isEmpty()) {
-            log.error("消息内容为空,token:{}", token);
+            LogUtils.WEBSOCKET_MSG.error("消息内容为空,token:{}", token);
             return;
         }
         try {
@@ -157,15 +157,15 @@ public class SocWebSocketHandler extends AbstractWebSocketHandler {
             Object obj = websocketResult.getPayload();
             String topic = websocketResult.getTopic();
             if (ObjectUtil.isEmpty(obj)) {
-                log.error("消息内容为空,topic:{}", topic);
+                LogUtils.WEBSOCKET_MSG.error("消息内容为空,topic:{}", topic);
                 return;
             }
             TopicTypeEnums typeEnums = TopicTypeEnums.matcherTopicTypeEnums(topic);
             if (typeEnums == null) {
-                log.error("消息topic错误,topic:{}", topic);
+                LogUtils.WEBSOCKET_MSG.error("消息topic错误,topic:{}", topic);
                 return;
             }
-            WebsocketPayloadResolve payloadResolve = new WebsocketPayloadResolve(websocketResult);
+            WebsocketPayloadResolve payloadResolve = new WebsocketPayloadResolve(websocketResult, typeEnums);
             //上报消息内容
             final Object args = payloadResolve.getData();
             WebsocketExecuteReq executeReq = new WebsocketExecuteReq(payloadResolve.getRoutingKey(), args, token, websocketResult.getId(), topic, payloadResolve.getHeader(), typeEnums.getProductName(), typeEnums.getDeviceName());
@@ -192,8 +192,9 @@ public class SocWebSocketHandler extends AbstractWebSocketHandler {
                     break;
             }
             WebSocketUtils.sendMessage(session, result);
+            LogUtils.WEBSOCKET_MSG.info("中心平台返回消息,token:{},内容:{}", token, JacksonUtils.toJSONString(result));
         } catch (Exception e) {
-            log.error("转换消息内容时出错:{}", e);
+            LogUtils.WEBSOCKET_MSG.error("转换消息内容时出错:{}", e);
         }
 
 

+ 34 - 1
soc-modules/soc-modules-mediator/src/main/java/com/xunmei/mediator/websocket/utils/IotServerUtils.java

@@ -4,12 +4,12 @@ import com.alibaba.fastjson.JSONObject;
 import com.xunmei.common.core.utils.JacksonUtils;
 import com.xunmei.mediator.websocket.constant.WebSocketConstants;
 import com.xunmei.mediator.websocket.dto.WebsocketResult;
+import com.xunmei.mediator.websocket.enums.TopicTypeEnums;
 import com.xunmei.mediator.websocket.holder.WebSocketSessionHolder;
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 
-import java.util.Arrays;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.UUID;
@@ -29,4 +29,37 @@ public class IotServerUtils {
         websocketResult.setPayload(hashMap);
         WebSocketSessionHolder.sendAll(JacksonUtils.toJSONString(websocketResult));
     }
+
+    public static WebsocketResult invokeDownLinkServer(TopicTypeEnums topicTypeEnums,String produceName,String deviceName,String invokeMethod,JSONObject object){
+        WebsocketResult iotWebsocketResult = new WebsocketResult();
+        //Iot消息透穿至主机 固定Topic
+        iotWebsocketResult.setTopic(TopicTypeEnums.formatUrl(TopicTypeEnums.PRODUCT_SERVICE_INVOKE, WebSocketConstants.IOT_SERVER,WebSocketConstants.IOT_SERVER_DEVICE));
+        iotWebsocketResult.setId(UUID.randomUUID().toString());
+        iotWebsocketResult.setTimestamp(new Date());
+        //iot消息头
+        JSONObject iotHeaderObj = new JSONObject();
+        iotHeaderObj.put(WebSocketConstants.PRODUCT_NAME,WebSocketConstants.IOT_SERVER);
+        iotHeaderObj.put(WebSocketConstants.DEVICE_NAME, WebSocketConstants.IOT_SERVER_DEVICE);
+        iotWebsocketResult.setHeaders(iotHeaderObj);
+        //iot消息体,完整的消息包含在payload中
+        JSONObject iotPayloadObj = new JSONObject();
+        iotWebsocketResult.setPayload(iotPayloadObj);
+        iotPayloadObj.put(WebSocketConstants.SERVICE, WebSocketConstants.DOWN_LINK_SERVICE_PASS_THROUGH);
+        //以下是主机消息内容
+        WebsocketResult hostData = new WebsocketResult();
+        iotPayloadObj.put(WebSocketConstants.ARGS,hostData );
+        hostData.setId(UUID.randomUUID().toString());
+        hostData.setTopic(TopicTypeEnums.formatUrl(topicTypeEnums, produceName,deviceName));
+        JSONObject hostHeaderObj = new JSONObject();
+        hostHeaderObj.put(WebSocketConstants.PRODUCT_NAME,produceName);
+        hostHeaderObj.put(WebSocketConstants.DEVICE_NAME, deviceName);
+        hostData.setHeaders(hostHeaderObj);
+
+        JSONObject hostPayloadObj = new JSONObject();
+        hostPayloadObj.put(WebSocketConstants.SERVICE, invokeMethod);
+        hostPayloadObj.put(WebSocketConstants.ARGS, object);
+
+        hostData.setPayload(hostPayloadObj);
+        return iotWebsocketResult;
+    }
 }

+ 21 - 0
soc-modules/soc-modules-mediator/src/main/resources/logback.xml

@@ -18,6 +18,23 @@
         </encoder>
     </appender>
 
+    <!-- websocket日志  -->
+    <appender name="websocketMsgLog" class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
+            <maxFileSize>${max.file.size}</maxFileSize>
+            <!--日志文件输出的文件名-->
+            <FileNamePattern>${LOG_HOME}/websocketMsg/%d{yyyy-MM-dd}-%i.log</FileNamePattern>
+            <!--日志文件保留天数-->
+            <maxHistory>${max.history}</maxHistory>
+            <totalSizeCap>${total.size.cap}</totalSizeCap>
+            <cleanHistoryOnStart>true</cleanHistoryOnStart>
+        </rollingPolicy>
+        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
+            <!--格式化输出:%d表示日期,%thread表示线程名,%-5level:级别从左显示5个字符宽度%msg:日志消息,%n是换行符-->
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
     <!-- equipmentList 基础信息,基础设备同步接口   -->
     <appender name="equipmentListLog" class="ch.qos.logback.core.rolling.RollingFileAppender">
         <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
@@ -608,6 +625,10 @@
         <!--上面appender元素的name值。为了和logger的name属性做区分,我故意写的不一样-->
         <appender-ref ref="airConditionerStatusLog"/>
     </logger>
+    <logger name="websocketMsgLog" additivity="false" level="info">
+        <!--上面appender元素的name值。为了和logger的name属性做区分,我故意写的不一样-->
+        <appender-ref ref="websocketMsgLog"/>
+    </logger>
 
     <!-- 日志输出级别 -->
     <root level="INFO">