Jelajahi Sumber

websocket转发修改提交

jingyuanchao 1 tahun lalu
induk
melakukan
b59cfad15e

+ 14 - 0
soc-modules/soc-modules-mediator/src/main/java/com/xunmei/mediator/api/video/service/impl/IotDvrHardDiskDetectionServiceImpl.java

@@ -13,6 +13,7 @@ import com.xunmei.common.core.utils.DateUtils;
 import com.xunmei.common.core.utils.IDHelper;
 import com.xunmei.mediator.api.alarm.service.IotAlarmDataService;
 import com.xunmei.mediator.api.host.service.IIotDeviceService;
+import com.xunmei.mediator.api.server.service.IotServerInfoService;
 import com.xunmei.mediator.api.video.mapper.IotDvrHardDiskDetectionMapper;
 import com.xunmei.mediator.api.video.service.IotDvrDiskService;
 import com.xunmei.mediator.api.video.service.IotDvrHardDiskDetectionLogService;
@@ -21,6 +22,7 @@ import com.xunmei.mediator.domain.dto.disk.VideoRecorderHardDiskDetectionReq;
 import com.xunmei.mediator.util.CheckDataUtil;
 import com.xunmei.mediator.websocket.constant.WebSocketConstants;
 import com.xunmei.mediator.websocket.dto.WebsocketExecuteReq;
+import com.xunmei.mediator.websocket.enums.ProductEnums;
 import com.xunmei.mediator.websocket.service.RouterService;
 import com.xunmei.system.api.RemoteOrgService;
 import com.xunmei.system.api.domain.SysOrg;
@@ -60,6 +62,14 @@ public class IotDvrHardDiskDetectionServiceImpl extends ServiceImpl<IotDvrHardDi
     @Autowired
     private IotAlarmDataService iotAlarmDataService;
 
