Explorar o código

处理集群情况下,处理消息不可靠的情况

jingyuanchao hai 1 ano
pai
achega
a725109b40

+ 5 - 4
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/iot/service/impl/IotDeviceInfoServiceImpl.java

@@ -38,9 +38,9 @@ import com.xunmei.host.websocket.enums.DeviceCacheEnum;
 import com.xunmei.host.websocket.enums.DeviceNetStatusEnum;
 import com.xunmei.host.websocket.enums.DeviceNetStatusEnum;
 import com.xunmei.host.websocket.enums.ProductEnums;
 import com.xunmei.host.websocket.enums.ProductEnums;
 import com.xunmei.host.websocket.enums.TopicTypeEnums;
 import com.xunmei.host.websocket.enums.TopicTypeEnums;
+import com.xunmei.host.websocket.redis.WebsocketPublisher;
 import com.xunmei.host.websocket.service.RouterService;
 import com.xunmei.host.websocket.service.RouterService;
 import com.xunmei.host.websocket.utils.IotServerUtils;
 import com.xunmei.host.websocket.utils.IotServerUtils;
-import com.xunmei.host.websocket.utils.WebSocketUtils;
 import com.xunmei.system.api.domain.iot.IotDeviceInfo;
 import com.xunmei.system.api.domain.iot.IotDeviceInfo;
 import com.xunmei.system.api.domain.iot.IotDeviceInfoExtend;
 import com.xunmei.system.api.domain.iot.IotDeviceInfoExtend;
 import com.xunmei.system.api.util.LogUtils;
 import com.xunmei.system.api.util.LogUtils;
@@ -81,6 +81,8 @@ public class IotDeviceInfoServiceImpl extends ServiceImpl<IotDeviceInfoMapper, I
     @Lazy
     @Lazy
     @Autowired
     @Autowired
     private IotDeviceStatusService deviceStatusService;
     private IotDeviceStatusService deviceStatusService;
+    @Autowired
+    private WebsocketPublisher websocketPublisher;
 
 
     /**
     /**
      * 根据设备类型和设备编码,获取设备信息
      * 根据设备类型和设备编码,获取设备信息
@@ -830,7 +832,7 @@ public class IotDeviceInfoServiceImpl extends ServiceImpl<IotDeviceInfoMapper, I
         final JSONObject object = new JSONObject();
         final JSONObject object = new JSONObject();
         object.put("dataType", eto.getDataType());
         object.put("dataType", eto.getDataType());
         final WebsocketResult result = IotServerUtils.invokeHostServer(topic, new JSONObject(), WebSocketConstants.DO_WORK, object);
         final WebsocketResult result = IotServerUtils.invokeHostServer(topic, new JSONObject(), WebSocketConstants.DO_WORK, object);
-        WebSocketUtils.sendMessage(iotCode, JacksonUtils.toJSONString(result));
+        websocketPublisher.publishMsg(iotCode, result);
 
 
     }
     }
 
 
@@ -858,7 +860,6 @@ public class IotDeviceInfoServiceImpl extends ServiceImpl<IotDeviceInfoMapper, I
         }
         }
         final String topic = TopicTypeEnums.formatUrl(TopicTypeEnums.PRODUCT_SERVICE_INVOKE.getUrl(), WebSocketConstants.DETECTION_HOST, WebSocketConstants.DETECTION_HOST_DEVICE);
         final String topic = TopicTypeEnums.formatUrl(TopicTypeEnums.PRODUCT_SERVICE_INVOKE.getUrl(), WebSocketConstants.DETECTION_HOST, WebSocketConstants.DETECTION_HOST_DEVICE);
         final WebsocketResult result = IotServerUtils.invokeHostServer(topic, new JSONObject(), WebSocketConstants.CHANGE_DEVICE, eto);
         final WebsocketResult result = IotServerUtils.invokeHostServer(topic, new JSONObject(), WebSocketConstants.CHANGE_DEVICE, eto);
-        WebSocketUtils.sendMessage(iotCode, JacksonUtils.toJSONString(result));
-
+        websocketPublisher.publishMsg(iotCode, result);
     }
     }
 }
 }

+ 4 - 8
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/north/controller/HostController.java

@@ -1,8 +1,6 @@
 package com.xunmei.host.north.controller;
 package com.xunmei.host.north.controller;
 
 
 
 
-import com.xunmei.common.core.domain.host.remote.DiskInfoGetReq;
-import com.xunmei.common.core.domain.host.remote.VideoIntegrityGetReq;
 import com.xunmei.common.core.domain.work.dto.ControlDeviceDto;
 import com.xunmei.common.core.domain.work.dto.ControlDeviceDto;
 import com.xunmei.common.core.domain.work.dto.WorkDayDto;
 import com.xunmei.common.core.domain.work.dto.WorkDayDto;
 import com.xunmei.common.core.web.domain.AjaxResult;
 import com.xunmei.common.core.web.domain.AjaxResult;
@@ -11,10 +9,7 @@ import com.xunmei.host.iot.service.IIotDeviceInfoService;
 import com.xunmei.host.protection.service.IIotAlarmHostService;
 import com.xunmei.host.protection.service.IIotAlarmHostService;
 import com.xunmei.host.video.service.IVideoIntegrityCheckService;
 import com.xunmei.host.video.service.IVideoIntegrityCheckService;
 import com.xunmei.host.video.service.IotDvrHardDiskDetectionService;
 import com.xunmei.host.video.service.IotDvrHardDiskDetectionService;
-import com.xunmei.host.websocket.holder.WebSocketSessionHolder;
 import com.xunmei.host.websocket.service.WebsocketService;
 import com.xunmei.host.websocket.service.WebsocketService;
-import com.xunmei.system.api.Eto.BreakerControlEto;
-import com.xunmei.system.api.Eto.SubSystemControlEto;
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiOperation;
 import org.springframework.web.bind.annotation.*;
 import org.springframework.web.bind.annotation.*;
 
 
@@ -56,7 +51,7 @@ public class HostController {
         return AjaxResult.success();
         return AjaxResult.success();
     }
     }
 
 
-    @ApiOperation("主动获取指定通道的录像完整性数据")
+    /*@ApiOperation("主动获取指定通道的录像完整性数据")
     @GetMapping({"/getRecordInfo"})
     @GetMapping({"/getRecordInfo"})
     public AjaxResult videoIntegrityCheck(@RequestBody VideoIntegrityGetReq req) {
     public AjaxResult videoIntegrityCheck(@RequestBody VideoIntegrityGetReq req) {
         this.videoIntegrityCheckService.getRecordInfo(req);
         this.videoIntegrityCheckService.getRecordInfo(req);
@@ -102,12 +97,13 @@ public class HostController {
         } catch (Exception r) {
         } catch (Exception r) {
             return AjaxResult.error("通断电控制指令下发失败!");
             return AjaxResult.error("通断电控制指令下发失败!");
         }
         }
-    }
+    }*/
 
 
     @ApiOperation("关闭iot链接")
     @ApiOperation("关闭iot链接")
     @GetMapping({"/closeSession"})
     @GetMapping({"/closeSession"})
     public AjaxResult closeSession(String iotCode) throws IOException {
     public AjaxResult closeSession(String iotCode) throws IOException {
-        WebSocketSessionHolder.closeServe(iotCode);
+        //集群环境下 此功能不稳地,暂时取消
+        //WebSocketSessionHolder.closeServe(iotCode);
         return AjaxResult.success();
         return AjaxResult.success();
     }
     }
 
 

