Jelajahi Sumber

消息回复代码部分提交

jingyuanchao 1 tahun lalu
induk
melakukan
3adc40df75
24 mengubah file dengan 375 tambahan dan 152 penghapusan
  1. 22 0
      project_data/sql/1.0.11/soc.sql
  2. 1 3
      soc-api/soc-api-system/src/main/java/com/xunmei/system/api/domain/websocket/RedisWebsocketMsg.java
  3. 12 0
      soc-common/soc-common-core/src/main/java/com/xunmei/common/core/constant/RedisConstant.java
  4. 27 8
      soc-common/soc-common-core/src/main/java/com/xunmei/common/core/domain/iot/domain/IotWebsocketMsg.java
  5. 22 0
      soc-common/soc-common-core/src/main/java/com/xunmei/common/core/enums/iot/MessageStatusEnum.java
  6. 1 1
      soc-common/soc-common-core/src/main/java/com/xunmei/common/core/thread/ThreadPoolConfig.java
  7. 1 1
      soc-common/soc-common-redis/src/main/java/com/xunmei/common/redis/enums/RedisDelayQueueEnum.java
  8. 1 2
      soc-modules/soc-modules-host/src/main/java/com/xunmei/host/alarm/service/impl/IotAlarmRuleServiceImpl.java
  9. 2 3
      soc-modules/soc-modules-host/src/main/java/com/xunmei/host/iot/service/impl/IotDeviceInfoServiceImpl.java
  10. 9 1
      soc-modules/soc-modules-host/src/main/java/com/xunmei/host/north/service/IotWebsocketMsgService.java
  11. 122 2
      soc-modules/soc-modules-host/src/main/java/com/xunmei/host/north/service/impl/IotWebsocketMsgServiceImpl.java
  12. 2 2
      soc-modules/soc-modules-host/src/main/java/com/xunmei/host/websocket/controller/WebsocketController.java
  13. 5 5
      soc-modules/soc-modules-host/src/main/java/com/xunmei/host/websocket/dto/WebsocketResult.java
  14. 50 35
      soc-modules/soc-modules-host/src/main/java/com/xunmei/host/websocket/handler/SocWebSocketHandler.java
  15. 24 17
      soc-modules/soc-modules-host/src/main/java/com/xunmei/host/websocket/redis/WebsocketSubscriber.java
  16. 4 4
      soc-modules/soc-modules-host/src/main/java/com/xunmei/host/websocket/redis/delay/RedisDelayQueueRunner.java
  17. 30 20
      soc-modules/soc-modules-host/src/main/java/com/xunmei/host/websocket/redis/delay/RedisDelayedQueueUtil.java
  18. 1 3
      soc-modules/soc-modules-host/src/main/java/com/xunmei/host/websocket/service/WebsocketService.java
  19. 4 4
      soc-modules/soc-modules-host/src/main/java/com/xunmei/host/websocket/service/impl/WebsocketServiceImpl.java
  20. 15 7
      soc-modules/soc-modules-host/src/main/java/com/xunmei/host/websocket/utils/WebSocketUtils.java
  21. 10 15
      soc-modules/soc-modules-mediator/src/main/java/com/xunmei/mediator/websocket/controller/WebsocketController.java
  22. 7 13
      soc-modules/soc-modules-mediator/src/main/java/com/xunmei/mediator/websocket/redis/WebsocketSubscriber.java
  23. 1 2
      soc-modules/soc-modules-mediator/src/main/java/com/xunmei/mediator/websocket/service/WebsocketService.java
  24. 2 4
      soc-modules/soc-modules-mediator/src/main/java/com/xunmei/mediator/websocket/service/impl/WebsocketServiceImpl.java

+ 22 - 0
project_data/sql/1.0.11/soc.sql

