فهرست منبع

ws消息发送代码优化提交,无用代码删除

jingyuanchao 1 سال پیش
والد
کامیت
6552e2245d
16فایلهای تغییر یافته به همراه208 افزوده شده و 292 حذف شده
  1. 80 0
      soc-common/soc-common-core/src/main/java/com/xunmei/common/core/domain/iot/domain/IotWebsocketMsg.java
  2. 6 5
      soc-modules/soc-modules-host/src/main/java/com/xunmei/host/alarm/service/impl/IotAlarmRuleServiceImpl.java
  3. 5 4
      soc-modules/soc-modules-host/src/main/java/com/xunmei/host/iot/service/impl/IotDeviceInfoServiceImpl.java
  4. 2 34
      soc-modules/soc-modules-host/src/main/java/com/xunmei/host/north/controller/HostController.java
  5. 21 0
      soc-modules/soc-modules-host/src/main/java/com/xunmei/host/north/controller/IotWebsocketMsgController.java
  6. 16 0
      soc-modules/soc-modules-host/src/main/java/com/xunmei/host/north/mapper/IotWebsocketMsgMapper.java
  7. 16 0
      soc-modules/soc-modules-host/src/main/java/com/xunmei/host/north/service/IotWebsocketMsgService.java
  8. 20 0
      soc-modules/soc-modules-host/src/main/java/com/xunmei/host/north/service/impl/IotWebsocketMsgServiceImpl.java
  9. 7 78
      soc-modules/soc-modules-host/src/main/java/com/xunmei/host/websocket/controller/WebsocketController.java
  10. 0 48
      soc-modules/soc-modules-host/src/main/java/com/xunmei/host/websocket/dto/WebsocketResult.java
  11. 1 21
      soc-modules/soc-modules-host/src/main/java/com/xunmei/host/websocket/redis/WebsocketPublisher.java
  12. 21 22
      soc-modules/soc-modules-host/src/main/java/com/xunmei/host/websocket/redis/WebsocketSubscriber.java
  13. 2 2
      soc-modules/soc-modules-host/src/main/java/com/xunmei/host/websocket/service/WebsocketService.java
  14. 6 34
      soc-modules/soc-modules-host/src/main/java/com/xunmei/host/websocket/service/impl/WebsocketServiceImpl.java
  15. 0 44
      soc-modules/soc-modules-host/src/main/java/com/xunmei/host/websocket/utils/IotServerUtils.java
  16. 5 0
      soc-modules/soc-modules-host/src/main/resources/mapper/IotWebsocketMsgMapper.xml

+ 80 - 0
soc-common/soc-common-core/src/main/java/com/xunmei/common/core/domain/iot/domain/IotWebsocketMsg.java

@@ -0,0 +1,80 @@
+package com.xunmei.common.core.domain.iot.domain;
+
+import com.baomidou.mybatisplus.annotation.TableField;
+import com.baomidou.mybatisplus.annotation.TableId;
+import com.baomidou.mybatisplus.annotation.TableName;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+import java.io.Serializable;
+import java.time.LocalDateTime;
+
+/**
+ * <p>
+ * 
+ * </p>
+ *
+ * @author jingYuanChao
+ * @since 2024-10-28
+ */
+@Data
+@EqualsAndHashCode(callSuper = false)
+@TableName("iot_websocket_msg")
+@ApiModel(value="IotWebsocketMsg对象", description="")
+public class IotWebsocketMsg implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    @ApiModelProperty(value = "消息id")
+    @TableId("id")
+    private String id;
+
+    @ApiModelProperty(value = "事件类型")
+    @TableField("event")
+    private String event;
+
+    @ApiModelProperty(value = "目标主机")
+    @TableField("iot_code")
+    private String iotCode;
+
+    @ApiModelProperty(value = "主机名称")
+    @TableField("server_name")
+    private String serverName;
+
+    @ApiModelProperty(value = "是否处理,0:未处理,1:已处理完成")
+    @TableField("deal")
+    private Integer deal;
+
+    @ApiModelProperty(value = "消息内容")
+    @TableField("content")
+    private String content;
+
+    @ApiModelProperty(value = "准备发送时间")
+    @TableField("ready_send_time")
+    private LocalDateTime readySendTime;
+
+    @ApiModelProperty(value = "实际发送时间")
+    @TableField("really_send_time")
+    private LocalDateTime reallySendTime;
+
+    @ApiModelProperty(value = "触发时间,即第一次重试时间")
+    @TableField("fire_time")
+    private LocalDateTime fireTime;
+
+    @TableField(value = "max_retry_times")
+    @ApiModelProperty(value = "最大重试次数")
+    private Integer maxRetryTimes;
+
+    @TableField(value = "cur_retry_times")
+    @ApiModelProperty(value = "当前重试次数")
+    private Integer curRetryTimes;
+
+    @TableField(value = "retry_interval")
+    @ApiModelProperty(value = "重试间隔,分钟")
+    private Integer retryInterval;
+
+
+
+}