+ 1 - 1
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/websocket/handler/SocWebSocketHandler.java

@@ -178,7 +178,7 @@ public class SocWebSocketHandler extends AbstractWebSocketHandler {
             }
             }
             IotServerInfo serverInfo = iotServerInfoService.selectByToken(token);
             IotServerInfo serverInfo = iotServerInfoService.selectByToken(token);
             if (ObjectUtil.isEmpty(serverInfo)) {
             if (ObjectUtil.isEmpty(serverInfo)) {
-                LogUtils.WEBSOCKET_MSG.error("未查询到token为 {} 的服务器信息", token);
+                LogUtils.WEBSOCKET_MSG.error("服务器信息不存在或链接信息已不可用,token:{}", token);
                 return;
                 return;
             }
             }
             //上报消息内容
             //上报消息内容

+ 19 - 0
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/websocket/redis/WebsocketPublisher.java

@@ -1,5 +1,6 @@
 package com.xunmei.host.websocket.redis;
 package com.xunmei.host.websocket.redis;
 
 
+import com.google.common.collect.Sets;
 import com.xunmei.common.redis.utils.RedisUtils;
 import com.xunmei.common.redis.utils.RedisUtils;
 import com.xunmei.system.api.domain.websocket.RedisWebsocketMsg;
 import com.xunmei.system.api.domain.websocket.RedisWebsocketMsg;
 import lombok.extern.slf4j.Slf4j;
 import lombok.extern.slf4j.Slf4j;
@@ -8,6 +9,8 @@ import org.redisson.api.RedissonClient;
 import org.redisson.codec.SerializationCodec;
 import org.redisson.codec.SerializationCodec;
 import org.springframework.stereotype.Component;
 import org.springframework.stereotype.Component;
 
 