@@ -0,0 +1,22 @@
+
+drop table if exists `iot_websocket_msg`;
+CREATE TABLE `iot_websocket_msg` (
+         `id` varchar(125) COLLATE utf8mb4_general_ci NOT NULL COMMENT '消息id',
+         `event` varchar(125) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '事件类型',
+         `org_id` bigint DEFAULT NULL COMMENT '机构id',
+         `org_name` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '机构名称',
+         `iot_code` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '目标主机',
+         `server_name` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '主机名称',
+         `status` int DEFAULT NULL COMMENT '消息状态;0:处理中,1:处理完成,2:重试中,3:重试成功,4:重试失败',
+         `receive_content` longtext CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci COMMENT '收到消息内容',
+         `reply_content` text COLLATE utf8mb4_general_ci COMMENT '回复内容',
+         `ready_send_time` datetime DEFAULT NULL COMMENT '准备发送时间',
+         `really_send_time` datetime DEFAULT NULL COMMENT '重试成功时间',
+         `fire_time` datetime DEFAULT NULL COMMENT '触发时间',
+         `max_retry_times` int DEFAULT NULL COMMENT '最大重试次数',
+         `cur_retry_times` int DEFAULT NULL COMMENT '当前重试次数',
+         `retry_interval` int DEFAULT NULL COMMENT '重试间隔,分钟',
+         `create_time` datetime DEFAULT NULL COMMENT '创建时间',
+         `update_time` datetime DEFAULT NULL COMMENT '最后一次修改时间',
+         PRIMARY KEY (`id`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;

+ 1 - 3
soc-api/soc-api-system/src/main/java/com/xunmei/system/api/domain/websocket/RedisWebsocketMsg.java

@@ -5,8 +5,6 @@ import lombok.Data;
 import lombok.NoArgsConstructor;
 
 import java.io.Serializable;
-import java.util.HashSet;
-import java.util.Set;
 
 /**
  * @author gaoxiong
@@ -30,7 +28,7 @@ public class RedisWebsocketMsg implements Serializable {
     /**
      * 可以发送的消息的session token
      */
-    private Set<String> tokens = new HashSet<>();
+    private String tokens;
 
 
 }

+ 12 - 0
soc-common/soc-common-core/src/main/java/com/xunmei/common/core/constant/RedisConstant.java

@@ -0,0 +1,12 @@
+package com.xunmei.common.core.constant;
+
+/**
+ * @author jingyuanchao
+ * @date 2024/10/30 15:27
+ */
+public class RedisConstant {
+
+
+    // host模块主机发送失败消息重试队列名称
+    public static final String WS_RETRY_MSG_QUEUE_KEY = "ws_retry_msg_queue";
+}

+ 27 - 8
soc-common/soc-common-core/src/main/java/com/xunmei/common/core/domain/iot/domain/IotWebsocketMsg.java

@@ -35,6 +35,14 @@ public class IotWebsocketMsg implements Serializable {
     @TableField("event")
     private String event;
 
+    @ApiModelProperty(value = "机构id")
+    @TableField("org_id")
+    private Long orgId;
+
+    @ApiModelProperty(value = "机构名称")
+    @TableField("org_name")
+    private String orgName;
+
     @ApiModelProperty(value = "目标主机")
     @TableField("iot_code")
     private String iotCode;
@@ -42,14 +50,20 @@ public class IotWebsocketMsg implements Serializable {
     @ApiModelProperty(value = "主机名称")
     @TableField("server_name")
     private String serverName;
+    /**
+     * @see com.xunmei.common.core.enums.iot.MessageStatusEnum
+     */
+    @ApiModelProperty(value = "消息状态;0:处理中,1:处理完成,2:重试中,3:重试成功,4:重试失败")
+    @TableField("status")
+    private Integer status;
 
-    @ApiModelProperty(value = "是否处理,0:未处理,1:已处理完成")
-    @TableField("deal")
-    private Integer deal;
+    @ApiModelProperty(value = "收到消息内容")
+    @TableField("receive_content")
+    private String receiveContent;
 
-    @ApiModelProperty(value = "消息内容")
-    @TableField("content")
-    private String content;
+    @ApiModelProperty(value = "回复内容")
+    @TableField("reply_content")
+    private String replyContent;
 
     @ApiModelProperty(value = "准备发送时间")
     @TableField("ready_send_time")
@@ -59,7 +73,7 @@ public class IotWebsocketMsg implements Serializable {
     @TableField("really_send_time")
     private LocalDateTime reallySendTime;
 
-    @ApiModelProperty(value = "触发时间,即第一次重试时间")
+    @ApiModelProperty(value = "触发时间")
     @TableField("fire_time")
     private LocalDateTime fireTime;
 
@@ -75,6 +89,11 @@ public class IotWebsocketMsg implements Serializable {
     @ApiModelProperty(value = "重试间隔,分钟")
     private Integer retryInterval;
 
+    @ApiModelProperty(value = "创建时间")
+    @TableField("create_time")
+    private LocalDateTime createTime;
 
-
+    @ApiModelProperty(value = "最后一次修改时间")
+    @TableField("update_time")
+    private LocalDateTime updateTime;
 }

+ 22 - 0
soc-common/soc-common-core/src/main/java/com/xunmei/common/core/enums/iot/MessageStatusEnum.java

@@ -0,0 +1,22 @@
+package com.xunmei.common.core.enums.iot;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+/**
+ * @author jingyuanchao
+ * @date 2024/10/30 14:09
+ */
+
+@Getter
+@AllArgsConstructor
+public enum MessageStatusEnum {
+
+    PROCESSING(0, "处理中"),
+    SUCCESS(1, "处理完成"),
+    RETRYING(2, "重试中"),
+    RETRY_SUCCESS(3, "重试成功"),
+    RETRY_FAIL(4, "重试失败");
+    private final int code;
+    private final String description;
+}

+ 1 - 1
soc-common/soc-common-core/src/main/java/com/xunmei/common/core/thread/ThreadPoolConfig.java

@@ -17,7 +17,7 @@ public class ThreadPoolConfig implements AsyncConfigurer {
      * 项目共用线程池
      */
     public static final String SOC_EXECUTOR = "socExecutor";
-    public static final String HOST_EXECUTOR = "mediatorExecutor";
+    public static final String HOST_EXECUTOR = "hostExecutor";
 
 
     @Override

+ 1 - 1
soc-common/soc-common-redis/src/main/java/com/xunmei/common/redis/enums/RedisDelayQueueEnum.java

@@ -13,7 +13,7 @@ import lombok.NoArgsConstructor;
 @AllArgsConstructor
 public enum RedisDelayQueueEnum {
 
-    WEBSOCKET_MSG_RETRY("WEBSOCKET_MSG_RETRY","websocket消息重试队列", "iotWebsocketMsgService"),
+    WEBSOCKET_MSG_RETRY("WEBSOCKET_MSG_RETRY","websocket消息重试队列", "IotWebsocketMsgService"),
 
     ;
 

+ 1 - 2
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/alarm/service/impl/IotAlarmRuleServiceImpl.java

@@ -4,7 +4,6 @@ import cn.hutool.core.util.ObjectUtil;
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
-import com.google.common.collect.Sets;
 import com.xunmei.common.core.domain.iot.domain.IotServerInfo;
 import com.xunmei.common.core.domain.mediator.domain.IotAlarmRule;
 import com.xunmei.common.core.domain.mediator.domain.IotAlarmRuleExpress;
@@ -76,7 +75,7 @@ public class IotAlarmRuleServiceImpl extends ServiceImpl<IotAlarmRuleMapper, Iot
 
         final String topic = TopicTypeEnums.formatUrl(TopicTypeEnums.PRODUCT_SERVICE_INVOKE.getUrl(), ProductEnums.DETECTION_HOST.getProductName().get(0), ProductEnums.DETECTION_HOST.getProductName().get(1));
         final WebsocketResult result = WebsocketResult.invokeHostServer(topic, new com.alibaba.fastjson.JSONObject(), WebSocketConstants.ALARM_RULE, jsb);
-        websocketService.sendMsgByTokens(result, Sets.newHashSet(iotAlarmRule.getIotCode()));
+        websocketService.sendMsgByTokens(result, iotAlarmRule.getIotCode());
     }
 
     private JSONObject receiveAndUpdateAlarmRule(WebsocketExecuteReq req) {

+ 2 - 3
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/iot/service/impl/IotDeviceInfoServiceImpl.java

@@ -8,7 +8,6 @@ import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
 import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
 import com.baomidou.mybatisplus.core.toolkit.IdWorker;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
-import com.google.common.collect.Sets;
 import com.xunmei.common.core.constant.Constants;
 import com.xunmei.common.core.constant.ErrorMsgConstants;
 import com.xunmei.common.core.domain.iot.domain.IotAlarmDefenceArea;
@@ -836,7 +835,7 @@ public class IotDeviceInfoServiceImpl extends ServiceImpl<IotDeviceInfoMapper, I
         final JSONObject object = new JSONObject();
         object.put("dataType", eto.getDataType());
         final WebsocketResult result = WebsocketResult.invokeHostServer(topic, new JSONObject(), WebSocketConstants.DO_WORK, object);
-        websocketService.sendMsgByTokens(result, Sets.newHashSet(iotCode));
+        websocketService.sendMsgByTokens(result, iotCode);
 
     }
 
@@ -864,6 +863,6 @@ public class IotDeviceInfoServiceImpl extends ServiceImpl<IotDeviceInfoMapper, I
         }
         final String topic = TopicTypeEnums.formatUrl(TopicTypeEnums.PRODUCT_SERVICE_INVOKE.getUrl(), WebSocketConstants.DETECTION_HOST, WebSocketConstants.DETECTION_HOST_DEVICE);
         final WebsocketResult result = WebsocketResult.invokeHostServer(topic, new JSONObject(), WebSocketConstants.CHANGE_DEVICE, eto);
-        websocketService.sendMsgByTokens(result, Sets.newHashSet(iotCode));
+        websocketService.sendMsgByTokens(result, iotCode);
     }
 }

+ 9 - 1
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/north/service/IotWebsocketMsgService.java

@@ -2,10 +2,12 @@ package com.xunmei.host.north.service;
 
 import com.baomidou.mybatisplus.extension.service.IService;
 import com.xunmei.common.core.domain.iot.domain.IotWebsocketMsg;
+import com.xunmei.host.websocket.dto.WebsocketExecuteReq;
+import com.xunmei.host.websocket.dto.WebsocketResult;
 
 /**
  * <p>
- *  服务类
+ * 服务类
  * </p>
  *
  * @author jingYuanChao
@@ -13,4 +15,10 @@ import com.xunmei.common.core.domain.iot.domain.IotWebsocketMsg;
  */
 public interface IotWebsocketMsgService extends IService<IotWebsocketMsg> {
 
+
+    void saveMsg(WebsocketExecuteReq req,String receiveMsg);
+    void sendSuccessMsg(WebsocketResult res, String msgId);
+    void sendSuccessMsg(String msgId);
+    void sendFailMsg(WebsocketResult res, String msgId);
+
 }

+ 122 - 2
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/north/service/impl/IotWebsocketMsgServiceImpl.java

@@ -1,20 +1,140 @@
 package com.xunmei.host.north.service.impl;
 
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import com.xunmei.common.core.domain.iot.domain.IotServerInfo;
 import com.xunmei.common.core.domain.iot.domain.IotWebsocketMsg;
+import com.xunmei.common.core.enums.iot.MessageStatusEnum;
+import com.xunmei.common.core.thread.ThreadPoolConfig;
+import com.xunmei.common.core.utils.JacksonUtils;
+import com.xunmei.common.redis.enums.RedisDelayQueueEnum;
 import com.xunmei.host.north.mapper.IotWebsocketMsgMapper;
 import com.xunmei.host.north.service.IotWebsocketMsgService;
+import com.xunmei.host.websocket.dto.WebsocketExecuteReq;
+import com.xunmei.host.websocket.dto.WebsocketResult;
+import com.xunmei.host.websocket.redis.delay.RedisDelayQueueHandle;
+import com.xunmei.host.websocket.redis.delay.RedisDelayedQueueUtil;
+import com.xunmei.host.websocket.service.WebsocketService;
+import com.xunmei.system.api.util.LogUtils;
+import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Propagation;
+import org.springframework.transaction.annotation.Transactional;
+
+import javax.annotation.Resource;
+import java.time.LocalDateTime;
 
 /**
  * <p>
- *  服务实现类
+ * 服务实现类
  * </p>
  *
  * @author jingYuanChao
  * @since 2024-10-28
  */
 @Service
-public class IotWebsocketMsgServiceImpl extends ServiceImpl<IotWebsocketMsgMapper, IotWebsocketMsg> implements IotWebsocketMsgService {
+public class IotWebsocketMsgServiceImpl extends ServiceImpl<IotWebsocketMsgMapper, IotWebsocketMsg> implements IotWebsocketMsgService, RedisDelayQueueHandle {
+
+    @Resource
+    WebsocketService websocketService;
+
+    @Override
+    @Async(ThreadPoolConfig.HOST_EXECUTOR)
+    @Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW)
+    public void saveMsg(WebsocketExecuteReq req, String receiveMsg) {
+
+        final IotServerInfo serverInfo = req.getServerInfo();
+        final IotWebsocketMsg msg = new IotWebsocketMsg();
+        msg.setId(req.getId());
+        msg.setEvent(req.getEvent());
+        msg.setOrgId(serverInfo.getOrgId());
+        msg.setOrgName(serverInfo.getOrgName());
+        msg.setIotCode(serverInfo.getIotCode());
+        msg.setServerName(serverInfo.getIotName());
+        msg.setStatus(MessageStatusEnum.PROCESSING.getCode());
+        msg.setReceiveContent(receiveMsg);
+        msg.setMaxRetryTimes(3);
+        msg.setCurRetryTimes(0);
+        msg.setRetryInterval(3);
+        msg.setCreateTime(LocalDateTime.now());
+        msg.setUpdateTime(LocalDateTime.now());
+        save(msg);
+        LogUtils.WS_MSG_RETRY_LOG.info("收到主机消息,msg: {}, Thread-Name: {}", JacksonUtils.toJSONString(msg), Thread.currentThread().getName());
+    }
+
+    @Override
+    @Async(ThreadPoolConfig.HOST_EXECUTOR)
+    @Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW)
+    public void sendSuccessMsg(String msgId) {
+        final IotWebsocketMsg websocketMsg = getById(msgId);
+        if (websocketMsg == null) {
+            return;
+        }
+        websocketMsg.setStatus(MessageStatusEnum.SUCCESS.getCode());
+        if (websocketMsg.getReadySendTime()==null){
+            websocketMsg.setReadySendTime(LocalDateTime.now());
+        }
+        websocketMsg.setReallySendTime(LocalDateTime.now());
+        websocketMsg.setUpdateTime(LocalDateTime.now());
+        updateById(websocketMsg);
+        LogUtils.WS_MSG_RETRY_LOG.info("主机消息回复成功,msg: {}", JacksonUtils.toJSONString(websocketMsg));
+    }
+
+    @Override
+    @Async(ThreadPoolConfig.HOST_EXECUTOR)
+    @Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW)
+    public void sendSuccessMsg(WebsocketResult res, String msgId) {
+        final IotWebsocketMsg websocketMsg = getById(msgId);
+        if (websocketMsg == null) {
+            return;
+        }
+        websocketMsg.setStatus(MessageStatusEnum.SUCCESS.getCode());
+        websocketMsg.setReplyContent(JacksonUtils.toJSONString(res));
+        websocketMsg.setReadySendTime(LocalDateTime.now());
+        websocketMsg.setReallySendTime(LocalDateTime.now());
+        websocketMsg.setUpdateTime(LocalDateTime.now());
+        updateById(websocketMsg);
+        LogUtils.WS_MSG_RETRY_LOG.info("主机消息回复成功,msg: {}", JacksonUtils.toJSONString(websocketMsg));
+    }
+
+    @Override
+    @Async(ThreadPoolConfig.HOST_EXECUTOR)
+    @Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW)
+    public void sendFailMsg(WebsocketResult res, String msgId) {
+        //收到消息处理完成后第一次发送,但是发送失败了
+        final IotWebsocketMsg websocketMsg = getById(msgId);
+        if (websocketMsg == null) {
+            return;
+        }
+        final LocalDateTime now = LocalDateTime.now();
+        websocketMsg.setReplyContent(JacksonUtils.toJSONString(res));
+        websocketMsg.setStatus(MessageStatusEnum.RETRYING.getCode());
+        websocketMsg.setReadySendTime(now);
+        websocketMsg.setFireTime(now.plusMinutes(websocketMsg.getRetryInterval()));
+        websocketMsg.setUpdateTime(now);
+        updateById(websocketMsg);
+        RedisDelayedQueueUtil.addDelayQueue(msgId, websocketMsg.getFireTime(), RedisDelayQueueEnum.WEBSOCKET_MSG_RETRY.getCode());
+        LogUtils.WS_MSG_RETRY_LOG.info("主机消息回复失败,已添加延迟队列,等待重试,消息内容: {}", JacksonUtils.toJSONString(websocketMsg));
+    }
+
+    @Override
+    @Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW)
+    public void execute(Object o) {
+        final String msgId = String.valueOf(o);
+        final IotWebsocketMsg msg = getById(msgId);
+        if (msg == null) {
+            LogUtils.WS_MSG_RETRY_LOG.error("开始重试Ws消息,但消息不存在,msgId: {}", msgId);
+            return;
+        }
+        final Integer maxRetryTimes = msg.getMaxRetryTimes();
+        final Integer curRetryTimes = msg.getCurRetryTimes();
+        if (curRetryTimes < maxRetryTimes) {
+            websocketService.sendMsgByTokens(msg, msg.getIotCode());
+            msg.setCurRetryTimes(curRetryTimes + 1);
+            updateById(msg);
+            LogUtils.WS_MSG_RETRY_LOG.error("开始第{}次重试Ws消息,msgId: {}", msg.getCurRetryTimes(), msgId);
+            return;
+        }
+
 
+    }
 }