+    @Resource
+    IotServerInfoService iotServerInfoService;
+
+    @Override
+    public ProductEnums product() {
+        return ProductEnums.DVS;
+    }
+
     @Override
     public String routerKey() {
         StringJoiner sj = new StringJoiner(",");
@@ -77,6 +87,10 @@ public class IotDvrHardDiskDetectionServiceImpl extends ServiceImpl<IotDvrHardDi
             dto.setCheckTime(recorderHardDiskDetectionReq.getCheckTime());
             dto.setDetailInfo(recorderHardDiskDetectionReq.getDetailInfo());
 
+            String token = req.getToken();
+            SysOrg sysOrg = iotServerInfoService.selectOrgByToken(token);
+            dto.setOrganizationGuid(sysOrg.getCode());
+
             return saveData(dto, "");
         } catch (IllegalAccessException e) {
             throw new RuntimeException(e);

+ 5 - 0
soc-modules/soc-modules-mediator/src/main/java/com/xunmei/mediator/api/video/service/impl/VideoDiagnosisRecordServiceImpl.java

@@ -24,6 +24,7 @@ import com.xunmei.mediator.util.CheckDataUtil;
 import com.xunmei.mediator.util.RedisUtil;
 import com.xunmei.mediator.websocket.constant.WebSocketConstants;
 import com.xunmei.mediator.websocket.dto.WebsocketExecuteReq;
+import com.xunmei.mediator.websocket.enums.ProductEnums;
 import com.xunmei.mediator.websocket.service.RouterService;
 import com.xunmei.system.api.RemoteOrgService;
 import com.xunmei.system.api.domain.SysOrg;
@@ -65,6 +66,10 @@ public class VideoDiagnosisRecordServiceImpl extends ServiceImpl<VideoDiagnosisR
     @Autowired
     private IotAlarmDataService iotAlarmDataService;
 
+    @Override
+    public ProductEnums product() {
+        return ProductEnums.DVS;
+    }
 
     @Override
     public String routerKey() {

+ 7 - 0
soc-modules/soc-modules-mediator/src/main/java/com/xunmei/mediator/api/video/service/impl/VideoIntegrityCheckServiceImpl.java

@@ -30,6 +30,7 @@ import com.xunmei.mediator.util.RedisUtil;
 import com.xunmei.mediator.websocket.constant.WebSocketConstants;
 import com.xunmei.mediator.websocket.dto.WebsocketExecuteReq;
 import com.xunmei.mediator.websocket.dto.WebsocketResult;
+import com.xunmei.mediator.websocket.enums.ProductEnums;
 import com.xunmei.mediator.websocket.holder.WebSocketSessionHolder;
 import com.xunmei.mediator.websocket.service.RouterService;
 import com.xunmei.system.api.RemoteOrgService;
@@ -80,6 +81,12 @@ public class VideoIntegrityCheckServiceImpl extends ServiceImpl<VideoIntegrityCh
     @Resource
     IotServerInfoService iotServerInfoService;
 
+
+    @Override
+    public ProductEnums product() {
+        return ProductEnums.DVS;
+    }
+
     @Override
     public String routerKey() {
         StringJoiner result = new StringJoiner(",");

+ 32 - 0
soc-modules/soc-modules-mediator/src/main/java/com/xunmei/mediator/websocket/enums/ProductEnums.java

@@ -0,0 +1,32 @@
+package com.xunmei.mediator.websocket.enums;
+
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+import java.util.Arrays;
+
+@Getter
+@AllArgsConstructor
+public enum ProductEnums {
+
+
+    DVS(new String[]{"VGSII_Hik"}),
+    ALARM_HOST(new String[]{"InAnter_BM1600NTSmall", "Hik_DS19A", "HikModule", "FengYe_H402", "HengTong_CKWU01C", "MtaOCX", "CrossProcessDemo", "HoneywellOCX_IPM", "DaHuaAlarmHost", "BOSCH_CMS"}),
+
+
+    ;
+
+    private String[] productName;
+
+
+    public static ProductEnums getByProductName(String productName) {
+        for (ProductEnums productEnums : ProductEnums.values()) {
+            if (Arrays.asList(productEnums.getProductName()).contains(productName)){
+                return productEnums;
+            }
+        }
+        return null;
+    }
+
+}

+ 64 - 0
soc-modules/soc-modules-mediator/src/main/java/com/xunmei/mediator/websocket/enums/TopicTypeEnums.java

@@ -0,0 +1,64 @@
+package com.xunmei.mediator.websocket.enums;
+
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+@Getter
+
+@AllArgsConstructor
+public enum TopicTypeEnums {
+
+
+    //系统通知
+    SYS_NOTICE("/sys/notification","",""),
+    //设备状态通知
+    DEVICE_STATUS("^/device/status/(\\w+)/(\\w+)$","",""),
+    //产品事件通知消息
+    PRODUCT_EVENT_NOTICE("^/things/(\\w+)/(\\w+)/event/post$","",""),
+    //应用方调用IoT产品服务消息
+    PRODUCT_SERVICE_INVOKE("^/things/(\\w+)/(\\w+)/service/invoke$","",""),
+    //IoT返回服务调用消息
+    PRODUCT_SERVICE_REPLY("^/things/(\\w+)/(\\w+)/service/invoke/reply$","",""),
+    //应用方读取属性
+    PRODUCT_PROPERTY_GET("^/things/(\\w+)/(\\w+)/property/get$","",""),
+    //IoT返回属性
+    PRODUCT_PROPERTY_REPLY("^/things/(\\w+)/(\\w+)/property/get/reply$","",""),
+    //应用方设置属性
+    PRODUCT_PROPERTY_SET("^/things/((\\w+)/(\\w+)/property/set$","",""),
+
+    ;
+
+    private final String topic;
+    @Setter
+    private String productName;
+    @Setter
+    private String deviceName;
+
+
+
+
+    public static TopicTypeEnums matcherTopicTypeEnums(String topic) {
+        TopicTypeEnums[] typeEnums = TopicTypeEnums.values();
+        for (TopicTypeEnums regex : typeEnums) {
+            Pattern pattern = Pattern.compile(regex.getTopic());
+            // 创建Matcher对象
+            Matcher matcher = pattern.matcher(topic);
+            // 检查是否匹配成功
+            if (matcher.find()) {
+                String productName = matcher.group(1);
+                String deviceName = matcher.group(2);
+                regex.setProductName(productName);
+                regex.setDeviceName(deviceName);
+                return regex;
+            }
+        }
+
+        return null;
+    }
+
+}

+ 59 - 0
soc-modules/soc-modules-mediator/src/main/java/com/xunmei/mediator/websocket/handler/SocWebSocketHandler.java

@@ -11,6 +11,7 @@ import com.xunmei.mediator.api.server.service.IotServerInfoService;
 import com.xunmei.mediator.websocket.constant.WebSocketConstants;
 import com.xunmei.mediator.websocket.dto.WebsocketExecuteReq;
 import com.xunmei.mediator.websocket.dto.WebsocketResult;
+import com.xunmei.mediator.websocket.enums.TopicTypeEnums;
 import com.xunmei.mediator.websocket.enums.WebsocketStatus;
 import com.xunmei.mediator.websocket.holder.WebSocketSessionHolder;
 import com.xunmei.mediator.websocket.service.RouterService;
@@ -138,6 +139,64 @@ public class SocWebSocketHandler extends AbstractWebSocketHandler {
 
     }
 
+    public void handleTextMessage1(WebSocketSession session, TextMessage message) throws Exception {
+        // 从WebSocket会话中获取登录用户信息
+        WebSocketSessionHolder.updateToken(session);
+        String payload = message.getPayload();
+        log.info("接收到消息:{}",message.getPayload());
+
+        if (payload.isEmpty()){
+            return;
+        }
+        try {
+            WebsocketResult websocketResult = JSON.parseObject(payload, WebsocketResult.class);
+            Object obj = websocketResult.getPayload();
+            String topic = websocketResult.getTopic();
+            if (ObjectUtil.isNotEmpty(obj)){
+                if (ObjectUtil.isNotEmpty(obj)){
+
+                    TopicTypeEnums typeEnums = TopicTypeEnums.matcherTopicTypeEnums(topic);
+                    if (typeEnums == null){
+                        log.error("消息topic错误:{}",topic);
+                    }
+                Map map = JSON.parseObject(obj.toString(), Map.class);
+                //上报事件
+                String event = (String) map.get(WebSocketConstants.EVENT);
+                String service = (String) map.get(WebSocketConstants.SERVICE);
+                String trigger = (String) map.get(WebSocketConstants.TRIGGER);
+                String token = WebSocketSessionHolder.getSessionKey(session);
+                String routingKey=ObjectUtil.isNotEmpty(event) ? event : service;
+                //上报消息内容
+                JSONObject args = (JSONObject) map.get(WebSocketConstants.ARGS);
+                if (ObjectUtil.isEmpty(event)||args.isEmpty()){
+                    log.error("消息内容为空:{}",message.getPayload());
+                    return;
+                }
+                    switch (typeEnums) {
+                        case SYS_NOTICE:
+                            break;
+                        case DEVICE_STATUS:
+                            break;
+                        case PRODUCT_EVENT_NOTICE:
+                            break;
+                        case PRODUCT_SERVICE_REPLY:
+                            break;
+                        case PRODUCT_PROPERTY_REPLY:
+                            break;
+                        default:
+                            break;
+                    }
+                RouterService routeService = RouterServiceHandler.getRouteService(typeEnums.getProductName(),routingKey);
+                routeService.execute(new WebsocketExecuteReq(event, args,token));
+                //todo 返回iot消息
+            }}
+        } catch (Exception e) {
+            log.error("转换消息内容时出错:{}",e);
+        }
+
+
+    }
+
     /**
      * 处理接收到的二进制消息
      *

+ 2 - 0
soc-modules/soc-modules-mediator/src/main/java/com/xunmei/mediator/websocket/service/RouterService.java

@@ -1,9 +1,11 @@
 package com.xunmei.mediator.websocket.service;
 
 import com.xunmei.mediator.websocket.dto.WebsocketExecuteReq;
+import com.xunmei.mediator.websocket.enums.ProductEnums;
 
 public interface RouterService {
 
+    ProductEnums product();
     String routerKey();
 
     Object execute(WebsocketExecuteReq req);

+ 21 - 0
soc-modules/soc-modules-mediator/src/main/java/com/xunmei/mediator/websocket/service/RouterServiceHandler.java

@@ -1,15 +1,22 @@
 package com.xunmei.mediator.websocket.service;
 
 import cn.hutool.extra.spring.SpringUtil;
+import com.xunmei.mediator.websocket.enums.ProductEnums;
 
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
 
 
 public class RouterServiceHandler {
     private static final Map<String, RouterService> CONCURRENT_HASH_MAP = new ConcurrentHashMap<>();
+    private static final Map<ProductEnums, List<RouterService>> PRODUCT_HASH_MAP =  new ConcurrentHashMap<>();
 
     static {
+        Map<String, RouterService> map = SpringUtil.getBeansOfType(RouterService.class);
+        Map<ProductEnums, List<RouterService>> productEnumsListMap = map.values().stream().collect(Collectors.groupingBy(RouterService::product));
+        PRODUCT_HASH_MAP.putAll(productEnumsListMap);
 
         SpringUtil.getBeansOfType(RouterService.class).forEach((k, v) -> {
             CONCURRENT_HASH_MAP.put(v.routerKey(), v);
@@ -24,4 +31,18 @@ public class RouterServiceHandler {
         }
         throw new RuntimeException("未找到对应的处理类");
     }
+
+    public static RouterService getRouteService(String productName,String routerKey) {
+        ProductEnums enums = ProductEnums.getByProductName(productName);
+        if (enums == null){
+            throw new RuntimeException("未找到对应的产品类型");
+        }
+
+        return PRODUCT_HASH_MAP.get(enums)
+                .stream()
+                .filter(routerService -> routerService.routerKey().equals(routerKey))
+                .findFirst()
+                .orElseThrow(() -> new RuntimeException("未找到对应的处理类"));
+
+    }
 }

+ 1 - 1
soc-modules/soc-modules-mediator/src/main/java/com/xunmei/mediator/websocket/utils/WebSocketUtils.java

@@ -135,7 +135,7 @@ public class WebSocketUtils {
      * @param url
      * @return
      */
-    private static WebsocketUrlInfo resolveUrlInfo(String url) {
+    public static WebsocketUrlInfo resolveUrlInfo(String url) {
         // 正则表达式模式
         Pattern pattern = Pattern.compile("^/things/(\\w+)/(\\w+)/service/invoke$");
         // 创建Matcher对象