Przeglądaj źródła

设备状态通知消息处理代码提交

jingyuanchao 1 rok temu
rodzic
commit
11c7e841e3

+ 1 - 0
soc-api/soc-api-system/src/main/java/com/xunmei/system/api/domain/iot/IotDeviceInfo.java

@@ -30,6 +30,7 @@ public class IotDeviceInfo extends BaseEntity {
     @ApiModelProperty("机构名称")
     private String orgName;
     private String orgPath;
+    //枚举 com.xunmei.mediator.websocket.enums.DeviceNetStatusEnum
     @ApiModelProperty("网络状态")
     private String netStatus;
     private String hostCode;

+ 1 - 1
soc-common/soc-common-core/src/main/java/com/xunmei/common/core/config/SqlLogInterceptor.java

@@ -122,7 +122,7 @@ public class SqlLogInterceptor implements Interceptor {
 		MetaObject metaObject = SystemMetaObject.forObject(target);
 		MappedStatement ms = (MappedStatement) metaObject.getValue("delegate.mappedStatement");
 		// 打印 sql
-		log.info(
+		log.debug(
             "\n==============  Sql Start  ==============" +
                 "\nExecute ID  :{}" +
                 "\nExecute SQL :{}" +

+ 2 - 0
soc-modules/soc-modules-mediator/src/main/java/com/xunmei/mediator/iot/service/IIotDeviceInfoService.java

@@ -72,4 +72,6 @@ public interface IIotDeviceInfoService extends IService<IotDeviceInfo> {
 
     void deleteAlarmHostAndDvsByToken(String token);
     List<IotDeviceInfo> selectAllDeviceByDeviceType(List<DeviceTypeEnum> deviceType);
+
+    IotDeviceInfo selectByTokenProductAndDeviceCode(String token, String productName, String deviceType);
 }

+ 4 - 0
soc-modules/soc-modules-mediator/src/main/java/com/xunmei/mediator/iot/service/IotDeviceStatusService.java

@@ -18,4 +18,8 @@ public interface IotDeviceStatusService extends IService<IotDeviceStatus> {
     WebsocketResult dealDeviceStatusChange(WebsocketExecuteReq executeReq);
 
     IotDeviceStatus getByDeviceId(Long deviceId);
+
+
+    void deviceStatusChangeDeal(WebsocketExecuteReq req);
+
 }

+ 12 - 0
soc-modules/soc-modules-mediator/src/main/java/com/xunmei/mediator/iot/service/impl/IotDeviceInfoServiceImpl.java

@@ -5,6 +5,7 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
 import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import com.xunmei.common.core.constant.Constants;
 import com.xunmei.common.redis.utils.RedisUtils;
 import com.xunmei.mediator.iot.mapper.IotDeviceInfoMapper;
 import com.xunmei.mediator.iot.service.IIotDeviceInfoService;
@@ -205,4 +206,15 @@ public class IotDeviceInfoServiceImpl extends ServiceImpl<IotDeviceInfoMapper, I
         wrapper.eq(IotDeviceInfo::getDeleted, 0);
         return baseMapper.selectList(wrapper);
     }
+
+    @Override
+    public IotDeviceInfo selectByTokenProductAndDeviceCode(String token, String productName, String deviceCode) {
+        LambdaQueryWrapper<IotDeviceInfo> wrapper = new LambdaQueryWrapper<>();
+        wrapper.eq(IotDeviceInfo::getIotToken, token);
+        wrapper.eq(IotDeviceInfo::getDeviceProduct, productName);
+        wrapper.eq(IotDeviceInfo::getDeviceCode, deviceCode);
+        wrapper.eq(IotDeviceInfo::getDeleted, 0);
+        wrapper.last(Constants.LIMIT1);
+        return baseMapper.selectOne(wrapper);
+    }
 }

+ 36 - 5
soc-modules/soc-modules-mediator/src/main/java/com/xunmei/mediator/iot/service/impl/IotDeviceStatusServiceImpl.java

@@ -9,6 +9,7 @@ import com.baomidou.mybatisplus.core.toolkit.IdWorker;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 import com.xunmei.common.core.constant.Constants;
 import com.xunmei.common.core.domain.mediator.domain.IotAlarmSystemField;
+import com.xunmei.common.core.enums.iot.DeviceTypeEnum;
 import com.xunmei.common.core.enums.iot.SensorType;
 import com.xunmei.mediator.api.alarm.mapper.IotAlarmSystemFieldMapper;
 import com.xunmei.mediator.api.alarm.service.IotAlarmDataService;
@@ -17,10 +18,11 @@ import com.xunmei.mediator.iot.service.IIotDeviceInfoService;
 import com.xunmei.mediator.iot.service.IotDeviceStatusService;
 import com.xunmei.mediator.websocket.dto.WebsocketExecuteReq;
 import com.xunmei.mediator.websocket.dto.WebsocketResult;
-import com.xunmei.common.core.enums.iot.DeviceTypeEnum;
+import com.xunmei.mediator.websocket.enums.DeviceNetStatusEnum;
 import com.xunmei.system.api.domain.iot.IotDeviceInfo;
 import com.xunmei.system.api.domain.iot.IotDeviceStatus;
 import com.xunmei.system.api.enums.ElectricityMeterAttributes;
+import com.xunmei.system.api.util.LogUtils;
 import io.netty.util.internal.StringUtil;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.BeanUtils;
@@ -30,6 +32,7 @@ import org.springframework.transaction.annotation.Transactional;
 import org.springframework.transaction.support.TransactionSynchronization;
 import org.springframework.transaction.support.TransactionSynchronizationManager;
 
+import java.util.Date;
 import java.util.List;
 import java.util.Map;
 
@@ -120,7 +123,7 @@ public class IotDeviceStatusServiceImpl extends ServiceImpl<IotDeviceStatusMappe
         }
         String deviceType = SensorType.getCodeByProduct(productName);
         List<IotAlarmSystemField> fieldList = alarmSystemFieldMapper.selectList(new LambdaQueryWrapper<>());
-        final JSONArray array = dealStatusData(data,fieldList,deviceType);
+        final JSONArray array = dealStatusData(data, fieldList, deviceType);
         IotDeviceStatus status = getByDeviceId(code.getId());
         if (status == null) {
             status = new IotDeviceStatus();
@@ -131,7 +134,7 @@ public class IotDeviceStatusServiceImpl extends ServiceImpl<IotDeviceStatusMappe
             status.setDeviceType(deviceType);
             status.setInfo(JSON.toJSONString(array));
             save(status);
-        }else {
+        } else {
             status.setInfo(JSON.toJSONString(array));
             updateById(status);
         }
@@ -149,7 +152,8 @@ public class IotDeviceStatusServiceImpl extends ServiceImpl<IotDeviceStatusMappe
 
         return new WebsocketResult();
     }
