Browse Source

设备同步 处理结果增加事件

jingyuanchao 11 months ago
parent
commit
65fca66e9f

+ 89 - 56
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/iot/service/impl/IotDeviceInfoServiceImpl.java

@@ -13,17 +13,18 @@ import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 import com.google.common.collect.Lists;
 import com.xunmei.common.core.constant.Constants;
 import com.xunmei.common.core.constant.ErrorMsgConstants;
+import com.xunmei.common.core.constant.HttpStatus;
 import com.xunmei.common.core.domain.device.domain.SysMultiLayerDictionary;
 import com.xunmei.common.core.domain.iot.domain.IotAlarmDefenceArea;
 import com.xunmei.common.core.domain.iot.domain.IotAlarmSubsystem;
 import com.xunmei.common.core.domain.iot.domain.IotServerInfo;
-import com.xunmei.common.core.domain.iot.domain.IotWebsocketMsg;
 import com.xunmei.common.core.domain.work.dto.ControlDeviceDto;
 import com.xunmei.common.core.domain.work.dto.WorkDayDto;
 import com.xunmei.common.core.enums.iot.BaseDeviceTypeEnum;
 import com.xunmei.common.core.enums.iot.DefenceAreaType;
 import com.xunmei.common.core.enums.work.RedirectTypeEnum;
 import com.xunmei.common.core.utils.JacksonUtils;
+import com.xunmei.common.redis.enums.RedisDelayQueueEnum;
 import com.xunmei.common.redis.utils.RedisUtils;
 import com.xunmei.host.iot.mapper.IotDeviceInfoMapper;
 import com.xunmei.host.iot.service.IIotDeviceInfoExtendService;
@@ -39,6 +40,7 @@ import com.xunmei.host.websocket.constant.WebSocketConstants;
 import com.xunmei.host.websocket.dto.DeviceStatusInfo;
 import com.xunmei.host.websocket.dto.WebsocketExecuteReq;
 import com.xunmei.host.websocket.dto.WebsocketResult;
+import com.xunmei.host.websocket.dto.dvs.DevicePushResult;
 import com.xunmei.host.websocket.dto.dvs.DvsBaseInfo;
 import com.xunmei.host.websocket.dto.dvs.SubDeviceInfo;
 import com.xunmei.host.websocket.enums.DeviceCacheEnum;
@@ -46,6 +48,7 @@ import com.xunmei.host.websocket.enums.DeviceNetStatusEnum;
 import com.xunmei.host.websocket.enums.ProductEnums;
 import com.xunmei.host.websocket.enums.TopicTypeEnums;
 import com.xunmei.host.websocket.redis.delay.RedisDelayQueueHandle;
+import com.xunmei.host.websocket.redis.delay.RedisDelayedQueueUtil;
 import com.xunmei.host.websocket.service.RouterService;
 import com.xunmei.host.websocket.service.WebsocketService;
 import com.xunmei.system.api.domain.iot.IotDeviceInfo;
