瀏覽代碼

增加WS链接状态定时检测

jingyuanchao 10 月之前
父節點
當前提交
f754704461

+ 15 - 0
project_data/sql/1.0.11/soc/soc.sql

@@ -366,4 +366,19 @@ VALUES ('app token过期时间(单位:分钟)', 'TOKEN_APP_EXPIRETIME', '480
 
 ALTER TABLE `iot_alarm_subsystem_log`
     MODIFY COLUMN `org_path` varchar(1000) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT '0' COMMENT '机构' AFTER `alarm_host_code`;
+
+UPDATE `iot_server_product` SET `display_name` = 'Hik' WHERE `id` = '19ed86f7-780d-29ec-4bc7-3a13a7097ea1';
+UPDATE `iot_server_product` SET `display_name` = 'DaHua_V1' WHERE `id` = '783bb246-6ebe-7e69-e055-3a14adfe10c9';
+UPDATE `iot_server_product` SET `display_name` = 'DaHua' WHERE `id` = '92467897-92ef-647f-e55e-3a13b7f3ca9a';
+
+
+
+
+
+
+
+
+
+
+
 -- 升级脚本执行完成

+ 6 - 0
soc-api/soc-api-system/src/main/java/com/xunmei/system/api/util/LogUtils.java

@@ -200,4 +200,10 @@ public class LogUtils {
      * websocket dvs录像计划和录像模板
      */
     public  static  final Logger SOCKET_DVS_TEMPLATE_INFO=LoggerFactory.getLogger("socketDvsTemplateLog");
+
+
+    /**
+     * 检查websocket链接状态
+     */
+    public  static  final Logger SOCKET_CONNECT_STATUS_CHECK=LoggerFactory.getLogger("socketConnectStatusCheck");
 }

+ 0 - 1
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/iot/service/impl/IotDeviceInfoServiceImpl.java

@@ -994,7 +994,6 @@ public class IotDeviceInfoServiceImpl extends ServiceImpl<IotDeviceInfoMapper, I
         info.setDeviceCode(baseInfo.getDeviceCode());
         info.setDeviceProduct(baseInfo.getProductName());
         info.setDeviceType(type);
-        info.setNetStatus("0");
         info.setSyncStatus(DeviceSyncStatus.SYNC_SUCCESS.getCode());
         info.setDeviceName(baseInfo.getDeviceName());
         info.setUpdateTime(new Date());

+ 14 - 0
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/north/controller/HostController.java

@@ -165,4 +165,18 @@ public class HostController {
         }
         return AjaxResult.success(list);
     }
+
+    @GetMapping("/getOnlineWs")
+    AjaxResult getOnlineWs() {
+        Set<String> set = WebSocketSessionHolder.getSessionsAll();
+        List<String> list = new ArrayList<>();
+        for (String token : set) {
+            WebSocketSession sessions = WebSocketSessionHolder.getSessions(token);
+            if (sessions != null && sessions.isOpen()) {
+                list.add(token);
+            }
+        }
+        return AjaxResult.success(list);
+    }
+
 }

+ 28 - 0
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/server/controller/IotSeverController.java

@@ -0,0 +1,28 @@
+package com.xunmei.host.server.controller;
+
+import com.xunmei.common.core.web.domain.AjaxResult;
+import com.xunmei.host.server.service.IotServerInfoService;
+import io.swagger.annotations.ApiOperation;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import javax.annotation.Resource;
+
+/**
+ * @author jingyuanchao
+ * @date 2024/12/12 14:19
+ */
+@RestController
+public class IotSeverController {
+
+    @Resource
+    private IotServerInfoService serverInfoService;
+
+
+    @ApiOperation("检查ws链接是否在线")
+    @GetMapping({"/checkWsOnLine"})
+    public AjaxResult checkWsOnLine() {
+        serverInfoService.checkWsOnLine();
+        return AjaxResult.success();
+    }
+}

+ 4 - 0
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/server/service/IotServerInfoService.java

@@ -32,4 +32,8 @@ public interface IotServerInfoService extends IService<IotServerInfo> {
     void updateConnectTimeByToken(String token, Date date);
 
     List<IotServerInfo> selectOnlineServer(List<Long> orgIds);
+
+    void checkWsOnLine();
+
+
 }