+ 6 - 5
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/alarm/service/impl/IotAlarmRuleServiceImpl.java

@@ -4,6 +4,7 @@ import cn.hutool.core.util.ObjectUtil;
 import com.alibaba.fastjson2.JSON;
 import com.alibaba.fastjson2.JSONObject;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import com.google.common.collect.Sets;
 import com.xunmei.common.core.domain.iot.domain.IotServerInfo;
 import com.xunmei.common.core.domain.mediator.domain.IotAlarmRule;
 import com.xunmei.common.core.domain.mediator.domain.IotAlarmRuleExpress;
@@ -18,8 +19,8 @@ import com.xunmei.host.websocket.dto.WebsocketResult;
 import com.xunmei.host.websocket.enums.ProductEnums;
 import com.xunmei.host.websocket.enums.TopicTypeEnums;
 import com.xunmei.host.websocket.service.RouterService;
+import com.xunmei.host.websocket.service.WebsocketService;
 import com.xunmei.host.websocket.utils.IotServerUtils;
-import com.xunmei.host.websocket.utils.WebSocketUtils;
 import org.springframework.beans.BeanUtils;
 import org.springframework.stereotype.Service;
 
@@ -34,6 +35,8 @@ public class IotAlarmRuleServiceImpl extends ServiceImpl<IotAlarmRuleMapper, Iot
     private IotAlarmRuleSourceService ruleSourceService;
     @Resource
     private IotAlarmRuleExpressService ruleExpressService;