+ 2 - 2
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/websocket/controller/WebsocketController.java

@@ -26,13 +26,13 @@ public class WebsocketController {
     public AjaxResult sendAllMessage(Object obj) {
         RedisWebsocketMsg msg = new RedisWebsocketMsg();
         msg.setContent(obj);
-        websocketService.sendMgsToAll(msg);
+       // websocketService.sendMgsToAll(msg);
         return AjaxResult.success();
     }
 
     @PostMapping("/sendListMsg")
     public AjaxResult sendListMessage(RedisWebsocketMsg msg) {
-        websocketService.sendMgsToAll(msg);
+        //websocketService.sendMgsToAll(msg);
         return AjaxResult.success();
     }
 

+ 5 - 5
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/websocket/dto/WebsocketResult.java

@@ -68,7 +68,7 @@ public class WebsocketResult implements Serializable {
     public static WebsocketResult createWebsocketResult(String topic, String id, String productName, String deviceName, int statusCode, String statusDescription) {
         WebsocketResult result = new WebsocketResult();
         result.setId(id);
-        result.setTopic(topic.contains("reply") ? topic : String.format("things/%s/%s/service/invoke/reply", productName, deviceName));
+        result.setTopic(topic.contains("reply") ? topic : topic + "/reply");
         result.setTimestamp(DateUtil.format(new Date(), Constants.UTC_FORMAT));
 
         JSONObject payload = new JSONObject();
@@ -91,14 +91,14 @@ public class WebsocketResult implements Serializable {
         return payload;
     }
 
-    public static JSONObject getErrorPayload( String statusDescription) {
+    public static JSONObject getErrorPayload(String statusDescription) {
         JSONObject payload = new JSONObject();
         payload.put(WebSocketConstants.STATUS_CODE, HttpStatus.ERROR);
         payload.put(WebSocketConstants.STATUS_DESCRIPTION, statusDescription);
         return payload;
     }
 
-    public static WebsocketResult invokeHostServer(String topic, JSONObject headers, String service, Object args){
+    public static WebsocketResult invokeHostServer(String topic, JSONObject headers, String service, Object args) {
         WebsocketResult websocketResult = new WebsocketResult();
         websocketResult.setId(UUID.randomUUID().toString());
         websocketResult.setTimestamp(DateUtil.format(new Date(), Constants.UTC_FORMAT));
@@ -113,10 +113,10 @@ public class WebsocketResult implements Serializable {
 
     public static JSONObject dealReceiveErrorDto(ReceiveErrorDto dto) {
         final JSONObject object = new JSONObject();
-        if (dto.getSuccess()){
+        if (dto.getSuccess()) {
             object.put(WebSocketConstants.STATUS_CODE, HttpStatus.SUCCESS);
             object.put(WebSocketConstants.STATUS_DESCRIPTION, dto.getErrorMsg());
-        }else {
+        } else {
             object.put(WebSocketConstants.STATUS_CODE, HttpStatus.ERROR);
             object.put(WebSocketConstants.STATUS_DESCRIPTION, dto.getErrorMsg());
         }

+ 50 - 35
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/websocket/handler/SocWebSocketHandler.java

@@ -10,6 +10,7 @@ import com.xunmei.common.core.domain.iot.domain.IotServerInfo;
 import com.xunmei.common.core.enums.iot.IotServerConnectStatus;
 import com.xunmei.common.core.utils.JacksonUtils;
 import com.xunmei.common.core.utils.StringUtils;
+import com.xunmei.host.north.service.IotWebsocketMsgService;
 import com.xunmei.host.server.service.IotServerInfoService;
 import com.xunmei.host.websocket.dto.WebsocketExecuteReq;
 import com.xunmei.host.websocket.dto.WebsocketPayloadResolve;
@@ -18,7 +19,6 @@ import com.xunmei.host.websocket.enums.TopicTypeEnums;
 import com.xunmei.host.websocket.enums.WebsocketStatus;
 import com.xunmei.host.websocket.holder.WebSocketSessionHolder;
 import com.xunmei.host.websocket.service.RouterService;
-import com.xunmei.host.websocket.service.WebsocketService;
 import com.xunmei.host.websocket.utils.WebSocketUtils;
 import com.xunmei.system.api.RemoteOrgService;
 import com.xunmei.system.api.domain.SysOrg;
@@ -54,7 +54,7 @@ public class SocWebSocketHandler extends AbstractWebSocketHandler {
     private RemoteOrgService orgService;
 
     @Autowired
-    private WebsocketService websocketService;
+    private IotWebsocketMsgService websocketMsgService;
 
     /**
      * 连接成功后
@@ -155,68 +155,83 @@ public class SocWebSocketHandler extends AbstractWebSocketHandler {
     protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
         // 从WebSocket会话中获取登录用户信息
         String token = WebSocketSessionHolder.updateToken(session);
-        //String token = "f1418fd8-64c3-4b9f-91b1-9bbe76b0314c";
         if (StringUtils.isEmpty(token)) {
-            LogUtils.WEBSOCKET_MSG.error("会话消息不存在,ip:{}", WebSocketUtils.getIp(session));
+            logErrorAndReturn("会话消息不存在", session);
             return;
         }
+
         String payload = message.getPayload();
         final String ip = WebSocketUtils.getIp(session);
-        LogUtils.WEBSOCKET_MSG.info("ip:{},接收到消息:{}", ip, message.getPayload());
-        if (payload.isEmpty()) {
-            LogUtils.WEBSOCKET_MSG.error("消息内容为空,ip:{}", ip);
+        LogUtils.WEBSOCKET_MSG.info("ip:{},接收到消息:{}", ip, payload);
+        if (StringUtils.isEmpty(payload)) {
+            logErrorAndReturn("消息内容为空", ip);
             return;
         }
+
+        WebsocketResult websocketResult;
         try {
-            WebsocketResult websocketResult = JSON.parseObject(payload, WebsocketResult.class);
+            websocketResult = JSON.parseObject(payload, WebsocketResult.class);
             Object obj = websocketResult.getPayload();
             if (ObjectUtil.isEmpty(obj)) {
-                LogUtils.WEBSOCKET_MSG.error("消息内容为空,ip:{}", ip);
+                logErrorAndReturn("消息内容为空", ip);
                 return;
             }
+
             WebsocketPayloadResolve payloadResolve = new WebsocketPayloadResolve(websocketResult);
             String topic = payloadResolve.getTopic();
             TopicTypeEnums typeEnums = TopicTypeEnums.matcherTopicTypeEnums(topic);
             if (typeEnums == null) {
-                LogUtils.WEBSOCKET_MSG.error("消息topic错误,ip:{},topic:{}", ip, topic);
+                logErrorAndReturn("消息topic错误", ip, topic);
                 return;
             }
+
             IotServerInfo serverInfo = iotServerInfoService.selectByToken(token);
             if (ObjectUtil.isEmpty(serverInfo)) {
-                LogUtils.WEBSOCKET_MSG.error("服务器信息不存在或链接信息已不可用,token:{}", token);
+                logErrorAndReturn("服务器信息不存在或链接信息已不可用", token);
                 return;
             }
+
             //上报消息内容
             final Object args = payloadResolve.getData();
             WebsocketExecuteReq executeReq = new WebsocketExecuteReq(payloadResolve.getRoutingKey(), args, serverInfo, websocketResult.getId(), topic, payloadResolve.getHeader(), typeEnums.getProductName(), typeEnums.getDeviceName());
-            WebsocketResult result = null;
-            switch (typeEnums) {
-
-                case PRODUCT_EVENT_NOTICE:
-                    //IoT返回服务调用消息
-                case PRODUCT_SERVICE_REPLY:
-                    RouterService routeService = RouterServiceHandler.getRouteService(typeEnums.getProductName(), payloadResolve.getRoutingKey());
-                    if (routeService == null) {
-                        WebSocketUtils.sendMessage(session, WebsocketResult.replyError(executeReq, ErrorMsgConstants.ERROR_TOPIC));
-                        return;
-                    }
-                    try {
-                        result = routeService.execute(executeReq);
-                    } catch (Exception e) {
-                        LogUtils.WEBSOCKET_MSG.error("消息处理失败,ip:{},异常内容:{},消息id:{}", ip, e,executeReq.getId());
-                        result = WebsocketResult.replyError(executeReq, e.getMessage());
-                    }
-                    break;
-                default:
-                    WebSocketUtils.sendMessage(session, WebsocketResult.replyError(executeReq, ErrorMsgConstants.ERROR_TOPIC));
-                    break;
-            }
-            WebSocketUtils.sendMessage(session, result);
+            handleRequestBasedOnType(executeReq, typeEnums, session, payload);
         } catch (Exception e) {
-            LogUtils.WEBSOCKET_MSG.error("消息处理失败,ip:{},异常内容:{}", ip, e);
+            logErrorAndReturn("消息处理失败", ip, e);
         }
     }
 
+    private void logErrorAndReturn(String message, Object... args) {
+        LogUtils.WEBSOCKET_MSG.error(message + ",ip:{}", args);
+    }
+
+
+    private void handleRequestBasedOnType(WebsocketExecuteReq executeReq, TopicTypeEnums typeEnums, WebSocketSession session, String payload) {
+        final String iotCode = executeReq.getServerInfo().getIotCode();
+        WebsocketResult result;
+        switch (typeEnums) {
+            case PRODUCT_EVENT_NOTICE:
+            case PRODUCT_SERVICE_REPLY:
+                RouterService routeService = RouterServiceHandler.getRouteService(typeEnums.getProductName(), executeReq.getEvent());
+                if (routeService == null) {
+                   // WebSocketUtils.sendMessage(session, WebsocketResult.replyError(executeReq, ErrorMsgConstants.ERROR_TOPIC));
+                    return;
+                }
+                websocketMsgService.saveMsg(executeReq, payload);
+                result = routeService.execute(executeReq);
+                break;
+            default:
+                result = WebsocketResult.replyError(executeReq, ErrorMsgConstants.ERROR_TOPIC);
+                break;
+        }
+        final boolean sendFlag = WebSocketUtils.sendMessage(session, result);
+        if (sendFlag) {
+            websocketMsgService.sendSuccessMsg(result, executeReq.getId());
+        } else {
+            websocketMsgService.sendFailMsg(result, executeReq.getId());
+        }
+    }
+
+
     /**
      * 处理接收到的二进制消息
      *

+ 24 - 17
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/websocket/redis/WebsocketSubscriber.java

@@ -2,7 +2,9 @@ package com.xunmei.host.websocket.redis;
 
 import cn.hutool.core.util.ObjectUtil;
 import com.alibaba.fastjson.JSON;
+import com.xunmei.common.core.domain.iot.domain.IotWebsocketMsg;
 import com.xunmei.common.redis.utils.RedisUtils;
+import com.xunmei.host.north.service.IotWebsocketMsgService;
 import com.xunmei.host.websocket.holder.WebSocketSessionHolder;
 import com.xunmei.system.api.domain.websocket.RedisWebsocketMsg;
 import lombok.extern.slf4j.Slf4j;
@@ -17,8 +19,8 @@ import org.springframework.stereotype.Component;
 import org.springframework.web.socket.TextMessage;
 import org.springframework.web.socket.WebSocketSession;
 
+import javax.annotation.Resource;
 import java.io.IOException;
-import java.util.Set;
 
 /**
  * @author gaoxiong
@@ -32,7 +34,8 @@ import java.util.Set;
 public class WebsocketSubscriber implements ApplicationRunner, Ordered {
 
     private static final String TOPIC_KEY_WEBSOCKET = WebsocketPublisher.TOPIC_KEY_WEBSOCKET;
-
+    @Resource
+    IotWebsocketMsgService websocketMsgService;
 
     @Override
     public void run(ApplicationArguments args) throws Exception {
@@ -43,29 +46,33 @@ public class WebsocketSubscriber implements ApplicationRunner, Ordered {
         clientTopic.addListener(RedisWebsocketMsg.class, new MessageListener<RedisWebsocketMsg>() {
             @Override
             public void onMessage(CharSequence channel, RedisWebsocketMsg msg) {
+                if (ObjectUtil.hasEmpty(msg.getTokens(), msg.getContent())) {
+                    return;
+                }
+                final Object content = msg.getContent();
+                if (content instanceof IotWebsocketMsg) {
+
+                }
+
                 String contentJson = JSON.toJSONString(msg.getContent());
                 log.info("host-websocket 接收到订阅消息:{}", JSON.toJSONString(msg));
-                Set<String> tokens = msg.getTokens();
-                sendMessage(tokens, contentJson);
+                sendMessage(msg.getTokens(), contentJson);
             }
         });
     }
 
-    private void sendMessage(Set<String> tokens, String message) {
-        if (ObjectUtil.isEmpty(tokens)) {
-            tokens = WebSocketSessionHolder.getSessionsAll();
-        }
-        if (ObjectUtil.isEmpty(tokens)) {
+    private void sendMessage(String token, String message) {
+        if (ObjectUtil.isEmpty(token)) {
             return;
         }
-        for (String token : tokens) {
-            WebSocketSession session = WebSocketSessionHolder.getSessions(token);
-            if (session != null && session.isOpen()) {
-                try {
-                    session.sendMessage(new TextMessage(message));
-                } catch (IOException e) {
-                    throw new RuntimeException(e);
-                }
+        WebSocketSession session = WebSocketSessionHolder.getSessions(token);
+        if (session != null && session.isOpen()) {
+            try {
+                websocketMsgService.sendSuccessMsg(message);
+                session.sendMessage(new TextMessage(message));
+            } catch (IOException e) {
+
+                throw new RuntimeException(e);
             }
         }
     }

+ 4 - 4
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/websocket/redis/delay/RedisDelayQueueRunner.java

@@ -2,8 +2,8 @@ package com.xunmei.host.websocket.redis.delay;
 
 import cn.hutool.extra.spring.SpringUtil;
 import com.xunmei.common.core.thread.ThreadPoolConfig;
+import com.xunmei.common.core.utils.JacksonUtils;
 import com.xunmei.common.redis.enums.RedisDelayQueueEnum;
-import com.xunmei.common.redis.utils.RedisDelayedQueueUtil;
 import com.xunmei.system.api.util.LogUtils;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -25,14 +25,14 @@ public class RedisDelayQueueRunner implements CommandLineRunner {
 
     @Override
     public void run(String... args) throws Exception {
-        Thread thread = new Thread(()->{
+        Thread thread = new Thread(() -> {
             while (!Thread.currentThread().isInterrupted()) {
                 try {
                     RedisDelayQueueEnum[] queueEnums = RedisDelayQueueEnum.values();
                     for (RedisDelayQueueEnum queueEnum : queueEnums) {
-                        Object value = RedisDelayedQueueUtil.getDelayQueue(queueEnum.getCode());
+                        Object value = RedisDelayedQueueUtil.getDelayQueueData(queueEnum.getCode());
                         if (value != null) {
-                            LogUtils.WS_MSG_RETRY_LOG.info("{} 延迟队列有数据,开始处理", queueEnum.getName());
+                            LogUtils.WS_MSG_RETRY_LOG.info("{} 延迟队列有数据,开始处理,消息内容:{}", queueEnum.getName(), JacksonUtils.toJSONString(value));
                             RedisDelayQueueHandle<Object> redisDelayQueueHandle = (RedisDelayQueueHandle<Object>) SpringUtil.getBean(queueEnum.getBeanId());
                             ptask.execute(() -> {
                                 redisDelayQueueHandle.execute(value);

+ 30 - 20
soc-common/soc-common-redis/src/main/java/com/xunmei/common/redis/utils/RedisDelayedQueueUtil.java → soc-modules/soc-modules-host/src/main/java/com/xunmei/host/websocket/redis/delay/RedisDelayedQueueUtil.java

@@ -1,6 +1,7 @@
-package com.xunmei.common.redis.utils;
+package com.xunmei.host.websocket.redis.delay;
 
 import com.xunmei.common.core.utils.SpringUtils;
+import com.xunmei.system.api.util.LogUtils;
 import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
@@ -25,27 +26,30 @@ public class RedisDelayedQueueUtil {
             addDelayQueue(value, seconds, TimeUnit.SECONDS, queueCode);
         }
     }
+
     public static <T> void addDelayQueue(T value, long delay, String queueCode) {
         addDelayQueue(value, delay, TimeUnit.SECONDS, queueCode);
     }
+
     /**
      * 添加延迟队列
-     * @param value 队列值
-     * @param delay 延迟时间
-     * @param timeUnit 时间单位
+     *
+     * @param value     队列值
+     * @param delay     延迟时间
+     * @param timeUnit  时间单位
      * @param queueCode 队列键
-     * @param <T> 泛型
+     * @param <T>       泛型
      */
-    public static <T> void addDelayQueue(T value, long delay, TimeUnit timeUnit, String queueCode){
+    public static <T> void addDelayQueue(T value, long delay, TimeUnit timeUnit, String queueCode) {
         try {
             RBlockingDeque<Object> blockingDeque = CLIENT.getBlockingDeque(queueCode);
             RDelayedQueue<Object> delayedQueue = CLIENT.getDelayedQueue(blockingDeque);
             delayedQueue.offer(value, delay, timeUnit);
-            log.info("(添加延时队列成功) 队列键:{},队列值:{},延迟时间:{}", queueCode, value, timeUnit.toSeconds(delay) + "秒");
+            LogUtils.WS_MSG_RETRY_LOG.info("(添加延时队列成功) 队列键:{},队列值:{},延迟时间:{}", queueCode, value, timeUnit.toSeconds(delay) + "秒");
             //释放队列
             delayedQueue.destroy();
         } catch (Exception e) {
-            log.error("(添加延时队列失败) {}, value {}", e.getMessage(), value);
+            LogUtils.WS_MSG_RETRY_LOG.error("(添加延时队列失败) {}, value {}", e.getMessage(), value);
             throw new RuntimeException("(添加延时队列失败)");
         }
     }
@@ -53,16 +57,22 @@ public class RedisDelayedQueueUtil {
 
     /**
      * 获取延迟队列
+     *
      * @param queueCode 队列主键
-     * @param <T> 泛型
+     * @param <T>       泛型
      * @return
      * @throws InterruptedException
      */
-    public static <T> T getDelayQueue(String queueCode) throws InterruptedException {
-        RBlockingDeque<T> blockingDeque = CLIENT.getBlockingDeque(queueCode);
-        //避免消息伪丢失(应用重启未消费),官网推荐
-        CLIENT.getDelayedQueue(blockingDeque) ;
-        return (T) blockingDeque.take();
+    public static <T> T getDelayQueueData(String queueCode) throws InterruptedException {
+        try {
+            RBlockingDeque<T> blockingDeque = CLIENT.getBlockingDeque(queueCode);
+            //避免消息伪丢失(应用重启未消费),官网推荐
+            CLIENT.getDelayedQueue(blockingDeque);
+            return (T) blockingDeque.take();
+        } catch (InterruptedException e) {
+            LogUtils.WS_MSG_RETRY_LOG.error("获取延时队列失败,队列键:{}, 失败原因:{}", queueCode, e.getMessage());
+            throw new RuntimeException(e);
+        }
     }
 
 
@@ -74,13 +84,13 @@ public class RedisDelayedQueueUtil {
             RBlockingDeque<Object> blockingDeque = CLIENT.getBlockingDeque(queueCode);
             RDelayedQueue<Object> delayedQueue = CLIENT.getDelayedQueue(blockingDeque);
             boolean flag = delayedQueue.contains(o);
-            if(flag){
-                log.info("(存在延时队列对应的值) 队列键:{},队列值:{}", queueCode, o);
+            if (flag) {
+                LogUtils.WS_MSG_RETRY_LOG.info("(存在延时队列对应的值) 队列键:{},队列值:{}", queueCode, o);
             }
             delayedQueue.destroy();
             return flag;
         } catch (Exception e) {
-            log.error("(判断存在延时队列异常) 队列键:{},队列值:{},错误信息:{}",queueCode, o, e.getMessage());
+            LogUtils.WS_MSG_RETRY_LOG.error("(判断存在延时队列异常) 队列键:{},队列值:{},错误信息:{}", queueCode, o, e.getMessage());
             throw new RuntimeException("(判断存在延时队列异常)");
         }
 
@@ -97,13 +107,13 @@ public class RedisDelayedQueueUtil {
             if (delayedQueue.contains(o)) {
                 flag = delayedQueue.remove(o);
             }
-            if(flag){
-                log.info("(删除延时队列保证唯一性) 队列键:{},队列值:{}", queueCode, o);
+            if (flag) {
+                LogUtils.WS_MSG_RETRY_LOG.info("(删除延时队列保证唯一性) 队列键:{},队列值:{}", queueCode, o);
             }
             delayedQueue.destroy();
             return flag;
         } catch (Exception e) {
-            log.error("(删除延时队列异常) 队列键:{},队列值:{},错误信息:{}",queueCode, o, e.getMessage());
+            LogUtils.WS_MSG_RETRY_LOG.error("(删除延时队列异常) 队列键:{},队列值:{},错误信息:{}", queueCode, o, e.getMessage());
             throw new RuntimeException("(删除延时队列异常)");
         }
 

+ 1 - 3
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/websocket/service/WebsocketService.java

@@ -1,10 +1,8 @@
 package com.xunmei.host.websocket.service;
 
 
-import java.util.Set;
-
 public interface WebsocketService {
-    void sendMsgByTokens(Object obj, Set<String> tokens);
+    void sendMsgByTokens(Object obj, String tokens);
 
     void sendMgsToAll(Object obj);
 

+ 4 - 4
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/websocket/service/impl/WebsocketServiceImpl.java

@@ -38,11 +38,11 @@ public class WebsocketServiceImpl implements WebsocketService {
      * 给指定iot服务发送消息
      *
      * @param obj
-     * @param tokens
+     * @param token
      */
-    public void sendMsgByTokens(Object obj, Set<String> tokens) {
+    public void sendMsgByTokens(Object obj, String token) {
         RedisWebsocketMsg msg = new RedisWebsocketMsg();
-        msg.setTokens(tokens);
+        msg.setTokens(token);
         msg.setContent(obj);
         websocketPublisher.sendMessage(msg);
     }
@@ -63,8 +63,8 @@ public class WebsocketServiceImpl implements WebsocketService {
             String keyToken = (String) kts.next();
             String token = (String) RedisUtils.getCacheObject(keyToken);
             tokens.add(token);
+            sendMsgByTokens(obj,token);
         }
-        sendMsgByTokens(obj, tokens);
     }
 
 

+ 15 - 7
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/websocket/utils/WebSocketUtils.java

@@ -133,7 +133,7 @@ public class WebSocketUtils {
     public static void sendAll(String message) {
         if (ObjectUtil.isEmpty(message)) {
             LogUtils.WEBSOCKET_MSG.error("消息广播失败,消息内容为空");
-            return ;
+            return;
         }
         final Map<String, WebSocketSession> USER_SESSION_MAP = WebSocketSessionHolder.map();
         if (ObjectUtil.isEmpty(USER_SESSION_MAP)) {
@@ -174,13 +174,21 @@ public class WebSocketUtils {
 
     public static boolean sendMessage(WebSocketSession webSocketSession, WebsocketResult message) {
         try {
-            if (webSocketSession == null || !webSocketSession.isOpen() || ObjectUtil.isEmpty(message)) {
-                return false;
+            if (webSocketSession != null && webSocketSession.isOpen() && ObjectUtil.isNotEmpty(message)) {
+                final String msg = JacksonUtils.toJSONString(message);
+
+
+                webSocketSession.sendMessage(new PongMessage());
+
+
+                webSocketSession.sendMessage(new TextMessage(msg));
+
+
+
+                LogUtils.WEBSOCKET_MSG.info("消息发送成功,发送目标ip:{},消息内容:{}", getIp(webSocketSession), msg);
+                return true;
             }
-            final String msg = JacksonUtils.toJSONString(message);
-            webSocketSession.sendMessage(new TextMessage(msg));
-            LogUtils.WEBSOCKET_MSG.info("消息发送成功,发送目标ip:{},消息内容:{}", getIp(webSocketSession), msg);
-            return true;
+            return false;
         } catch (IOException e) {
             LogUtils.WEBSOCKET_MSG.error("消息发送失败,ip:{},异常内容:{}", getIp(webSocketSession), e);
             return false;

+ 10 - 15
soc-modules/soc-modules-mediator/src/main/java/com/xunmei/mediator/websocket/controller/WebsocketController.java

@@ -1,23 +1,20 @@
 package com.xunmei.mediator.websocket.controller;
 
-import com.xunmei.common.core.domain.R;
 import com.xunmei.common.core.web.domain.AjaxResult;
 import com.xunmei.common.redis.utils.RedisUtils;
 import com.xunmei.mediator.websocket.holder.WebSocketSessionHolder;
 import com.xunmei.mediator.websocket.redis.WebsocketPublisher;
-import com.xunmei.system.api.domain.SysUser;
 import com.xunmei.system.api.domain.websocket.RedisWebsocketMsg;
-import org.aspectj.weaver.loadtime.Aj;
 import org.redisson.api.RKeys;
 import org.redisson.api.RedissonClient;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Controller;
 import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.PostMapping;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RestController;
 
-import java.util.*;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -49,13 +46,12 @@ public class WebsocketController {
         RedissonClient client = RedisUtils.getClient();
         RKeys keys = client.getKeys();
         Iterable<String> keyTokens = keys.getKeysByPattern(WebSocketSessionHolder.REDIS_TOPIC_WEBSOCKET_TOKEN + "*");
-        Set<String> tokens = new HashSet<>();
         for (String keyToken : keyTokens) {
             String token = RedisUtils.getCacheObject(keyToken);
-            tokens.add(token);
+            msg.setTokens(token);
+            websocketPublisher.sendMessage(msg);
         }
-        msg.setTokens(tokens);
-        websocketPublisher.sendMessage(msg);
+
         return "发送成功";
     }
 
@@ -66,18 +62,17 @@ public class WebsocketController {
      */
     @PostMapping("/sendAllMsg")
     public AjaxResult sendAllMessage(Object obj){
-        RedisWebsocketMsg msg = new RedisWebsocketMsg();
-        msg.setContent(msg);
+
         RedissonClient client = RedisUtils.getClient();
         RKeys keys = client.getKeys();
         Iterable<String> keyTokens = keys.getKeysByPattern(WebSocketSessionHolder.REDIS_TOPIC_WEBSOCKET_TOKEN + "*");
-        Set<String> tokens = new HashSet<>();
         for (String keyToken : keyTokens) {
             String token = RedisUtils.getCacheObject(keyToken);
-            tokens.add(token);
+            RedisWebsocketMsg msg = new RedisWebsocketMsg();
+            msg.setContent(msg);
+            msg.setTokens(token);
+            websocketPublisher.sendMessage(msg);
         }
-        msg.setTokens(tokens);
-        websocketPublisher.sendMessage(msg);
         return AjaxResult.success();
     }
 

+ 7 - 13
soc-modules/soc-modules-mediator/src/main/java/com/xunmei/mediator/websocket/redis/WebsocketSubscriber.java

@@ -15,12 +15,8 @@ import org.springframework.boot.ApplicationArguments;
 import org.springframework.boot.ApplicationRunner;
 import org.springframework.core.Ordered;
 import org.springframework.stereotype.Component;
-import org.springframework.web.socket.WebSocketHandler;
 import org.springframework.web.socket.WebSocketSession;
 
-import java.util.List;
-import java.util.Set;
-
 /**
  * @author gaoxiong
  * @Title: 订阅者
@@ -45,16 +41,14 @@ public class WebsocketSubscriber implements ApplicationRunner, Ordered {
             @Override
             public void onMessage(CharSequence channel, RedisWebsocketMsg msg) {
                 String str = JSON.toJSONString(msg.getContent());
-                log.info("接收到订阅消息:{}",JSON.toJSONString(msg));
-                Set<String> tokens = msg.getTokens();
-                if(ObjectUtil.isNotEmpty(tokens)){
-                    for (String token : tokens) {
-                        WebSocketSession sessions = WebSocketSessionHolder.getSessions(token);
-                        if(sessions != null){
-                            WebSocketUtils.sendMessage(sessions,str);
-                        }
-
+                log.info("接收到订阅消息:{}", JSON.toJSONString(msg));
+                String token = msg.getTokens();
+                if (ObjectUtil.isNotEmpty(token)) {
+                    WebSocketSession sessions = WebSocketSessionHolder.getSessions(token);
+                    if (sessions != null) {
+                        WebSocketUtils.sendMessage(sessions, str);
                     }
+
                 }
 
             }

+ 1 - 2
soc-modules/soc-modules-mediator/src/main/java/com/xunmei/mediator/websocket/service/WebsocketService.java

@@ -6,10 +6,9 @@ import com.xunmei.mediator.websocket.dto.alarmHost.AlarmHostBaseInfo;
 import com.xunmei.mediator.websocket.dto.dvs.DvsBaseInfo;
 
 import java.util.List;
-import java.util.Set;
 
 public interface WebsocketService {
-    void sendListMessage(Object obj, Set<String> tokens) throws Exception;
+    void sendListMessage(Object obj, String tokens) throws Exception;
 
     void sendAllMessage(Object obj) throws Exception;
 

+ 2 - 4
soc-modules/soc-modules-mediator/src/main/java/com/xunmei/mediator/websocket/service/impl/WebsocketServiceImpl.java

@@ -120,7 +120,7 @@ public class WebsocketServiceImpl implements WebsocketService, RouterService {
      * @param obj
      * @param tokens
      */
-    public void sendListMessage(Object obj, Set<String> tokens) {
+    public void sendListMessage(Object obj, String tokens) {
         RedisWebsocketMsg msg = new RedisWebsocketMsg();
         msg.setTokens(tokens);
         msg.setContent(obj);
@@ -137,14 +137,12 @@ public class WebsocketServiceImpl implements WebsocketService, RouterService {
         RedissonClient client = RedisUtils.getClient();
         RKeys keys = client.getKeys();
         Iterable<String> keyTokens = keys.getKeysByPattern("websocket:token_*");
-        Set<String> tokens = new HashSet();
         Iterator kts = keyTokens.iterator();
         while (kts.hasNext()) {
             String keyToken = (String) kts.next();
             String token = (String) RedisUtils.getCacheObject(keyToken);
-            tokens.add(token);
+            sendListMessage(obj, token);
         }
-        sendListMessage(obj, tokens);
     }