-    private static JSONArray dealStatusData(JSONArray data,List<IotAlarmSystemField> fieldList,String deviceType) {
+
+    private static JSONArray dealStatusData(JSONArray data, List<IotAlarmSystemField> fieldList, String deviceType) {
         final JSONArray array = new JSONArray();
         for (Object datum : data) {
             JSONObject jsb = (JSONObject) datum;
@@ -167,7 +171,7 @@ public class IotDeviceStatusServiceImpl extends ServiceImpl<IotDeviceStatusMappe
             object.put("val", val);
             final ElectricityMeterAttributes attributesEnum = ElectricityMeterAttributes.getEnumByName(name);
             if (ObjectUtil.isNull(attributesEnum)) {
-              continue;
+                continue;
             }
             if (ObjectUtil.isNotEmpty(attributesEnum.getUnit())) {
                 //说明存在单位,那么把val拼接上单位set到原来的属性值中去
@@ -186,4 +190,31 @@ public class IotDeviceStatusServiceImpl extends ServiceImpl<IotDeviceStatusMappe
         }
         return array;
     }
+
+    @Override
+    public void deviceStatusChangeDeal(WebsocketExecuteReq req) {
+        final String productName = req.getProductName();
+        final String deviceName = req.getDeviceName();
+        final String token = req.getToken();
+        if (ObjectUtil.hasEmpty(productName, deviceName, token)) {
+            LogUtils.WEBSOCKET_MSG.error("收到设备状态变更消息,存在参数为空,productName:{},deviceName:{},token:{}", productName, deviceName, token);
+        }
+        IotDeviceInfo deviceInfo = iotDeviceInfoService.selectByTokenProductAndDeviceCode(token, productName, deviceName);
+        if (ObjectUtil.isNull(deviceInfo)) {
+            LogUtils.WEBSOCKET_MSG.error("收到设备状态变更消息,根据token:{},productName:{},deviceName:{}未查询到设备信息", token, productName, deviceName);
+            return;
+        }
+
+        final JSONObject data = (JSONObject) req.getData();
+        final Object status = data.get("status");
+        if (ObjectUtil.isEmpty(status)) {
+            LogUtils.WEBSOCKET_MSG.error("收到设备状态变更消息,消息内容中设备状态为空,token:{},productName:{},deviceName:{}", token, productName, deviceName);
+            return;
+        }
+        deviceInfo.setNetStatus(DeviceNetStatusEnum.getValue(status.toString()));
+        deviceInfo.setUpdateTime(new Date());
+        iotDeviceInfoService.updateById(deviceInfo);
+
+
+    }
 }

+ 12 - 2
soc-modules/soc-modules-mediator/src/main/java/com/xunmei/mediator/websocket/dto/WebsocketPayloadResolve.java

@@ -3,6 +3,7 @@ package com.xunmei.mediator.websocket.dto;
 import cn.hutool.core.util.ObjectUtil;
 import com.alibaba.fastjson.JSONObject;
 import com.xunmei.mediator.websocket.constant.WebSocketConstants;
+import com.xunmei.mediator.websocket.enums.TopicTypeEnums;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 
@@ -31,10 +32,19 @@ public class WebsocketPayloadResolve {
                 ((JSONObject) ((JSONObject) parentPayload.get(WebSocketConstants.ARGS)).toJavaObject(WebsocketResult.class).getPayload()) :
                 parentPayload;
 
+        //设备状态通知的消息结构特殊处理
+        if (TopicTypeEnums.matcherTopicTypeEnums(result.getTopic()) == TopicTypeEnums.DEVICE_STATUS) {
+            this.data = payloadToExtract;
+            this.topic = result.getTopic();
+            this.header = result.getHeaders();
+            return;
+        }
+
+
         this.routingKey = getRoutingKey(payloadToExtract);
         this.header = payloadToExtract.getJSONObject(WebSocketConstants.HEADER);
-        this.data = payloadToExtract.get(WebSocketConstants.ARGS) != null ?payloadToExtract.get(WebSocketConstants.ARGS) :payloadToExtract.get(WebSocketConstants.DATA);
-        this.topic = isPassThrough ?((JSONObject) parentPayload.get(WebSocketConstants.ARGS)).toJavaObject(WebsocketResult.class).getTopic() :result.getTopic();
+        this.data = payloadToExtract.get(WebSocketConstants.ARGS) != null ? payloadToExtract.get(WebSocketConstants.ARGS) : payloadToExtract.get(WebSocketConstants.DATA);
+        this.topic = isPassThrough ? ((JSONObject) parentPayload.get(WebSocketConstants.ARGS)).toJavaObject(WebsocketResult.class).getTopic() : result.getTopic();
     }
 
     private String getRoutingKey(JSONObject payload) {

+ 40 - 0
soc-modules/soc-modules-mediator/src/main/java/com/xunmei/mediator/websocket/enums/DeviceNetStatusEnum.java

@@ -0,0 +1,40 @@
+package com.xunmei.mediator.websocket.enums;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+/**
+ * @author jingyuanchao
+ * @date 2024/7/31 14:39
+ */
+@Getter
+@AllArgsConstructor
+public enum DeviceNetStatusEnum {
+
+    UNKNOWN("unknown","0"),
+
+    ONLINE("Online","1"),
+
+    OFFLINE("Offline","2"),
+    ;
+
+    private final String name;
+
+
+    private final String value;
+
+
+    public static DeviceNetStatusEnum getEnum(String name) {
+        for (DeviceNetStatusEnum deviceNetStatusEnum : DeviceNetStatusEnum.values()) {
+            if (deviceNetStatusEnum.getName().equals(name)) {
+                return deviceNetStatusEnum;
+            }
+        }
+        return UNKNOWN;
+    }
+
+    public static String getValue(String name) {
+        return getEnum(name).getValue();
+    }
+
+}

+ 0 - 1
soc-modules/soc-modules-mediator/src/main/java/com/xunmei/mediator/websocket/enums/TopicTypeEnums.java

@@ -10,7 +10,6 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 @Getter
-
 @AllArgsConstructor
 public enum TopicTypeEnums {
 

+ 3 - 3
soc-modules/soc-modules-mediator/src/main/java/com/xunmei/mediator/websocket/handler/SocWebSocketHandler.java

@@ -145,12 +145,12 @@ public class SocWebSocketHandler extends AbstractWebSocketHandler {
         try {
             WebsocketResult websocketResult = JSON.parseObject(payload, WebsocketResult.class);
             Object obj = websocketResult.getPayload();
-            WebsocketPayloadResolve payloadResolve = new WebsocketPayloadResolve(websocketResult);
-            String topic = payloadResolve.getTopic();
             if (ObjectUtil.isEmpty(obj)) {
                 LogUtils.WEBSOCKET_MSG.error("消息内容为空,ip:{}", ip);
                 return;
             }
+            WebsocketPayloadResolve payloadResolve = new WebsocketPayloadResolve(websocketResult);
+            String topic = payloadResolve.getTopic();
             TopicTypeEnums typeEnums = TopicTypeEnums.matcherTopicTypeEnums(topic);
             if (typeEnums == null) {
                 LogUtils.WEBSOCKET_MSG.error("消息topic错误,ip:{},topic:{}", ip, topic);
@@ -166,6 +166,7 @@ public class SocWebSocketHandler extends AbstractWebSocketHandler {
                     break;
                 //设备状态通知
                 case DEVICE_STATUS:
+                    iotDeviceStatusService.deviceStatusChangeDeal(executeReq);
                     break;
                 //产品事件通知消息
                 case PRODUCT_EVENT_NOTICE:
@@ -173,7 +174,6 @@ public class SocWebSocketHandler extends AbstractWebSocketHandler {
                 case PRODUCT_SERVICE_REPLY:
                     RouterService routeService = RouterServiceHandler.getRouteService(typeEnums.getProductName(), payloadResolve.getRoutingKey());
                     if (routeService == null) {
-                        LogUtils.WEBSOCKET_MSG.error("消息路由服务不存在,ip:{},topic:{}", ip, topic);
                         return;
                     }
                     result = (WebsocketResult) routeService.execute(executeReq);

+ 3 - 4
soc-modules/soc-modules-mediator/src/main/java/com/xunmei/mediator/websocket/service/RouterServiceHandler.java

@@ -37,18 +37,17 @@ public class RouterServiceHandler {
     public static RouterService getRouteService(String productName, String routerKey) {
         ProductEnums enums = ProductEnums.getByProductName(productName);
         if (enums == null) {
-            throw new RuntimeException("未找到对应的产品类型");
+            LogUtils.WEBSOCKET_MSG.error("未找到对应的产品类型:{}", routerKey);
+            return null;
         }
-
         final Optional<RouterService> first = PRODUCT_HASH_MAP.get(enums)
                 .stream()
                 .filter(routerService -> routerService.routerKey().contains(routerKey))
                 .findFirst();
         if (first.isPresent()) {
             return first.get();
-        } else {
-            LogUtils.WEBSOCKET_MSG.error("未找到对应的处理类");
         }
+        LogUtils.WEBSOCKET_MSG.error("不支持的事件或服务:{}", routerKey);
         return null;
     }
 }

+ 42 - 15
soc-modules/soc-modules-mediator/src/main/java/com/xunmei/mediator/websocket/service/impl/WebsocketServiceImpl.java

@@ -952,21 +952,18 @@ public class WebsocketServiceImpl implements WebsocketService, RouterService {
     @Override
     public Object execute(WebsocketExecuteReq req) {
         try {
-            if (WebSocketConstants.GET_DEVICE_BASE_INFOS.equals(req.getEvent())) {
-                JSONObject object = (JSONObject) req.getData();
-                if (object != null) {
-                    List<JSONObject> deviceBaseInfos = (List<JSONObject>) object.get("deviceBaseInfos");
-                    final List<DeviceBaseInfo> list = deviceBaseInfos.stream().map(r -> JSON.toJavaObject(r, DeviceBaseInfo.class)).collect(Collectors.toList());
-                    dealBaseDeviceInfo(list, req.getToken());
-                }
-            } else if (WebSocketConstants.GET_ALARM_HOST_DEVICE_INFOS.equals(req.getEvent())) {
-                JSONObject object = (JSONObject) req.getData();
-                final AlarmHostBaseInfo alarmHostBaseInfo = JSON.toJavaObject(object, AlarmHostBaseInfo.class);
-                dealAlarmHostBaseInfo(alarmHostBaseInfo, req.getToken());
-            } else if (WebSocketConstants.GET_DVS_DEVICE_INFOS.equals(req.getEvent())) {
-                JSONObject object = (JSONObject) req.getData();
-                final DvsBaseInfo dvsBaseInfo = JSON.toJavaObject(object, DvsBaseInfo.class);
-                dealDvsBaseInfo(dvsBaseInfo, req.getToken());
+            switch (req.getEvent()) {
+                case WebSocketConstants.GET_DEVICE_BASE_INFOS:
+                    handleDeviceBaseInfos(req);
+                    break;
+                case WebSocketConstants.GET_ALARM_HOST_DEVICE_INFOS:
+                    handleAlarmHostBaseInfo(req);
+                    break;
+                case WebSocketConstants.GET_DVS_DEVICE_INFOS:
+                    handleDvsBaseInfo(req);
+                    break;
+                default:
+                    throw new IllegalArgumentException("不支持的事件或服务: " + req.getEvent());
             }
         } catch (Exception e) {
             LogUtils.WEBSOCKET_MSG.error("设备基础数据处理错误:{}", e);
@@ -974,4 +971,34 @@ public class WebsocketServiceImpl implements WebsocketService, RouterService {
         }
         return WebsocketResult.of(ReceiveErrorDto.success(), req.getTopic(), req.getId());
     }
+
+    private void handleDeviceBaseInfos(WebsocketExecuteReq req) throws Exception {
+        JSONObject object = (JSONObject) req.getData();
+        if (object != null) {
+            List<JSONObject> deviceBaseInfos = (List<JSONObject>) object.get("deviceBaseInfos");
+            if (deviceBaseInfos != null) {
+                List<DeviceBaseInfo> list = deviceBaseInfos.stream()
+                        .map(r -> JSON.toJavaObject(r, DeviceBaseInfo.class))
+                        .collect(Collectors.toList());
+                dealBaseDeviceInfo(list, req.getToken());
+            }
+        }
+    }
+
+    private void handleAlarmHostBaseInfo(WebsocketExecuteReq req) throws Exception {
+        JSONObject object = (JSONObject) req.getData();
+        if (object != null) {
+            AlarmHostBaseInfo alarmHostBaseInfo = JSON.toJavaObject(object, AlarmHostBaseInfo.class);
+            dealAlarmHostBaseInfo(alarmHostBaseInfo, req.getToken());
+        }
+    }
+
+    private void handleDvsBaseInfo(WebsocketExecuteReq req) throws Exception {
+        JSONObject object = (JSONObject) req.getData();
+        if (object != null) {
+            DvsBaseInfo dvsBaseInfo = JSON.toJavaObject(object, DvsBaseInfo.class);
+            dealDvsBaseInfo(dvsBaseInfo, req.getToken());
+        }
+    }
+
 }