@@ -360,9 +363,8 @@ public class IotDeviceInfoServiceImpl extends ServiceImpl<IotDeviceInfoMapper, I
     public WebsocketResult execute(WebsocketExecuteReq req) {
         try {
             LogUtils.SOCKET_BASE_DEVICE_INFO.info("收到消息:{}", JacksonUtils.toJSONString(req));
-            Object data = req.getData();
-            JSONArray dataArray = (JSONArray) data;
-            if (ObjectUtil.isEmpty(dataArray)) {
+            final Object data = req.getData();
+            if (ObjectUtil.isEmpty(data)) {
                 return WebsocketResult.replySuccess(req);
             }
             IotServerInfo serverInfo = req.getServerInfo();
@@ -370,6 +372,7 @@ public class IotDeviceInfoServiceImpl extends ServiceImpl<IotDeviceInfoMapper, I
             switch (event) {
                 case WebSocketConstants.DEVICES_EVENT:
                 case WebSocketConstants.GET_DEVICES_SERVICES:
+                    JSONArray dataArray = (JSONArray) data;
                     List<DvsBaseInfo> dataList = dataArray.toJavaList(DvsBaseInfo.class);
 
                     List<IotDeviceInfo> syncToAssetDeviceList = new ArrayList<>();
@@ -380,15 +383,18 @@ public class IotDeviceInfoServiceImpl extends ServiceImpl<IotDeviceInfoMapper, I
 
                     return WebsocketResult.replySuccess(req);
                 case WebSocketConstants.DEVICES_STATUS_EVENT:
-                    List<DeviceStatusInfo> statusInfos = dataArray.toJavaList(DeviceStatusInfo.class);
+                    JSONArray statusArray = (JSONArray) data;
+                    List<DeviceStatusInfo> statusInfos = statusArray.toJavaList(DeviceStatusInfo.class);
                     deviceStatusChange(statusInfos, serverInfo);
                     break;
+                //平台下发设备后,主机上报该设备的全量数据(单设备)
                 case WebSocketConstants.INCREMENT_DEVICES_EVENT:
-                    List<DvsBaseInfo> list = dataArray.toJavaList(DvsBaseInfo.class);
-
-                    List<IotDeviceInfo> syncDeviceList = new ArrayList<>();
-                    handleIncrementDevices(list, serverInfo, syncDeviceList);
+                    final DvsBaseInfo dvsBaseInfo = JSON.parseObject(data.toString(), DvsBaseInfo.class);
+                    handleIncrementDevices(dvsBaseInfo, serverInfo);
                     break;
+                //平台下发设备后,主机上报该设备保存结果
+                case WebSocketConstants.INCREMENT_DEVICES_RESULT_EVENT:
+                    dealPushDeviceResult(data);
                 default:
                     return WebsocketResult.replyError(req, ErrorMsgConstants.ERROR_ROUTE);
             }
@@ -399,53 +405,75 @@ public class IotDeviceInfoServiceImpl extends ServiceImpl<IotDeviceInfoMapper, I
         return WebsocketResult.replySuccess(req);
     }
 
-    private void handleIncrementDevices(List<DvsBaseInfo> list, IotServerInfo serverInfo, List<IotDeviceInfo> syncToAssetDeviceList) {
+
+    private void dealPushDeviceResult(Object obj) {
+        final DevicePushResult result = JSON.parseObject(obj.toString(), DevicePushResult.class);
+        LogUtils.WEBSOCKET_MSG.info("主机上报处理平台下发设备{}结果:{}", result.getDeviceName(), result);
+        if (BaseDeviceTypeEnum.Dvs.name().equals(result.getDeviceType()) && WebSocketConstants.INSERT.equals(result.getPushType())) {
+            //新增DVR的上报结果不在这里处理, 需要等待摄像头上报后修改设备同步状态
+            return;
+        }
+        if (ObjectUtil.equal(result.getStatusCode(), HttpStatus.SUCCESS)) {
+            updateDeviceSyncStatus(result.getDeviceId(), 2);
+        } else {
+            updateDeviceSyncStatus(result.getDeviceId(), 3);
+        }
+    }
+
+    private void updateDeviceSyncStatus(Long id, int status) {
+        if (RedisDelayedQueueUtil.removeDelayedQueue(id, RedisDelayQueueEnum.DEVICE_SYNC_STATUS_MONITOR.getCode())) {
+            final LambdaUpdateWrapper<IotDeviceInfo> wrapper = new LambdaUpdateWrapper<>();
+            wrapper.eq(IotDeviceInfo::getId, id).set(IotDeviceInfo::getSyncStatus, status);
+            baseMapper.update(null, wrapper);
+        }
+    }
+
+    private void handleIncrementDevices(DvsBaseInfo baseInfo, IotServerInfo serverInfo) {
         //添加通道类设备集合
         List<IotDeviceInfo> addChnnelList = new ArrayList<>();
         //更新通道类设备集合
         List<IotDeviceInfo> updateChnnelList = new ArrayList<>();
-        for (DvsBaseInfo baseInfo : list) {
-            baseMapper.updateDvsAndChannelDeleted(serverInfo.getIotCode(), baseInfo.getDeviceCode());
-            String code = BaseDeviceTypeEnum.valueOf(baseInfo.getType()).getCode();
-            IotDeviceInfo hostInfo = selectByTypeAndCode(serverInfo.getIotCode(), code, baseInfo.getProductName(), baseInfo.getDeviceCode());
-            if (ObjectUtil.isNull(hostInfo)) {
-                hostInfo = createHostInfo(baseInfo, serverInfo, code);
-                save(hostInfo);
-            } else {
-                updateHostInfo(baseInfo, hostInfo, code);
-                updateById(hostInfo);
-            }
-            updateCache(hostInfo, 1);
-            //删除主机扩展信息
-            iIotDeviceInfoExtendService.deletedExtendByDeviceId(hostInfo.getId());
+        baseMapper.updateDvsAndChannelDeleted(serverInfo.getIotCode(), baseInfo.getDeviceCode());
+        String code = BaseDeviceTypeEnum.valueOf(baseInfo.getType()).getCode();
+        IotDeviceInfo hostInfo = selectByTypeAndCode(serverInfo.getIotCode(), code, baseInfo.getProductName(), baseInfo.getDeviceCode());
+        if (ObjectUtil.isNull(hostInfo)) {
+            hostInfo = createHostInfo(baseInfo, serverInfo, code);
+            save(hostInfo);
+        } else {
+            updateHostInfo(baseInfo, hostInfo, code);
+            updateById(hostInfo);
+        }
+        updateCache(hostInfo, 1);
+        //删除主机扩展信息
+        iIotDeviceInfoExtendService.deletedExtendByDeviceId(hostInfo.getId());
+
+        IotDeviceInfoExtend extend = iIotDeviceInfoExtendService.selectByProductDeviceNameAndToken(serverInfo.getIotCode(), code, baseInfo.getDeviceCode());
+        if (extend == null) {
+            final IotDeviceInfoExtend infoExtend = createIotDeviceInfoExtend(baseInfo, hostInfo.getId(), serverInfo.getIotCode());
+            iIotDeviceInfoExtendService.save(infoExtend);
+        } else {
+            dealDvsExtend(baseInfo, extend);
+            iIotDeviceInfoExtendService.updateById(extend);
+        }
+        sysDeviceService.addDevice(Lists.newArrayList(hostInfo));
+        List<SubDeviceInfo> subDeviceList = baseInfo.getSubDevices();
+        if (ObjectUtil.isEmpty(subDeviceList)) {
+            return;
+        }
 
-            IotDeviceInfoExtend extend = iIotDeviceInfoExtendService.selectByProductDeviceNameAndToken(serverInfo.getIotCode(), code, baseInfo.getDeviceCode());
-            if (extend == null) {
-                final IotDeviceInfoExtend infoExtend = createIotDeviceInfoExtend(baseInfo, hostInfo.getId(), serverInfo.getIotCode());
-                iIotDeviceInfoExtendService.save(infoExtend);
+        //处理通道信息
+        for (SubDeviceInfo subDeviceInfo : subDeviceList) {
+            IotDeviceInfo channelInfo = selectByTypeAndHostAndCode(serverInfo.getIotCode(), subDeviceInfo.getParentCode(), baseInfo.getProductName(), subDeviceInfo.getDeviceCode());
+            if (ObjectUtil.isNull(channelInfo)) {
+                channelInfo = createChannelInfo(subDeviceInfo, serverInfo, hostInfo);
+                addChnnelList.add(channelInfo);
             } else {
-                dealDvsExtend(baseInfo, extend);
-                iIotDeviceInfoExtendService.updateById(extend);
-            }
-            syncToAssetDeviceList.add(hostInfo);
-            List<SubDeviceInfo> subDeviceList = baseInfo.getSubDevices();
-            if (ObjectUtil.isEmpty(subDeviceList)) {
-                continue;
-            }
-
-            //处理通道信息
-            for (SubDeviceInfo subDeviceInfo : subDeviceList) {
-                IotDeviceInfo channelInfo = selectByTypeAndHostAndCode(serverInfo.getIotCode(), subDeviceInfo.getParentCode(), baseInfo.getProductName(), subDeviceInfo.getDeviceCode());
-                if (ObjectUtil.isNull(channelInfo)) {
-                    channelInfo = createChannelInfo(subDeviceInfo, serverInfo, hostInfo);
-                    addChnnelList.add(channelInfo);
-                } else {
-                    updateChannelInfo(subDeviceInfo, channelInfo, hostInfo);
-                    updateChnnelList.add(channelInfo);
-                }
-                updateCache(channelInfo, 2);
+                updateChannelInfo(subDeviceInfo, channelInfo, hostInfo);
+                updateChnnelList.add(channelInfo);
             }
+            updateCache(channelInfo, 2);
         }
+        updateDeviceSyncStatus(hostInfo.getId(), 2);
         batchDealIotDeviceInfo(Lists.newLinkedList(), addChnnelList, true);
         batchDealIotDeviceInfo(Lists.newLinkedList(), updateChnnelList, false);
     }
@@ -961,6 +989,7 @@ public class IotDeviceInfoServiceImpl extends ServiceImpl<IotDeviceInfoMapper, I
             final LambdaUpdateWrapper<IotDeviceInfo> wrapper = new LambdaUpdateWrapper<>();
             wrapper.eq(IotDeviceInfo::getId, deviceId);
             wrapper.set(IotDeviceInfo::getSyncStatus, 3);
+            baseMapper.update(null, wrapper);
         }
     }
 
@@ -991,14 +1020,18 @@ public class IotDeviceInfoServiceImpl extends ServiceImpl<IotDeviceInfoMapper, I
         }
         final String topic = TopicTypeEnums.formatUrl(TopicTypeEnums.PRODUCT_SERVICE_INVOKE.getUrl(), WebSocketConstants.DETECTION_HOST, WebSocketConstants.DETECTION_HOST_DEVICE);
         final WebsocketResult result = WebsocketResult.invokeHostServer(topic, new JSONObject(), WebSocketConstants.PUSH_DEVICES_SERVICES, param);
-        final Set<String> set = websocketService.getAllIotCode();
-        if (set.isEmpty()) {
-            return;
-        }
-        for (String token : set) {
-            final IotWebsocketMsg iotWebsocketMsg = websocketMsgService.proactiveSendAndSaveMsg(token, result);
-            websocketService.sendMsgByTokens(iotWebsocketMsg, token);
-        }
+        websocketService.sendMsgByTokens(result, deviceInfo.getIotToken());
+        monitorSyncStatus(deviceInfo);
+    }
+
+    private void monitorSyncStatus(IotDeviceInfo deviceInfo) {
+        final LocalDateTime time = LocalDateTime.now().plusMinutes(5);
+  /*      final JSONObject object = new JSONObject();
+        object.put("deviceId", deviceInfo.getId());
+        object.put("iotCode", deviceInfo.getId());
+        object.put("deviceProduct", deviceInfo.getId());
+        object.put("deviceCode", deviceInfo.getId());*/
+        RedisDelayedQueueUtil.addDelayQueue(deviceInfo.getId(), time, RedisDelayQueueEnum.DEVICE_SYNC_STATUS_MONITOR.getCode());
     }
 
     private JSONObject getDvsJson(IotDeviceInfo deviceInfo) {

+ 6 - 0
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/websocket/constant/WebSocketConstants.java

@@ -156,6 +156,10 @@ public interface WebSocketConstants {
      */
     String INCREMENT_DEVICES_EVENT = "incrementDevices";
     /**
+     * /平台下发设备后,主机上报该设备保存结果
+     */
+    String INCREMENT_DEVICES_RESULT_EVENT = "incrementDevicesResult";
+    /**
      * 主机主动上报 设备网络状态 事件名称
      */
     String DEVICES_STATUS_EVENT = "deviceStatus";
@@ -240,4 +244,6 @@ public interface WebSocketConstants {
     String REPLY_ID="replyId";
     String DETECTION_HOST="DetectionHost";
     String DETECTION_HOST_DEVICE="DetectionHostDevice";
+    String INSERT = "insert";
+    String EDIT = "edit";
 }

+ 38 - 0
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/websocket/dto/dvs/DevicePushResult.java

@@ -0,0 +1,38 @@
+package com.xunmei.host.websocket.dto.dvs;
+
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * @author jingyuanchao
+ * @date 2024/11/21 17:36
+ */
+
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class DevicePushResult {
+
+
+    @ApiModelProperty("设备id")
+    private Long deviceId;
+
+    @ApiModelProperty("设备名称")
+    private String deviceName;
+
+    @ApiModelProperty("设备类型")
+    private String deviceType;
+
+    @ApiModelProperty("推送类型")
+    private String pushType;
+
+    @ApiModelProperty("状态码")
+    private Integer statusCode;
+
+    @ApiModelProperty("状态描述")
+    private String statusDescription;
+
+
+}

+ 5 - 0
soc-modules/soc-modules-iot/src/main/java/com/xunmei/iot/service/impl/IotDeviceInfoServiceImpl.java

@@ -5,6 +5,7 @@ import cn.hutool.core.collection.CollUtil;
 import cn.hutool.core.util.ObjectUtil;
 import com.alibaba.fastjson.JSONObject;
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
 import com.baomidou.mybatisplus.core.toolkit.IdWorker;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
@@ -553,6 +554,10 @@ public class IotDeviceInfoServiceImpl extends ServiceImpl<IotDeviceInfoMapper, I
     @Override
     @Transactional(rollbackFor = Exception.class)
     public Integer syncDevice(Long id) {
+        final LambdaUpdateWrapper<IotDeviceInfo> wrapper = new LambdaUpdateWrapper<>();
+        wrapper.eq(IotDeviceInfo::getId, id);
+        wrapper.set(IotDeviceInfo::getSyncStatus, 1);
+        baseMapper.update(null,wrapper);
         remoteHostService.syncDeviceToHost(id);
         return 1;
     }