+import java.util.HashSet;
+
 /**
 /**
  * websocket 消息发布者
  * websocket 消息发布者
  * @author gaoxiong
  * @author gaoxiong
@@ -37,4 +40,20 @@ public class WebsocketPublisher {
         }
         }
 
 
     }
     }
+
+
+    //广播的消息
+    public void publishAllMsg(Object obj){
+        RedisWebsocketMsg message =new RedisWebsocketMsg();
+        message.setContent(obj);
+        message.setTokens(new HashSet<>());
+        sendMessage(message);
+    }
+    //指定主机的消息
+    public void publishMsg(String iotCode,Object obj){
+        RedisWebsocketMsg message =new RedisWebsocketMsg();
+        message.setContent(obj);
+        message.setTokens(Sets.newHashSet(iotCode));
+        sendMessage(message);
+    }
 }
 }

+ 12 - 0
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/websocket/redis/WebsocketSubscriber.java

@@ -15,8 +15,10 @@ import org.springframework.boot.ApplicationArguments;
 import org.springframework.boot.ApplicationRunner;
 import org.springframework.boot.ApplicationRunner;
 import org.springframework.core.Ordered;
 import org.springframework.core.Ordered;
 import org.springframework.stereotype.Component;
 import org.springframework.stereotype.Component;
+import org.springframework.web.socket.TextMessage;
 import org.springframework.web.socket.WebSocketSession;
 import org.springframework.web.socket.WebSocketSession;
 
 
+import java.io.IOException;
 import java.util.Set;
 import java.util.Set;
 
 
 /**
 /**
@@ -53,6 +55,16 @@ public class WebsocketSubscriber implements ApplicationRunner, Ordered {
                         }
                         }
 
 
                     }
                     }
+                }else {
+                    for (String token : WebSocketSessionHolder.getSessionsAll()) {
+                        final WebSocketSession sessions = WebSocketSessionHolder.getSessions(token);
+                        try {
+                            sessions.sendMessage(new TextMessage(str));
+                        } catch (IOException e) {
+                            log.error("消息广播失败:{}",e);
+                            throw new RuntimeException(e);
+                        }
+                    }
                 }
                 }
 
 
             }
             }

+ 1 - 3
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/websocket/service/impl/WebsocketServiceImpl.java

@@ -1,7 +1,6 @@
 package com.xunmei.host.websocket.service.impl;
 package com.xunmei.host.websocket.service.impl;
 
 
 import com.alibaba.fastjson.JSONObject;
 import com.alibaba.fastjson.JSONObject;
-import com.xunmei.common.core.utils.JacksonUtils;
 import com.xunmei.common.redis.utils.RedisUtils;
 import com.xunmei.common.redis.utils.RedisUtils;
 import com.xunmei.host.iot.service.IIotDeviceInfoExtendService;
 import com.xunmei.host.iot.service.IIotDeviceInfoExtendService;
 import com.xunmei.host.iot.service.IIotDeviceInfoService;
 import com.xunmei.host.iot.service.IIotDeviceInfoService;
@@ -15,7 +14,6 @@ import com.xunmei.host.websocket.enums.TopicTypeEnums;
 import com.xunmei.host.websocket.redis.WebsocketPublisher;
 import com.xunmei.host.websocket.redis.WebsocketPublisher;
 import com.xunmei.host.websocket.service.WebsocketService;
 import com.xunmei.host.websocket.service.WebsocketService;
 import com.xunmei.host.websocket.utils.IotServerUtils;
 import com.xunmei.host.websocket.utils.IotServerUtils;
-import com.xunmei.host.websocket.utils.WebSocketUtils;
 import com.xunmei.system.api.domain.websocket.RedisWebsocketMsg;
 import com.xunmei.system.api.domain.websocket.RedisWebsocketMsg;
 import org.redisson.api.RKeys;
 import org.redisson.api.RKeys;
 import org.redisson.api.RedissonClient;
 import org.redisson.api.RedissonClient;
@@ -61,7 +59,7 @@ public class WebsocketServiceImpl implements WebsocketService {
     public void getDevices() {
     public void getDevices() {
         final String topic = TopicTypeEnums.formatUrl(TopicTypeEnums.PRODUCT_SERVICE_INVOKE.getUrl(),WebSocketConstants.DETECTION_HOST, WebSocketConstants.DETECTION_HOST_DEVICE);
         final String topic = TopicTypeEnums.formatUrl(TopicTypeEnums.PRODUCT_SERVICE_INVOKE.getUrl(),WebSocketConstants.DETECTION_HOST, WebSocketConstants.DETECTION_HOST_DEVICE);
         final WebsocketResult result = IotServerUtils.invokeHostServer(topic, new JSONObject(), WebSocketConstants.GET_DEVICES_SERVICES, new JSONObject());
         final WebsocketResult result = IotServerUtils.invokeHostServer(topic, new JSONObject(), WebSocketConstants.GET_DEVICES_SERVICES, new JSONObject());
-        WebSocketUtils.sendAll(JacksonUtils.toJSONString(result));
+        websocketPublisher.publishAllMsg(result);
     }
     }