浏览代码

Websocket解析代码提交

jingyuanchao 1 年之前
父节点
当前提交
78e6f33523

+ 1 - 0
soc-common/soc-common-core/src/main/java/com/xunmei/common/core/constant/Constants.java

@@ -158,6 +158,7 @@ public class Constants {
     public static final String DAILY_FORMAT_ZH = "yyyy年MM月dd日";
 
     public static final String HMS_FORMAT = "yyyy-MM-dd HH:mm:ss";
+    public static final String UTC_FORMAT = "yyyy-MM-dd'T'HH:mm:ss";
 
     public static final String HM_FORMAT = "yyyyMMddHHmmss";
 

+ 14 - 3
soc-modules/soc-modules-mediator/src/main/java/com/xunmei/mediator/websocket/dto/WebsocketResult.java

@@ -2,9 +2,11 @@ package com.xunmei.mediator.websocket.dto;
 
 import cn.hutool.core.date.DateUtil;
 import com.alibaba.fastjson.JSONObject;
+import com.xunmei.common.core.constant.Constants;
 import com.xunmei.common.core.constant.HttpStatus;
 import com.xunmei.mediator.websocket.constant.WebSocketConstants;
 import com.xunmei.system.api.dto.protection.ReceiveErrorDto;
+import io.netty.util.internal.StringUtil;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
@@ -54,15 +56,24 @@ public class WebsocketResult implements Serializable {
         return String.format("things/%s/%s/service/invoke", productName, deviceName);
     }
 
-    public void setTimestamp(Date date) {
-        this.timestamp = DateUtil.format(date, "yyyy-MM-dd'T'HH:mm:ss");
+    public static WebsocketResult of(String topic, String id) {
+        WebsocketResult result = new WebsocketResult();
+        result.setId(id);
+        result.setTopic(topic);
+        result.setTimestamp(DateUtil.format(new Date(), Constants.UTC_FORMAT));
+        Map<String, Object> map = new HashMap<>();
+        map.put(WebSocketConstants.STATUS_CODE, HttpStatus.SUCCESS);
+        map.put(WebSocketConstants.STATUS_DESCRIPTION, StringUtil.EMPTY_STRING);
+        map.put(WebSocketConstants.HEADER, new JSONObject());
+        result.setPayload(map);
+        return result;
     }
 
     public static WebsocketResult of(ReceiveErrorDto dto, String topic, String id) {
         WebsocketResult result = new WebsocketResult();
         result.setId(id);
         result.setTopic(topic);
-        result.setTimestamp(new Date());
+        result.setTimestamp(DateUtil.format(new Date(), Constants.UTC_FORMAT));
         if (dto == null) {
             return result;
         }

+ 3 - 0
soc-modules/soc-modules-mediator/src/main/java/com/xunmei/mediator/websocket/enums/TopicTypeEnums.java

@@ -56,6 +56,9 @@ public enum TopicTypeEnums {
         TopicTypeEnums[] typeEnums = TopicTypeEnums.values();
         for (TopicTypeEnums regex : typeEnums) {
             Pattern pattern = Pattern.compile(regex.getTopic());
+            if (TopicTypeEnums.SYS_NOTICE.equals(regex)){
+                return TopicTypeEnums.SYS_NOTICE;
+            }
 
             // 创建Matcher对象
             Matcher matcher = pattern.matcher(topic);

+ 9 - 3
soc-modules/soc-modules-mediator/src/main/java/com/xunmei/mediator/websocket/handler/SocWebSocketHandler.java

@@ -1,7 +1,9 @@
 package com.xunmei.mediator.websocket.handler;
 
+import cn.hutool.core.date.DateUtil;
 import cn.hutool.core.util.ObjectUtil;
 import com.alibaba.fastjson.JSON;
+import com.xunmei.common.core.constant.Constants;
 import com.xunmei.common.core.domain.iot.domain.IotServerInfo;
 import com.xunmei.common.core.enums.iot.IotServerConnectStatus;
 import com.xunmei.common.core.utils.JacksonUtils;
@@ -79,7 +81,11 @@ public class SocWebSocketHandler extends AbstractWebSocketHandler {
             final List<String> list = session.getHandshakeHeaders().get("x-forwarded-for");
             if (ObjectUtil.isNotEmpty(list)) {
                 ip = list.get(0);
-                LogUtils.WEBSOCKET_MSG.info("[建立连接],ip:{}", ip);
+                if (ip.contains(",")) {
+                    ip = ip.split(",")[0];
+                }
+                if (StringUtils.isNotEmpty(ip))
+                    LogUtils.WEBSOCKET_MSG.info("[建立连接],ip:{}", ip);
             }
             IotServerInfo serverInfo = iotServerInfoService.selectByToken(token);
             serverInfo.setIotIp(ip);
@@ -103,7 +109,7 @@ public class SocWebSocketHandler extends AbstractWebSocketHandler {
         result.setId(id);
         result.setTopic(topic);
         //当前时间转换为 格式:2024-07-02T14:17:32
-        result.setTimestamp(new Date());
+        result.setTimestamp(DateUtil.format(new Date(), Constants.UTC_FORMAT));
         result.setPayload(object);
         return result;
     }
@@ -142,7 +148,7 @@ public class SocWebSocketHandler extends AbstractWebSocketHandler {
             //上报消息内容
             final Object args = payloadResolve.getData();
             WebsocketExecuteReq executeReq = new WebsocketExecuteReq(payloadResolve.getRoutingKey(), args, token, websocketResult.getId(), topic, payloadResolve.getHeader(), typeEnums.getProductName(), typeEnums.getDeviceName());
-            WebsocketResult result = null;
+            WebsocketResult result = WebsocketResult.of(websocketResult.getTopic(), websocketResult.getId());
             switch (typeEnums) {
                 //系统通知
                 case SYS_NOTICE:

+ 11 - 12
soc-modules/soc-modules-mediator/src/main/java/com/xunmei/mediator/websocket/service/impl/WebsocketServiceImpl.java

@@ -8,7 +8,7 @@ import com.xunmei.common.core.domain.iot.domain.IotAlarmDefenceArea;
 import com.xunmei.common.core.domain.iot.domain.IotAlarmSubsystem;
 import com.xunmei.common.core.domain.iot.domain.IotDvrDisk;
 import com.xunmei.common.core.domain.iot.domain.IotServerInfo;
-import com.xunmei.common.core.utils.JacksonUtils;
+import com.xunmei.common.core.enums.iot.DeviceTypeEnum;
 import com.xunmei.common.core.utils.StringUtils;
 import com.xunmei.common.redis.utils.RedisUtils;
 import com.xunmei.mediator.api.protection.service.IotAlarmDefenceAreaService;
@@ -29,7 +29,6 @@ import com.xunmei.mediator.websocket.dto.dvs.ChannelInfo;
 import com.xunmei.mediator.websocket.dto.dvs.DiskInfo;
 import com.xunmei.mediator.websocket.dto.dvs.DvsBaseInfo;
 import com.xunmei.mediator.websocket.dto.dvs.DvsInfo;
-import com.xunmei.common.core.enums.iot.DeviceTypeEnum;
 import com.xunmei.mediator.websocket.enums.ProductEnums;
 import com.xunmei.mediator.websocket.redis.WebsocketPublisher;
 import com.xunmei.mediator.websocket.service.RouterService;
@@ -38,6 +37,7 @@ import com.xunmei.mediator.websocket.utils.IotServerUtils;
 import com.xunmei.system.api.domain.iot.IotDeviceInfo;
 import com.xunmei.system.api.domain.iot.IotDeviceInfoExtend;
 import com.xunmei.system.api.domain.websocket.RedisWebsocketMsg;
+import com.xunmei.system.api.dto.protection.ReceiveErrorDto;
 import com.xunmei.system.api.util.LogUtils;
 import org.redisson.api.RKeys;
 import org.redisson.api.RLock;
@@ -213,7 +213,7 @@ public class WebsocketServiceImpl implements WebsocketService, RouterService {
                 //逻辑删除视频设备
                 iIotDeviceInfoService.deleteDeviceByToken(token);
                 //逻辑删除Dvs扩展信息
-                iIotDeviceInfoExtendService.deleteDeviceExtendByTokenAndDeviceType(token,DeviceTypeEnum.DVS.getCode());
+                iIotDeviceInfoExtendService.deleteDeviceExtendByTokenAndDeviceType(token, DeviceTypeEnum.DVS.getCode());
                 //逻辑删除所有硬盘信息
                 iIotDvrDiskService.deleteByToken(token);
                 /**
@@ -313,7 +313,7 @@ public class WebsocketServiceImpl implements WebsocketService, RouterService {
                 //逻辑删除子系统和防区
                 iotAlarmSubsystemService.deleteByIotToken(token);
                 iotAlarmDefenceAreaService.deleteByIotToken(token);
-                iIotDeviceInfoExtendService.deleteDeviceExtendByTokenAndDeviceType(token,DeviceTypeEnum.ALARM_HOST.getCode());
+                iIotDeviceInfoExtendService.deleteDeviceExtendByTokenAndDeviceType(token, DeviceTypeEnum.ALARM_HOST.getCode());
 
                 //添加报警主机扩展信息
                 List<IotDeviceInfoExtend> addExtendList = new ArrayList();
@@ -330,7 +330,7 @@ public class WebsocketServiceImpl implements WebsocketService, RouterService {
                 //更新子系统集合
                 List<IotAlarmDefenceArea> updateDefenceAreaList = new ArrayList();
 
-                processAlarmHostExtend(alarmHostBaseInfo.getCommunicationParameters(),serverInfo, token, addExtendList, updateExtendList);
+                processAlarmHostExtend(alarmHostBaseInfo.getCommunicationParameters(), serverInfo, token, addExtendList, updateExtendList);
                 processSubSystem(alarmHostBaseInfo.getSubsystems(), serverInfo, token, addSubSystemList, updateSubSystemList);
                 processDefenceArea(alarmHostBaseInfo.getInputs(), serverInfo, token, addDefenceAreaList, updateDefenceAreaList);
 
@@ -748,13 +748,13 @@ public class WebsocketServiceImpl implements WebsocketService, RouterService {
         } else {
             for (CommunicationParameter parameter : communicationParameters) {
                 IotDeviceInfo info = iIotDeviceInfoService.selectByTypeAndCode(token, DeviceTypeEnum.ALARM_HOST.getCode(), parameter.getProductName(), parameter.getDeviceName());
-                if (info == null){
+                if (info == null) {
                     continue;
                 }
                 IotDeviceInfoExtend extend = iIotDeviceInfoExtendService.selectByProductDeviceNameAndToken(token, DeviceTypeEnum.ALARM_HOST.getCode(), parameter.getDeviceName());
 
                 if (extend == null) {
-                    extend = createAlarmHostInfoExtend(parameter, info.getId(),serverInfo);
+                    extend = createAlarmHostInfoExtend(parameter, info.getId(), serverInfo);
                     addExtendList.add(extend);
                 } else {
                     dealAlarmHostInfoExtend(parameter, extend);
@@ -764,7 +764,7 @@ public class WebsocketServiceImpl implements WebsocketService, RouterService {
         }
     }
 
-    private IotDeviceInfoExtend createAlarmHostInfoExtend(CommunicationParameter parameter, Long deviceId,IotServerInfo serverInfo) {
+    private IotDeviceInfoExtend createAlarmHostInfoExtend(CommunicationParameter parameter, Long deviceId, IotServerInfo serverInfo) {
         IotDeviceInfoExtend extend = new IotDeviceInfoExtend();
         extend.setDeviceId(deviceId);
         extend.setPort(parameter.getDevicePort());
@@ -789,7 +789,6 @@ public class WebsocketServiceImpl implements WebsocketService, RouterService {
     }
 
 
-
     /**
      * 处理报警主机子系统数据
      *
@@ -912,7 +911,7 @@ public class WebsocketServiceImpl implements WebsocketService, RouterService {
         info.setUpdateTime(LocalDateTime.now());
     }
 
-    private void saveOrUpdateBatches(List<IotAlarmSubsystem> addSubSystemInfos, List<IotAlarmSubsystem> updateSubSystemInfos, List<IotAlarmDefenceArea> addDefenceAreaInfos, List<IotAlarmDefenceArea> updateDefenceAreaInfos,List<IotDeviceInfoExtend> addExtendList, List<IotDeviceInfoExtend> updateExtendList) {
+    private void saveOrUpdateBatches(List<IotAlarmSubsystem> addSubSystemInfos, List<IotAlarmSubsystem> updateSubSystemInfos, List<IotAlarmDefenceArea> addDefenceAreaInfos, List<IotAlarmDefenceArea> updateDefenceAreaInfos, List<IotDeviceInfoExtend> addExtendList, List<IotDeviceInfoExtend> updateExtendList) {
         if (!addSubSystemInfos.isEmpty()) {
             iotAlarmSubsystemService.saveSubSystemInfos(addSubSystemInfos);
         }
@@ -969,9 +968,9 @@ public class WebsocketServiceImpl implements WebsocketService, RouterService {
                 dealDvsBaseInfo(dvsBaseInfo, req.getToken());
             }
         } catch (Exception e) {
-            e.printStackTrace();
+            LogUtils.WEBSOCKET_MSG.error("设备基础数据处理错误:{}", e);
             throw new RuntimeException(e);
         }
-        return null;
+        return WebsocketResult.of(ReceiveErrorDto.success(), req.getTopic(), req.getId());
     }
 }

+ 5 - 3
soc-modules/soc-modules-mediator/src/main/java/com/xunmei/mediator/websocket/utils/IotServerUtils.java

@@ -1,6 +1,8 @@
 package com.xunmei.mediator.websocket.utils;
 
+import cn.hutool.core.date.DateUtil;
 import com.alibaba.fastjson.JSONObject;
+import com.xunmei.common.core.constant.Constants;
 import com.xunmei.common.core.utils.JacksonUtils;
 import com.xunmei.mediator.websocket.constant.WebSocketConstants;
 import com.xunmei.mediator.websocket.dto.WebsocketResult;
@@ -22,7 +24,7 @@ public class IotServerUtils {
     public static WebsocketResult invokeIotServer(String topic, String service, Object args){
         WebsocketResult websocketResult = new WebsocketResult();
         websocketResult.setId(UUID.randomUUID().toString());
-        websocketResult.setTimestamp(new Date());
+        websocketResult.setTimestamp(DateUtil.format(new Date(), Constants.UTC_FORMAT));
         websocketResult.setTopic(topic);
         websocketResult.setHeaders(new JSONObject());
         HashMap<String, Object> hashMap = new HashMap<>();
@@ -40,7 +42,7 @@ public class IotServerUtils {
         //Iot消息透穿至主机 固定Topic
         iotWebsocketResult.setTopic(TopicTypeEnums.formatUrl(TopicTypeEnums.PRODUCT_SERVICE_INVOKE, WebSocketConstants.IOT_SERVER,WebSocketConstants.IOT_SERVER_DEVICE));
         iotWebsocketResult.setId(id);
-        iotWebsocketResult.setTimestamp(date);
+        iotWebsocketResult.setTimestamp(DateUtil.format(new Date(), Constants.UTC_FORMAT));
         //iot消息头
         JSONObject iotHeaderObj = new JSONObject();
         iotHeaderObj.put(WebSocketConstants.PRODUCT_NAME,WebSocketConstants.IOT_SERVER);
@@ -54,7 +56,7 @@ public class IotServerUtils {
         WebsocketResult hostData = new WebsocketResult();
         iotPayloadObj.put(WebSocketConstants.ARGS,hostData );
         hostData.setId(id);
-        hostData.setTimestamp(date);
+        hostData.setTimestamp(DateUtil.format(new Date(), Constants.UTC_FORMAT));
         hostData.setTopic(TopicTypeEnums.formatUrl(topicTypeEnums, produceName,deviceName));
         JSONObject hostHeaderObj = new JSONObject();
         hostHeaderObj.put(WebSocketConstants.PRODUCT_NAME,produceName);