+ 103 - 6
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/server/service/impl/IotServerInfoServiceImpl.java

@@ -1,12 +1,19 @@
 package com.xunmei.host.server.service.impl;
 
+import cn.hutool.core.collection.CollectionUtil;
 import cn.hutool.core.util.ObjectUtil;
+import com.alibaba.nacos.api.exception.NacosException;
+import com.alibaba.nacos.api.naming.NamingService;
+import com.alibaba.nacos.api.naming.pojo.Instance;
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
+import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 import com.xunmei.common.core.constant.Constants;
+import com.xunmei.common.core.constant.ServiceNameConstants;
 import com.xunmei.common.core.domain.iot.domain.IotServerInfo;
 import com.xunmei.common.core.enums.iot.IotServerConnectStatus;
+import com.xunmei.common.core.utils.JacksonUtils;
 import com.xunmei.common.core.utils.uuid.UUID;
 import com.xunmei.common.redis.utils.RedisUtils;
 import com.xunmei.host.server.mapper.IotServerInfoMapper;
@@ -17,10 +24,15 @@ import org.redisson.api.RLock;
 import org.redisson.api.RedissonClient;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
+import org.springframework.web.client.RestClientException;
+import org.springframework.web.client.RestTemplate;
 
+import javax.annotation.Resource;
+import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 /**
  * <p>
@@ -33,6 +45,11 @@ import java.util.concurrent.TimeUnit;
 @Service
 public class IotServerInfoServiceImpl extends ServiceImpl<IotServerInfoMapper, IotServerInfo> implements IotServerInfoService {
 
+    @Resource
+    private NamingService namingService;
+
+    @Resource
+    private RestTemplate restTemplate;
 
     @Override
     public IotServerInfo getToken(String registerCode) {
@@ -58,7 +75,7 @@ public class IotServerInfoServiceImpl extends ServiceImpl<IotServerInfoMapper, I
                 }
 
                 updateById(serverInfo);
-                LogUtils.WEBSOCKET_MSG.error("收到链接请求,已生成token,registerCode:{},token:{}", registerCode,serverInfo.getIotCode());
+                LogUtils.WEBSOCKET_MSG.error("收到链接请求,已生成token,registerCode:{},token:{}", registerCode, serverInfo.getIotCode());
                 return serverInfo;
             }
         } catch (Exception e) {
@@ -113,13 +130,93 @@ public class IotServerInfoServiceImpl extends ServiceImpl<IotServerInfoMapper, I
 
     @Override
     public List<IotServerInfo> selectOnlineServer(List<Long> orgIds) {
-        LambdaQueryWrapper<IotServerInfo> wrapper=new LambdaQueryWrapper<IotServerInfo>();
+        LambdaQueryWrapper<IotServerInfo> wrapper = new LambdaQueryWrapper<IotServerInfo>();
 
-        wrapper.eq(IotServerInfo::getEnable,0);
-        wrapper.eq(IotServerInfo::getRegisterCodeStatus,1);
+        wrapper.eq(IotServerInfo::getEnable, 0);
+        wrapper.eq(IotServerInfo::getRegisterCodeStatus, 1);
         wrapper.eq(IotServerInfo::getIotStatus, IotServerConnectStatus.CONNECTED.getIdx());
-        wrapper.in(ObjectUtil.isNotEmpty(orgIds),IotServerInfo::getOrgId);
-        wrapper.select(IotServerInfo::getIotCode,IotServerInfo::getOrgId);
+        wrapper.in(ObjectUtil.isNotEmpty(orgIds), IotServerInfo::getOrgId);
+        wrapper.select(IotServerInfo::getIotCode, IotServerInfo::getOrgId);
         return baseMapper.selectList(wrapper);
     }
+
+    @Override
+    @Transactional
+    public void checkWsOnLine() {
+        List<Instance> instanceList = new ArrayList<>();
+        try {
+            instanceList = namingService.getAllInstances(ServiceNameConstants.HOST);
+        } catch (NacosException e) {
+            LogUtils.SOCKET_CONNECT_STATUS_CHECK.error("获取实例信息时,nacos服务异常", e);
+            return;
+        }
+        if (CollectionUtil.isEmpty(instanceList)) {
+            return;
+        }
+        try {
+            final List<String> onlineWs = getOnlineWs(instanceList);
+            updateWsOnlineStatus(onlineWs);
+        } catch (RuntimeException e) {
+            LogUtils.SOCKET_CONNECT_STATUS_CHECK.error("获取服务上WS链接状态时异常", e);
+            throw new RuntimeException(e);
+        }
+
+
+    }
+
+    private List<String> getOnlineWs(List<Instance> instanceList) throws RuntimeException {
+        //多线程远程调用获取服务状态
+        List<String> onlineWS = new ArrayList<>();
+        for (Instance instance : instanceList) {
+            List<String> list = null;
+            try {
+                list = restTemplate.getForObject(instance.getIp() + ":" + instance.getPort() + "/getOnlineWs", List.class);
+                LogUtils.SOCKET_CONNECT_STATUS_CHECK.error("获取当前系统在线的WS链接:{}", JacksonUtils.toJSONString(list));
+            } catch (RestClientException e) {
+                throw new RuntimeException(e);
+            }
+            if (list == null) {
+                continue;
+            }
+            onlineWS.addAll(list);
+        }
+        return onlineWS;
+    }
+
+
+    private void updateWsOnlineStatus(List<String> onlineIotCode) {
+        if (ObjectUtil.isEmpty(onlineIotCode)) {
+            final LambdaUpdateWrapper<IotServerInfo> wrapper = new LambdaUpdateWrapper<IotServerInfo>();
+            wrapper.set(IotServerInfo::getIotStatus, IotServerConnectStatus.OFFLINE.getIdx());
+            wrapper.set(IotServerInfo::getUpdateTime, new Date());
+            wrapper.set(IotServerInfo::getLastConnectTime, new Date());
+            baseMapper.update(null, wrapper);
+            LogUtils.SOCKET_CONNECT_STATUS_CHECK.info("检测WS链接状态时,发现没有在线的链接,修改所有链接为离线");
+            return;
+        }
+        //获取当前在线的WS
+        final List<IotServerInfo> infoList = selectOnlineServer(null);
+        //获取唯一标识
+        final List<String> collect = infoList.stream().map(IotServerInfo::getIotCode).collect(Collectors.toList());
+        //如果数据库中标识为当前在线的WS链接 并不存在于从所有实例汇总到的在线的WS链接中 则更新为离线
+        final List<String> offlineIotCode = collect.stream().filter(item -> !onlineIotCode.contains(item)).collect(Collectors.toList());
+        if (ObjectUtil.isNotEmpty(offlineIotCode)) {
+            final LambdaUpdateWrapper<IotServerInfo> wrapper = new LambdaUpdateWrapper<IotServerInfo>();
+            wrapper.in(IotServerInfo::getIotCode, offlineIotCode);
+            wrapper.set(IotServerInfo::getIotStatus, IotServerConnectStatus.OFFLINE.getIdx());
+            wrapper.set(IotServerInfo::getUpdateTime, new Date());
+            wrapper.set(IotServerInfo::getLastConnectTime, new Date());
+            baseMapper.update(null, wrapper);
+            LogUtils.SOCKET_CONNECT_STATUS_CHECK.info("发现存在数据库与实际链接状态不一致的WS链接,已修改为离线:{}", offlineIotCode);
+        }
+
+        //将从所有实例汇总到的在线的WS链接更新检测时间
+        final LambdaUpdateWrapper<IotServerInfo> wrapper = new LambdaUpdateWrapper<IotServerInfo>();
+        wrapper.set(IotServerInfo::getIotStatus, IotServerConnectStatus.CONNECTED.getIdx());
+        wrapper.set(IotServerInfo::getUpdateTime, new Date());
+        wrapper.set(IotServerInfo::getLastConnectTime, new Date());
+        wrapper.in(IotServerInfo::getIotCode, onlineIotCode);
+        baseMapper.update(null, wrapper);
+        LogUtils.SOCKET_CONNECT_STATUS_CHECK.info("修改在线的WS连接的检测时间:{}", JacksonUtils.toJSONString(onlineIotCode));
+    }
 }

+ 3 - 1
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/websocket/handler/SocWebSocketHandler.java

@@ -158,7 +158,9 @@ public class SocWebSocketHandler extends AbstractWebSocketHandler {
         // 从WebSocket会话中获取登录用户信息
         String token = WebSocketSessionHolder.updateToken(session);
         if (StringUtils.isEmpty(token)) {
-            logErrorAndReturn("会话消息不存在", session);
+            session.close();
+            //走到这里说明ws当前处于链接状态,但是服务器内存中丢失了ws链接对象,需要重新链接
+            LogUtils.WEBSOCKET_MSG.info("会话消息不存在,服务器主动断开,等待重新链接..");
             return;
         }
 

+ 20 - 0
soc-modules/soc-modules-host/src/main/resources/logback.xml

@@ -303,6 +303,22 @@
             <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
         </encoder>
     </appender>
+    <!-- 定时检查WS链接状态 -->
+    <appender name="socketConnectStatusCheck" class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
+            <maxFileSize>${max.file.size}</maxFileSize>
+            <!--日志文件输出的文件名-->
+            <FileNamePattern>${LOG_HOME}/socketConnectStatusCheck/%d{yyyy-MM-dd}-%i.log</FileNamePattern>
+            <!--日志文件保留天数-->
+            <maxHistory>${max.history}</maxHistory>
+            <totalSizeCap>${total.size.cap}</totalSizeCap>
+            <cleanHistoryOnStart>true</cleanHistoryOnStart>
+        </rollingPolicy>
+        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
+            <!--格式化输出:%d表示日期,%thread表示线程名,%-5level:级别从左显示5个字符宽度%msg:日志消息,%n是换行符-->
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
+        </encoder>
+    </appender>
 
     <logger name="websocketMsgLog" additivity="false" level="info">
         <!--上面appender元素的name值。为了和logger的name属性做区分,我故意写的不一样-->
@@ -373,6 +389,10 @@
         <!--上面appender元素的name值。为了和logger的name属性做区分,我故意写的不一样-->
         <appender-ref ref="socketDvsTemplateLog"/>
     </logger>
+    <logger name="socketConnectStatusCheck" additivity="false" level="info">
+        <!--上面appender元素的name值。为了和logger的name属性做区分,我故意写的不一样-->
+        <appender-ref ref="socketConnectStatusCheck"/>
+    </logger>
 
     <!-- 日志输出级别 -->
     <root level="INFO">

+ 1 - 1
soc-modules/soc-modules-host/src/main/resources/mapper/IotDeviceInfoMapper.xml

@@ -116,6 +116,6 @@
         select id,name
         from sys_multi_layer_dictionary
         where name = #{name}
-          and type = #{name}
+          and type = #{type}
     </select>
 </mapper>

+ 4 - 0
soc-modules/soc-modules-iot/src/main/java/com/xunmei/iot/vo/deviceInfo/IotDeviceInfoPageVo.java

@@ -58,4 +58,8 @@ public class IotDeviceInfoPageVo implements Serializable {
 
     @ApiModelProperty(value = "ip地址")
     private String ip;
+    @ApiModelProperty(value = "品牌")
+    private String deviceBrand;
+
+    private Integer enableStatus;
 }

+ 4 - 1
soc-modules/soc-modules-iot/src/main/resources/mapper/IotDeviceInfoMapper.xml

@@ -8,11 +8,14 @@
             o.affiliated_bank AS secondOrgName,
             o.short_name AS orgName,
             ie.net_address as ip,
-            ifnull(d.net_status,0) as netStatus
+            ifnull(d.net_status,0) as netStatus,
+            ld.name as deviceBrand,
+            d.enable as enableStatus
         FROM
             iot_device_info d
             inner JOIN sys_org o ON o.id = d.org_id
             left JOIN iot_device_info_extend ie  ON ie.iot_token=d.iot_token and ie.device_id=d.id and ie.deleted=0
+            left join sys_multi_layer_dictionary ld on ld.id = d.device_brand
         where d.deleted = 0
             <if test="req.deviceName !=null and req.deviceName !=''">
                 and d.device_name like concat('%',#{req.deviceName},'%')