+    @Resource
+    private WebsocketService websocketService;
 
     @Override
     public ProductEnums product() {
@@ -70,9 +73,7 @@ public class IotAlarmRuleServiceImpl extends ServiceImpl<IotAlarmRuleMapper, Iot
 
         final String topic = TopicTypeEnums.formatUrl(TopicTypeEnums.PRODUCT_SERVICE_INVOKE.getUrl(), ProductEnums.DETECTION_HOST.getProductName().get(0), ProductEnums.DETECTION_HOST.getProductName().get(1));
         final WebsocketResult result = IotServerUtils.invokeHostServer(topic, new com.alibaba.fastjson.JSONObject(), WebSocketConstants.ALARM_RULE, jsb);
-
-        WebSocketUtils.sendMessage(iotAlarmRule.getIotCode(), JSON.toJSONString(result));
-
+        websocketService.sendMsgByTokens(result, Sets.newHashSet(iotAlarmRule.getIotCode()));
     }
 
     private void receiveAndUpdateAlarmRule(WebsocketExecuteReq req) {
@@ -113,7 +114,7 @@ public class IotAlarmRuleServiceImpl extends ServiceImpl<IotAlarmRuleMapper, Iot
         if (re != null) {
             final List<IotAlarmRuleExpress> list = JSON.parseArray(re.toString(), IotAlarmRuleExpress.class);
             ruleExpressService.delByRuleId(ruleId);
-            if (ObjectUtil.isNotEmpty(list)){
+            if (ObjectUtil.isNotEmpty(list)) {
                 ruleExpressService.saveOrUpdateBatch(list);
             }
         }

+ 5 - 4
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/iot/service/impl/IotDeviceInfoServiceImpl.java

@@ -8,6 +8,7 @@ import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
 import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
 import com.baomidou.mybatisplus.core.toolkit.IdWorker;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import com.google.common.collect.Sets;
 import com.xunmei.common.core.constant.Constants;
 import com.xunmei.common.core.domain.iot.domain.IotAlarmDefenceArea;
 import com.xunmei.common.core.domain.iot.domain.IotAlarmSubsystem;
@@ -38,8 +39,8 @@ import com.xunmei.host.websocket.enums.DeviceCacheEnum;
 import com.xunmei.host.websocket.enums.DeviceNetStatusEnum;
 import com.xunmei.host.websocket.enums.ProductEnums;
 import com.xunmei.host.websocket.enums.TopicTypeEnums;
-import com.xunmei.host.websocket.redis.WebsocketPublisher;
 import com.xunmei.host.websocket.service.RouterService;
+import com.xunmei.host.websocket.service.WebsocketService;
 import com.xunmei.host.websocket.utils.IotServerUtils;
 import com.xunmei.system.api.domain.iot.IotDeviceInfo;
 import com.xunmei.system.api.domain.iot.IotDeviceInfoExtend;
@@ -82,7 +83,7 @@ public class IotDeviceInfoServiceImpl extends ServiceImpl<IotDeviceInfoMapper, I
     @Autowired
     private IotDeviceStatusService deviceStatusService;
     @Autowired
-    private WebsocketPublisher websocketPublisher;
+    private WebsocketService websocketService;
 
     /**
      * 根据设备类型和设备编码,获取设备信息
@@ -833,7 +834,7 @@ public class IotDeviceInfoServiceImpl extends ServiceImpl<IotDeviceInfoMapper, I
         final JSONObject object = new JSONObject();
         object.put("dataType", eto.getDataType());
         final WebsocketResult result = IotServerUtils.invokeHostServer(topic, new JSONObject(), WebSocketConstants.DO_WORK, object);
-        websocketPublisher.publishMsg(iotCode, result);
+        websocketService.sendMsgByTokens(result, Sets.newHashSet(iotCode));
 
     }
 
@@ -861,6 +862,6 @@ public class IotDeviceInfoServiceImpl extends ServiceImpl<IotDeviceInfoMapper, I
         }
         final String topic = TopicTypeEnums.formatUrl(TopicTypeEnums.PRODUCT_SERVICE_INVOKE.getUrl(), WebSocketConstants.DETECTION_HOST, WebSocketConstants.DETECTION_HOST_DEVICE);
         final WebsocketResult result = IotServerUtils.invokeHostServer(topic, new JSONObject(), WebSocketConstants.CHANGE_DEVICE, eto);
-        websocketPublisher.publishMsg(iotCode, result);
+        websocketService.sendMsgByTokens(result, Sets.newHashSet(iotCode));
     }
 }

+ 2 - 34
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/north/controller/HostController.java

@@ -63,45 +63,13 @@ public class HostController {
     public AjaxResult getDiskInfos(@RequestBody DiskInfoGetReq req) {
         this.diskDetectionService.getDiskInfos(req);
         return AjaxResult.success();
-    }
-
-
-    @ApiOperation("布撤防控制")
-    @PostMapping({"/subSystemControl"})
-    public AjaxResult subSystemControl(@RequestBody SubSystemControlEto eto) {
-        try {
-            alarmHostService.subSystemControl(eto.getId(), eto.getIsAlarm());
-            return AjaxResult.success();
-        } catch (Exception r) {
-            return AjaxResult.error("布撤防控制指令下发失败!");
-        }
-    }
+    }*/
 
-    @ApiOperation("通断电控制(组织机构)")
-    @PostMapping({"/breakerControlByOrgId"})
-    public AjaxResult breakerControlByOrgId(@RequestBody BreakerControlEto eto) {
-        try {
-            alarmHostService.breakerControlByOrgId(eto.getOrgId(), eto.getCommand());
-            return AjaxResult.success();
-        } catch (Exception r) {
-            return AjaxResult.error("通断电控制指令下发失败!");
-        }
-    }
 
-    @ApiOperation("通断电控制(设备id)")
-    @PostMapping({"/breakerControlByDeviceId"})
-    public AjaxResult breakerControlByDeviceId(@RequestBody BreakerControlEto eto) {
-        try {
-            alarmHostService.breakerControlByDeviceId(eto.getId(), eto.getCommand());
-            return AjaxResult.success();
-        } catch (Exception r) {
-            return AjaxResult.error("通断电控制指令下发失败!");
-        }
-    }*/
 
     @ApiOperation("关闭iot链接")
     @GetMapping({"/closeSession"})
-    public AjaxResult closeSession(String iotCode) throws IOException {
+    public AjaxResult closeSession(String iotCode){
         //集群环境下 此功能不稳地,暂时取消
         //WebSocketSessionHolder.closeServe(iotCode);
         return AjaxResult.success();

+ 21 - 0
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/north/controller/IotWebsocketMsgController.java

@@ -0,0 +1,21 @@
+package com.xunmei.host.north.controller;
+
+
+import org.springframework.web.bind.annotation.RequestMapping;
+
+import org.springframework.web.bind.annotation.RestController;
+
+/**
+ * <p>
+ *  前端控制器
+ * </p>
+ *
+ * @author jingYuanChao
+ * @since 2024-10-28
+ */
+@RestController
+@RequestMapping("/iotWebsocketMsg")
+public class IotWebsocketMsgController {
+
+}
+

+ 16 - 0
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/north/mapper/IotWebsocketMsgMapper.java

@@ -0,0 +1,16 @@
+package com.xunmei.host.north.mapper;
+
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+import com.xunmei.common.core.domain.iot.domain.IotWebsocketMsg;
+
+/**
+ * <p>
+ *  Mapper 接口
+ * </p>
+ *
+ * @author jingYuanChao
+ * @since 2024-10-28
+ */
+public interface IotWebsocketMsgMapper extends BaseMapper<IotWebsocketMsg> {
+
+}

+ 16 - 0
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/north/service/IotWebsocketMsgService.java

@@ -0,0 +1,16 @@
+package com.xunmei.host.north.service;
+
+import com.baomidou.mybatisplus.extension.service.IService;
+import com.xunmei.common.core.domain.iot.domain.IotWebsocketMsg;
+
+/**
+ * <p>
+ *  服务类
+ * </p>
+ *
+ * @author jingYuanChao
+ * @since 2024-10-28
+ */
+public interface IotWebsocketMsgService extends IService<IotWebsocketMsg> {
+
+}

+ 20 - 0
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/north/service/impl/IotWebsocketMsgServiceImpl.java

@@ -0,0 +1,20 @@
+package com.xunmei.host.north.service.impl;
+
+import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import com.xunmei.common.core.domain.iot.domain.IotWebsocketMsg;
+import com.xunmei.host.north.mapper.IotWebsocketMsgMapper;
+import com.xunmei.host.north.service.IotWebsocketMsgService;
+import org.springframework.stereotype.Service;
+
+/**
+ * <p>
+ *  服务实现类
+ * </p>
+ *
+ * @author jingYuanChao
+ * @since 2024-10-28
+ */
+@Service
+public class IotWebsocketMsgServiceImpl extends ServiceImpl<IotWebsocketMsgMapper, IotWebsocketMsg> implements IotWebsocketMsgService {
+
+}

+ 7 - 78
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/websocket/controller/WebsocketController.java

@@ -1,25 +1,13 @@
 package com.xunmei.host.websocket.controller;
 
 import com.xunmei.common.core.web.domain.AjaxResult;
-import com.xunmei.common.redis.utils.RedisUtils;
-import com.xunmei.host.websocket.holder.WebSocketSessionHolder;
-import com.xunmei.host.websocket.redis.WebsocketPublisher;
+import com.xunmei.host.websocket.service.WebsocketService;
 import com.xunmei.system.api.domain.websocket.RedisWebsocketMsg;
-import org.redisson.api.RKeys;
-import org.redisson.api.RedissonClient;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.PostMapping;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RestController;
 
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
 /**
  * @author gaoxiong
  * @Title:
@@ -32,79 +20,20 @@ import java.util.regex.Pattern;
 public class WebsocketController {
 
     @Autowired
-    private WebsocketPublisher websocketPublisher;
-
-
-
+    private WebsocketService websocketService;
 
-
-    @GetMapping("/send")
-    public String sendMessage(String message) {
-        RedisWebsocketMsg msg = new RedisWebsocketMsg();
-        Map<String,String> map = new HashMap<>();
-        map.put("name","王燕妮");
-        map.put("sex","女");
-        msg.setContent(map);
-        RedissonClient client = RedisUtils.getClient();
-        RKeys keys = client.getKeys();
-        Iterable<String> keyTokens = keys.getKeysByPattern(WebSocketSessionHolder.REDIS_TOPIC_WEBSOCKET_TOKEN + "*");
-        Set<String> tokens = new HashSet<>();
-        for (String keyToken : keyTokens) {
-            String token = RedisUtils.getCacheObject(keyToken);
-            tokens.add(token);
-        }
-        msg.setTokens(tokens);
-        websocketPublisher.sendMessage(msg);
-        return "发送成功";
-    }
-
-    /**
-     * 给所有连接的主机发送命令消息
-     * @param obj
-     * @return
-     */
     @PostMapping("/sendAllMsg")
-    public AjaxResult sendAllMessage(Object obj){
+    public AjaxResult sendAllMessage(Object obj) {
         RedisWebsocketMsg msg = new RedisWebsocketMsg();
-        msg.setContent(msg);
-        RedissonClient client = RedisUtils.getClient();
-        RKeys keys = client.getKeys();
-        Iterable<String> keyTokens = keys.getKeysByPattern(WebSocketSessionHolder.REDIS_TOPIC_WEBSOCKET_TOKEN + "*");
-        Set<String> tokens = new HashSet<>();
-        for (String keyToken : keyTokens) {
-            String token = RedisUtils.getCacheObject(keyToken);
-            tokens.add(token);
-        }
-        msg.setTokens(tokens);
-        websocketPublisher.sendMessage(msg);
+        msg.setContent(obj);
+        websocketService.sendMgsToAll(msg);
         return AjaxResult.success();
     }
 
-
     @PostMapping("/sendListMsg")
-    public AjaxResult sendListMessage(RedisWebsocketMsg msg){
-        websocketPublisher.sendMessage(msg);
+    public AjaxResult sendListMessage(RedisWebsocketMsg msg) {
+        websocketService.sendMgsToAll(msg);
         return AjaxResult.success();
     }
 
-    public static void main(String[] args) {
-        String url = "/things/SmartBulb/LivingRoom/service/invoke/reply";
-
-        // 正则表达式模式
-        Pattern pattern = Pattern.compile("^/things/(\\w+)/(\\w+)/service/invoke$");
-
-        // 创建Matcher对象
-        Matcher matcher = pattern.matcher(url);
-
-        // 检查是否匹配成功
-        if (matcher.find()) {
-            // 提取动态参数
-            String productName = matcher.group(1);
-            String deviceName = matcher.group(2);
-            System.out.println("Product Name: " + productName);
-            System.out.println("Device Name: " + deviceName);
-        } else {
-            System.out.println("URL does not match the expected format.");
-        }
-    }
 }

+ 0 - 48
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/websocket/dto/WebsocketResult.java

@@ -5,8 +5,6 @@ import com.alibaba.fastjson.JSONObject;
 import com.xunmei.common.core.constant.Constants;
 import com.xunmei.common.core.constant.HttpStatus;
 import com.xunmei.host.websocket.constant.WebSocketConstants;
-import com.xunmei.host.websocket.enums.TopicTypeEnums;
-import com.xunmei.system.api.dto.protection.ReceiveErrorDto;
 import io.netty.util.internal.StringUtil;
 import lombok.AllArgsConstructor;
 import lombok.Data;
@@ -14,8 +12,6 @@ import lombok.NoArgsConstructor;
 
 import java.io.Serializable;
 import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
 
 /**
  * @author gaoxiong
@@ -53,51 +49,7 @@ public class WebsocketResult implements Serializable {
     private Object payload;
 
 
-    public String invokeTopic(String productName, String deviceName) {
-        return String.format("things/%s/%s/service/invoke", productName, deviceName);
-    }
-
-    public static WebsocketResult of(String topic, String id) {
-        WebsocketResult result = new WebsocketResult();
-        result.setId(id);
-        result.setTopic(topic);
-        result.setTimestamp(DateUtil.format(new Date(), Constants.UTC_FORMAT));
-        Map<String, Object> map = new HashMap<>();
-        map.put(WebSocketConstants.STATUS_CODE, HttpStatus.SUCCESS);
-        map.put(WebSocketConstants.STATUS_DESCRIPTION, StringUtil.EMPTY_STRING);
-        map.put(WebSocketConstants.HEADER, new JSONObject());
-        result.setPayload(map);
-        return result;
-    }
-
-    public static WebsocketResult of(ReceiveErrorDto dto, String topic, String id) {
-        WebsocketResult result = new WebsocketResult();
-        result.setId(id);
-        result.setTopic(topic);
-        result.setTimestamp(DateUtil.format(new Date(), Constants.UTC_FORMAT));
-        if (dto == null) {
-            return result;
-        }
-        Map<String, Object> map = new HashMap<>();
-        map.put(WebSocketConstants.STATUS_CODE, dto.getSuccess() ? HttpStatus.SUCCESS : HttpStatus.ERROR);
-        map.put(WebSocketConstants.STATUS_DESCRIPTION, dto.getErrorMsg());
-        map.put(WebSocketConstants.HEADER, new JSONObject());
-        result.setPayload(map);
-        return result;
-    }
 
-    public static WebsocketResult of(String topic, String id,boolean success,String msg) {
-        WebsocketResult result = new WebsocketResult();
-        result.setId(id);
-        result.setTopic(topic);
-        result.setTimestamp(DateUtil.format(new Date(), Constants.UTC_FORMAT));
-        Map<String, Object> map = new HashMap<>();
-        map.put(WebSocketConstants.STATUS_CODE,success ? HttpStatus.SUCCESS : HttpStatus.ERROR);
-        map.put(WebSocketConstants.STATUS_DESCRIPTION, msg);
-        map.put(WebSocketConstants.HEADER, new JSONObject());
-        result.setPayload(map);
-        return result;
-    }
     public static WebsocketResult replySuccess(String topic, String id,String productName,String deviceName) {
         WebsocketResult result = new WebsocketResult();
         result.setId(id);

+ 1 - 21
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/websocket/redis/WebsocketPublisher.java

@@ -1,6 +1,5 @@
 package com.xunmei.host.websocket.redis;
 
-import com.google.common.collect.Sets;
 import com.xunmei.common.redis.utils.RedisUtils;
 import com.xunmei.system.api.domain.websocket.RedisWebsocketMsg;
 import lombok.extern.slf4j.Slf4j;
@@ -9,8 +8,6 @@ import org.redisson.api.RedissonClient;
 import org.redisson.codec.SerializationCodec;
 import org.springframework.stereotype.Component;
 
-import java.util.HashSet;
-
 /**
  * websocket 消息发布者
  * @author gaoxiong
@@ -33,27 +30,10 @@ public class WebsocketPublisher {
             //消息发布
             clientTopic.publishAsync(message);
             log.info("websocket 发布消息:{}",message);
-            long l = clientTopic.countSubscribers();
-            log.info("发送消息:{}",l);
+            log.info("发送消息,存在订阅者数量:{}",clientTopic.countSubscribers());
         }catch (Exception e){
             e.printStackTrace();
         }
 
     }
-
-
-    //广播的消息
-    public void publishAllMsg(Object obj){
-        RedisWebsocketMsg message =new RedisWebsocketMsg();
-        message.setContent(obj);
-        message.setTokens(new HashSet<>());
-        sendMessage(message);
-    }
-    //指定主机的消息
-    public void publishMsg(String iotCode,Object obj){
-        RedisWebsocketMsg message =new RedisWebsocketMsg();
-        message.setContent(obj);
-        message.setTokens(Sets.newHashSet(iotCode));
-        sendMessage(message);
-    }
 }

+ 21 - 22
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/websocket/redis/WebsocketSubscriber.java

@@ -4,7 +4,6 @@ import cn.hutool.core.util.ObjectUtil;
 import com.alibaba.fastjson.JSON;
 import com.xunmei.common.redis.utils.RedisUtils;
 import com.xunmei.host.websocket.holder.WebSocketSessionHolder;
-import com.xunmei.host.websocket.utils.WebSocketUtils;
 import com.xunmei.system.api.domain.websocket.RedisWebsocketMsg;
 import lombok.extern.slf4j.Slf4j;
 import org.redisson.api.RTopic;
@@ -44,31 +43,31 @@ public class WebsocketSubscriber implements ApplicationRunner, Ordered {
         clientTopic.addListener(RedisWebsocketMsg.class, new MessageListener<RedisWebsocketMsg>() {
             @Override
             public void onMessage(CharSequence channel, RedisWebsocketMsg msg) {
-                String str = JSON.toJSONString(msg.getContent());
-                log.info("接收到订阅消息:{}",JSON.toJSONString(msg));
+                String contentJson = JSON.toJSONString(msg.getContent());
+                log.info("host-websocket 接收到订阅消息:{}", JSON.toJSONString(msg));
                 Set<String> tokens = msg.getTokens();
-                if(ObjectUtil.isNotEmpty(tokens)){
-                    for (String token : tokens) {
-                        WebSocketSession sessions = WebSocketSessionHolder.getSessions(token);
-                        if(sessions != null){
-                            WebSocketUtils.sendMessage(sessions,str);
-                        }
+                sendMessage(tokens, contentJson);
+            }
+        });
+    }
 
-                    }
-                }else {
-                    for (String token : WebSocketSessionHolder.getSessionsAll()) {
-                        final WebSocketSession sessions = WebSocketSessionHolder.getSessions(token);
-                        try {
-                            sessions.sendMessage(new TextMessage(str));
-                        } catch (IOException e) {
-                            log.error("消息广播失败:{}",e);
-                            throw new RuntimeException(e);
-                        }
-                    }
+    private void sendMessage(Set<String> tokens, String message) {
+        if (ObjectUtil.isEmpty(tokens)) {
+            tokens = WebSocketSessionHolder.getSessionsAll();
+        }
+        if (ObjectUtil.isEmpty(tokens)) {
+            return;
+        }
+        for (String token : tokens) {
+            WebSocketSession session = WebSocketSessionHolder.getSessions(token);
+            if (session != null && session.isOpen()) {
+                try {
+                    session.sendMessage(new TextMessage(message));
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
                 }
-
             }
-        });
+        }
     }
 
     @Override

+ 2 - 2
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/websocket/service/WebsocketService.java

@@ -4,9 +4,9 @@ package com.xunmei.host.websocket.service;
 import java.util.Set;
 
 public interface WebsocketService {
-    void sendListMessage(Object obj, Set<String> tokens) throws Exception;
+    void sendMsgByTokens(Object obj, Set<String> tokens);
 
-    void sendAllMessage(Object obj) throws Exception;
+    void sendMgsToAll(Object obj);
 
     void getDevices();
 

+ 6 - 34
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/websocket/service/impl/WebsocketServiceImpl.java

@@ -2,15 +2,10 @@ package com.xunmei.host.websocket.service.impl;
 
 import com.alibaba.fastjson.JSONObject;
 import com.xunmei.common.redis.utils.RedisUtils;
-import com.xunmei.host.iot.service.IIotDeviceInfoExtendService;
-import com.xunmei.host.iot.service.IIotDeviceInfoService;
-import com.xunmei.host.protection.service.IotAlarmDefenceAreaService;
-import com.xunmei.host.protection.service.IotAlarmSubsystemService;
-import com.xunmei.host.server.service.IotServerInfoService;
-import com.xunmei.host.video.service.IotDvrDiskService;
 import com.xunmei.host.websocket.constant.WebSocketConstants;
 import com.xunmei.host.websocket.dto.WebsocketResult;
 import com.xunmei.host.websocket.enums.TopicTypeEnums;
+import com.xunmei.host.websocket.holder.WebSocketSessionHolder;
 import com.xunmei.host.websocket.redis.WebsocketPublisher;
 import com.xunmei.host.websocket.service.WebsocketService;
 import com.xunmei.host.websocket.utils.IotServerUtils;
@@ -31,35 +26,12 @@ public class WebsocketServiceImpl implements WebsocketService {
     private static final Logger log = LoggerFactory.getLogger(WebsocketServiceImpl.class);
     @Autowired
     private WebsocketPublisher websocketPublisher;
-    @Autowired
-    private IotServerInfoService iotServerInfoService;
-    @Autowired
-    private IIotDeviceInfoService iIotDeviceInfoService;
-    @Autowired
-    private IIotDeviceInfoExtendService iIotDeviceInfoExtendService;
-    @Autowired
-    private IotDvrDiskService iIotDvrDiskService;
-
-    @Autowired
-    private IotAlarmDefenceAreaService iotAlarmDefenceAreaService;
-    @Autowired
-    private IotAlarmSubsystemService iotAlarmSubsystemService;
-
-
-    private static final String LOCK_DVS_BASE_INFO = "websocket:lock:dvs_base_info";
-
-    private static final String LOCK_DEVICE_BASE_INFO = "websocket:lock:device_base_info";
-
-    private static final String LOCK_ALARM_HOST_BASE_INFO = "websocket:lock:alarmHost_base_info";
-
-    //获取基础数据topic
-    private static final String iotServerDeviceTopic = "/things/IoTServer/IoTServerDevice/service/invoke";
 
     @Override
     public void getDevices() {
         final String topic = TopicTypeEnums.formatUrl(TopicTypeEnums.PRODUCT_SERVICE_INVOKE.getUrl(),WebSocketConstants.DETECTION_HOST, WebSocketConstants.DETECTION_HOST_DEVICE);
         final WebsocketResult result = IotServerUtils.invokeHostServer(topic, new JSONObject(), WebSocketConstants.GET_DEVICES_SERVICES, new JSONObject());
-        websocketPublisher.publishAllMsg(result);
+        sendMgsToAll(result);
     }
 
 
@@ -69,7 +41,7 @@ public class WebsocketServiceImpl implements WebsocketService {
      * @param obj
      * @param tokens
      */
-    public void sendListMessage(Object obj, Set<String> tokens) {
+    public void sendMsgByTokens(Object obj, Set<String> tokens) {
         RedisWebsocketMsg msg = new RedisWebsocketMsg();
         msg.setTokens(tokens);
         msg.setContent(obj);
@@ -82,10 +54,10 @@ public class WebsocketServiceImpl implements WebsocketService {
      * @param obj
      * @throws Exception
      */
-    public void sendAllMessage(Object obj) throws Exception {
+    public void sendMgsToAll(Object obj) {
         RedissonClient client = RedisUtils.getClient();
         RKeys keys = client.getKeys();
-        Iterable<String> keyTokens = keys.getKeysByPattern("websocket:token_*");
+        Iterable<String> keyTokens = keys.getKeysByPattern(WebSocketSessionHolder.REDIS_TOPIC_WEBSOCKET_TOKEN + "*");
         Set<String> tokens = new HashSet();
         Iterator kts = keyTokens.iterator();
         while (kts.hasNext()) {
@@ -93,7 +65,7 @@ public class WebsocketServiceImpl implements WebsocketService {
             String token = (String) RedisUtils.getCacheObject(keyToken);
             tokens.add(token);
         }
-        sendListMessage(obj, tokens);
+        sendMsgByTokens(obj, tokens);
     }
 
 

+ 0 - 44
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/websocket/utils/IotServerUtils.java

@@ -33,50 +33,6 @@ public class IotServerUtils {
         return websocketResult;
     }
 
-    /**
-     * 构建主动调用主机service的消息
-     * @param topicTypeEnums
-     * @param produceName
-     * @param deviceName
-     * @param invokeMethod
-     * @param object
-     * @return
-     */
-    public static WebsocketResult invokeDownLinkServer(TopicTypeEnums topicTypeEnums, String produceName, String deviceName, String invokeMethod, JSONObject object){
-        Date date = new Date();
-        String id = UUID.randomUUID().toString();
-        WebsocketResult iotWebsocketResult = new WebsocketResult();
-        //Iot消息透穿至主机 固定Topic
-        iotWebsocketResult.setTopic(TopicTypeEnums.formatUrl(TopicTypeEnums.PRODUCT_SERVICE_INVOKE.getUrl(), WebSocketConstants.IOT_SERVER,WebSocketConstants.IOT_SERVER_DEVICE));
-        iotWebsocketResult.setId(id);
-        iotWebsocketResult.setTimestamp(DateUtil.format(new Date(), Constants.UTC_FORMAT));
-        //iot消息头
-        JSONObject iotHeaderObj = new JSONObject();
-        iotHeaderObj.put(WebSocketConstants.PRODUCT_NAME,WebSocketConstants.IOT_SERVER);
-        iotHeaderObj.put(WebSocketConstants.DEVICE_NAME, WebSocketConstants.IOT_SERVER_DEVICE);
-        iotWebsocketResult.setHeaders(iotHeaderObj);
-        //iot消息体,完整的消息包含在payload中
-        JSONObject iotPayloadObj = new JSONObject();
-        iotWebsocketResult.setPayload(iotPayloadObj);
-        iotPayloadObj.put(WebSocketConstants.SERVICE, WebSocketConstants.DOWN_LINK_SERVICE_PASS_THROUGH);
-        //以下是主机消息内容
-        WebsocketResult hostData = new WebsocketResult();
-        iotPayloadObj.put(WebSocketConstants.ARGS,hostData );
-        hostData.setId(id);
-        hostData.setTimestamp(DateUtil.format(new Date(), Constants.UTC_FORMAT));
-        hostData.setTopic(TopicTypeEnums.formatUrl(topicTypeEnums.getUrl(), produceName,deviceName));
-        JSONObject hostHeaderObj = new JSONObject();
-        hostHeaderObj.put(WebSocketConstants.PRODUCT_NAME,produceName);
-        hostHeaderObj.put(WebSocketConstants.DEVICE_NAME, deviceName);
-        hostData.setHeaders(hostHeaderObj);
-
-        JSONObject hostPayloadObj = new JSONObject();
-        hostPayloadObj.put(WebSocketConstants.SERVICE, invokeMethod);
-        hostPayloadObj.put(WebSocketConstants.ARGS, object);
-
-        hostData.setPayload(hostPayloadObj);
-        return iotWebsocketResult;
-    }
 
     /**
      * 接受主机上报事件后,构建返回的消息

+ 5 - 0
soc-modules/soc-modules-host/src/main/resources/mapper/IotWebsocketMsgMapper.xml

@@ -0,0 +1,5 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<mapper namespace="com.xunmei.host.north.mapper.IotWebsocketMsgMapper">
+
+</mapper>