Explorar o código

消息重试回复代码部分提交

jingyuanchao hai 1 ano
pai
achega
103a2deebb

+ 11 - 0
soc-common/soc-common-core/src/main/java/com/xunmei/common/core/constant/RedisConstant.java

@@ -9,4 +9,15 @@ public class RedisConstant {
 
     // host模块主机发送失败消息重试队列名称
     public static final String WS_RETRY_MSG_QUEUE_KEY = "ws_retry_msg_queue";
+
+    // host模块消息订阅队列名称
+    public static final String TOPIC_KEY_WEBSOCKET = "websocket_topic_key";
+
+    // host模块主机发送失败消息重试队列订阅者数量后缀
+    public static final String TOPIC_SUBSCRIBERS_NUMS = TOPIC_KEY_WEBSOCKET + "_Subscribers_";
+
+    // redis 订阅者因链接不在当前实例而发送失败后 增加发送失败数量
+    public static final String ADD_SUBSCRIBERS_AND_FAIL_NUMS = "add_SubscribersAndFailNums";
+
+
 }

+ 1 - 1
soc-common/soc-common-redis/src/main/java/com/xunmei/common/redis/enums/RedisDelayQueueEnum.java

@@ -13,7 +13,7 @@ import lombok.NoArgsConstructor;
 @AllArgsConstructor
 public enum RedisDelayQueueEnum {
 
-    WEBSOCKET_MSG_RETRY("WEBSOCKET_MSG_RETRY","websocket消息重试队列", "IotWebsocketMsgService"),
+    WEBSOCKET_MSG_RETRY("WEBSOCKET_MSG_RETRY","websocket消息重试队列", "iotWebsocketMsgServiceImpl"),
 
     ;
 

+ 35 - 13
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/north/service/impl/IotWebsocketMsgServiceImpl.java

@@ -15,10 +15,13 @@ import com.xunmei.host.websocket.redis.delay.RedisDelayQueueHandle;
 import com.xunmei.host.websocket.redis.delay.RedisDelayedQueueUtil;
 import com.xunmei.host.websocket.service.WebsocketService;
 import com.xunmei.system.api.util.LogUtils;
+import lombok.SneakyThrows;
 import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Propagation;
 import org.springframework.transaction.annotation.Transactional;
+import org.springframework.transaction.support.TransactionSynchronizationAdapter;
+import org.springframework.transaction.support.TransactionSynchronizationManager;
 
 import javax.annotation.Resource;
 import java.time.LocalDateTime;
@@ -41,7 +44,7 @@ public class IotWebsocketMsgServiceImpl extends ServiceImpl<IotWebsocketMsgMappe
     @Async(ThreadPoolConfig.HOST_EXECUTOR)
     @Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW)
     public void saveMsg(WebsocketExecuteReq req, String receiveMsg) {
-
+        //刚刚收到消息,还没开始处理业务逻辑,先保存消息数据
         final IotServerInfo serverInfo = req.getServerInfo();
         final IotWebsocketMsg msg = new IotWebsocketMsg();
         msg.setId(req.getId());
@@ -54,7 +57,7 @@ public class IotWebsocketMsgServiceImpl extends ServiceImpl<IotWebsocketMsgMappe
         msg.setReceiveContent(receiveMsg);
         msg.setMaxRetryTimes(3);
         msg.setCurRetryTimes(0);
-        msg.setRetryInterval(3);
+        msg.setRetryInterval(1);
         msg.setCreateTime(LocalDateTime.now());
         msg.setUpdateTime(LocalDateTime.now());
         save(msg);
@@ -65,12 +68,13 @@ public class IotWebsocketMsgServiceImpl extends ServiceImpl<IotWebsocketMsgMappe
     @Async(ThreadPoolConfig.HOST_EXECUTOR)
     @Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW)
     public void sendSuccessMsg(String msgId) {
+        //业务处理完成,回复成功了
         final IotWebsocketMsg websocketMsg = getById(msgId);
         if (websocketMsg == null) {
             return;
         }
         websocketMsg.setStatus(MessageStatusEnum.SUCCESS.getCode());
-        if (websocketMsg.getReadySendTime()==null){
+        if (websocketMsg.getReadySendTime() == null) {
             websocketMsg.setReadySendTime(LocalDateTime.now());
         }
         websocketMsg.setReallySendTime(LocalDateTime.now());
@@ -83,13 +87,16 @@ public class IotWebsocketMsgServiceImpl extends ServiceImpl<IotWebsocketMsgMappe
     @Async(ThreadPoolConfig.HOST_EXECUTOR)
     @Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW)
     public void sendSuccessMsg(WebsocketResult res, String msgId) {
+        //业务处理完成,回复成功了
         final IotWebsocketMsg websocketMsg = getById(msgId);
         if (websocketMsg == null) {
             return;
         }
         websocketMsg.setStatus(MessageStatusEnum.SUCCESS.getCode());
         websocketMsg.setReplyContent(JacksonUtils.toJSONString(res));
-        websocketMsg.setReadySendTime(LocalDateTime.now());
+        if (websocketMsg.getReadySendTime()==null){
+            websocketMsg.setReadySendTime(LocalDateTime.now());
+        }
         websocketMsg.setReallySendTime(LocalDateTime.now());
         websocketMsg.setUpdateTime(LocalDateTime.now());
         updateById(websocketMsg);
@@ -100,22 +107,29 @@ public class IotWebsocketMsgServiceImpl extends ServiceImpl<IotWebsocketMsgMappe
     @Async(ThreadPoolConfig.HOST_EXECUTOR)
     @Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW)
     public void sendFailMsg(WebsocketResult res, String msgId) {
-        //收到消息处理完成后第一次发送,但是发送失败了
+
         final IotWebsocketMsg websocketMsg = getById(msgId);
         if (websocketMsg == null) {
             return;
         }
         final LocalDateTime now = LocalDateTime.now();
-        websocketMsg.setReplyContent(JacksonUtils.toJSONString(res));
-        websocketMsg.setStatus(MessageStatusEnum.RETRYING.getCode());
-        websocketMsg.setReadySendTime(now);
+        //final LocalDateTime firTime = websocketMsg.getFireTime() == null ? now : websocketMsg.getFireTime();
+        final Integer curRetryTimes = websocketMsg.getCurRetryTimes();
+        if (curRetryTimes == 0) {
+            //收到消息处理完成后第一次发送,但是发送失败了
+            websocketMsg.setReadySendTime(now);
+            websocketMsg.setReplyContent(JacksonUtils.toJSONString(res));
+            websocketMsg.setStatus(MessageStatusEnum.RETRYING.getCode());
+        }
         websocketMsg.setFireTime(now.plusMinutes(websocketMsg.getRetryInterval()));
+        //websocketMsg.setFireTime((firTime).plusSeconds(60));
         websocketMsg.setUpdateTime(now);
         updateById(websocketMsg);
         RedisDelayedQueueUtil.addDelayQueue(msgId, websocketMsg.getFireTime(), RedisDelayQueueEnum.WEBSOCKET_MSG_RETRY.getCode());
         LogUtils.WS_MSG_RETRY_LOG.info("主机消息回复失败,已添加延迟队列,等待重试,消息内容: {}", JacksonUtils.toJSONString(websocketMsg));
     }
 
+    @SneakyThrows
     @Override
     @Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW)
     public void execute(Object o) {
@@ -127,14 +141,22 @@ public class IotWebsocketMsgServiceImpl extends ServiceImpl<IotWebsocketMsgMappe
         }
         final Integer maxRetryTimes = msg.getMaxRetryTimes();
         final Integer curRetryTimes = msg.getCurRetryTimes();
+
         if (curRetryTimes < maxRetryTimes) {
-            websocketService.sendMsgByTokens(msg, msg.getIotCode());
+            //第一次重试, 把消息放到队列中
             msg.setCurRetryTimes(curRetryTimes + 1);
             updateById(msg);
-            LogUtils.WS_MSG_RETRY_LOG.error("开始第{}次重试Ws消息,msgId: {}", msg.getCurRetryTimes(), msgId);
-            return;
+            TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
+                @Override
+                public void afterCommit() {
+                    websocketService.sendMsgByTokens(msg, msg.getIotCode());
+                }
+            });
+            LogUtils.WS_MSG_RETRY_LOG.info("开始第{}次重试Ws消息,msgId: {}", msg.getCurRetryTimes(), msgId);
+        } else {
+            msg.setStatus(MessageStatusEnum.RETRY_FAIL.getCode());
+            updateById(msg);
+            LogUtils.WS_MSG_RETRY_LOG.info("重试次数已达到最大值,停止重试,msgId: {}", msgId);
         }
-
-
     }
 }

+ 2 - 2
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/websocket/handler/SocWebSocketHandler.java

@@ -113,7 +113,7 @@ public class SocWebSocketHandler extends AbstractWebSocketHandler {
             map.put("statusCode", WebsocketStatus.SUCCESS.getStatusCode());
             WebsocketResult result = createWebsocketResult(null, "login", map);
             WebSocketUtils.sendMessage(session, JacksonUtils.toJSONString(result));
-            LogUtils.WEBSOCKET_MSG.info("[建立连接],token:{}", token);
+            LogUtils.WEBSOCKET_MSG.info("[成功与{}建立WS连接],token:{}", ip, token);
             return;
         }
         LogUtils.WEBSOCKET_MSG.info("[建立连接失败],token:{}", token);
@@ -213,7 +213,7 @@ public class SocWebSocketHandler extends AbstractWebSocketHandler {
             case PRODUCT_SERVICE_REPLY:
                 RouterService routeService = RouterServiceHandler.getRouteService(typeEnums.getProductName(), executeReq.getEvent());
                 if (routeService == null) {
-                   // WebSocketUtils.sendMessage(session, WebsocketResult.replyError(executeReq, ErrorMsgConstants.ERROR_TOPIC));
+                    // WebSocketUtils.sendMessage(session, WebsocketResult.replyError(executeReq, ErrorMsgConstants.ERROR_TOPIC));
                     return;
                 }
                 websocketMsgService.saveMsg(executeReq, payload);

+ 18 - 7
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/websocket/redis/WebsocketPublisher.java

@@ -1,6 +1,9 @@
 package com.xunmei.host.websocket.redis;
 
+import com.xunmei.common.core.constant.RedisConstant;
+import com.xunmei.common.core.domain.iot.domain.IotWebsocketMsg;
 import com.xunmei.common.redis.utils.RedisUtils;
+import com.xunmei.host.websocket.redis.vo.SubscribersAndFailNums;
 import com.xunmei.system.api.domain.websocket.RedisWebsocketMsg;
 import lombok.extern.slf4j.Slf4j;
 import org.redisson.api.RTopic;
@@ -10,6 +13,7 @@ import org.springframework.stereotype.Component;
 
 /**
  * websocket 消息发布者
+ *
  * @author gaoxiong
  * @Title:
  * @Package
@@ -20,20 +24,27 @@ import org.springframework.stereotype.Component;
 @Component
 public class WebsocketPublisher {
 
-    public static final String TOPIC_KEY_WEBSOCKET = "websocket_topic_key";
 
     public void sendMessage(RedisWebsocketMsg message) {
-        try{
+        try {
             RedissonClient client = RedisUtils.getClient();
             //订阅的主题
-            RTopic clientTopic = client.getTopic(TOPIC_KEY_WEBSOCKET, new SerializationCodec());
+            RTopic clientTopic = client.getTopic(RedisConstant.TOPIC_KEY_WEBSOCKET, new SerializationCodec());
             //消息发布
             clientTopic.publishAsync(message);
-            log.info("websocket 发布消息:{}",message);
-            log.info("发送消息,存在订阅者数量:{}",clientTopic.countSubscribers());
-        }catch (Exception e){
-            e.printStackTrace();
+            final long subscribers = clientTopic.countSubscribers();
+            log.info("websocket topic:{},存在订阅者数量:{}, 发布消息:{},", RedisConstant.TOPIC_KEY_WEBSOCKET, subscribers, message);
+            saveSubscribers(message, (int) subscribers);
+        } catch (Exception e) {
+            log.error("websocket 发布消息失败:{}", message, e);
         }
 
     }
+
+    private void saveSubscribers(RedisWebsocketMsg message, int subscribers) {
+        if (message.getContent() instanceof IotWebsocketMsg) {
+            IotWebsocketMsg iotWebsocketMsg = (IotWebsocketMsg) message.getContent();
+            RedisUtils.setCacheObject(RedisConstant.TOPIC_SUBSCRIBERS_NUMS + iotWebsocketMsg.getId(), new SubscribersAndFailNums(subscribers, 0));
+        }
+    }
 }

+ 19 - 10
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/websocket/redis/WebsocketSubscriber.java

@@ -2,10 +2,13 @@ package com.xunmei.host.websocket.redis;
 
 import cn.hutool.core.util.ObjectUtil;
 import com.alibaba.fastjson.JSON;
+import com.xunmei.common.core.constant.RedisConstant;
 import com.xunmei.common.core.domain.iot.domain.IotWebsocketMsg;
 import com.xunmei.common.redis.utils.RedisUtils;
 import com.xunmei.host.north.service.IotWebsocketMsgService;
+import com.xunmei.host.websocket.dto.WebsocketResult;
 import com.xunmei.host.websocket.holder.WebSocketSessionHolder;
+import com.xunmei.host.websocket.redis.vo.SubscribersAndFailNums;
 import com.xunmei.system.api.domain.websocket.RedisWebsocketMsg;
 import lombok.extern.slf4j.Slf4j;
 import org.redisson.api.RTopic;
@@ -16,6 +19,7 @@ import org.springframework.boot.ApplicationArguments;
 import org.springframework.boot.ApplicationRunner;
 import org.springframework.core.Ordered;
 import org.springframework.stereotype.Component;
+import org.springframework.web.socket.PingMessage;
 import org.springframework.web.socket.TextMessage;
 import org.springframework.web.socket.WebSocketSession;
 
@@ -33,7 +37,6 @@ import java.io.IOException;
 @Component
 public class WebsocketSubscriber implements ApplicationRunner, Ordered {
 
-    private static final String TOPIC_KEY_WEBSOCKET = WebsocketPublisher.TOPIC_KEY_WEBSOCKET;
     @Resource
     IotWebsocketMsgService websocketMsgService;
 
@@ -41,7 +44,7 @@ public class WebsocketSubscriber implements ApplicationRunner, Ordered {
     public void run(ApplicationArguments args) throws Exception {
         log.info("WebsocketSubscriber 开始运行");
         RedissonClient client = RedisUtils.getClient();
-        RTopic clientTopic = client.getTopic(TOPIC_KEY_WEBSOCKET, new SerializationCodec());
+        RTopic clientTopic = client.getTopic(RedisConstant.TOPIC_KEY_WEBSOCKET, new SerializationCodec());
 
         clientTopic.addListener(RedisWebsocketMsg.class, new MessageListener<RedisWebsocketMsg>() {
             @Override
@@ -49,11 +52,6 @@ public class WebsocketSubscriber implements ApplicationRunner, Ordered {
                 if (ObjectUtil.hasEmpty(msg.getTokens(), msg.getContent())) {
                     return;
                 }
-                final Object content = msg.getContent();
-                if (content instanceof IotWebsocketMsg) {
-
-                }
-
                 String contentJson = JSON.toJSONString(msg.getContent());
                 log.info("host-websocket 接收到订阅消息:{}", JSON.toJSONString(msg));
                 sendMessage(msg.getTokens(), contentJson);
@@ -65,15 +63,26 @@ public class WebsocketSubscriber implements ApplicationRunner, Ordered {
         if (ObjectUtil.isEmpty(token)) {
             return;
         }
+
+        IotWebsocketMsg msg = JSON.parseObject(message, IotWebsocketMsg.class);
         WebSocketSession session = WebSocketSessionHolder.getSessions(token);
         if (session != null && session.isOpen()) {
             try {
-                websocketMsgService.sendSuccessMsg(message);
-                session.sendMessage(new TextMessage(message));
+                session.sendMessage(new PingMessage());
+                session.sendMessage(new TextMessage(msg.getReplyContent()));
+                websocketMsgService.sendSuccessMsg(msg.getId());
             } catch (IOException e) {
-
+                // todo 消息重试
                 throw new RuntimeException(e);
             }
+        } else {
+            SubscribersAndFailNums failNums = RedisUtils.getCacheObject(RedisConstant.TOPIC_SUBSCRIBERS_NUMS + msg.getId());
+            SubscribersAndFailNums.addFailNum(failNums,msg.getId());
+            final WebsocketResult result = JSON.parseObject(msg.getReplyContent(), WebsocketResult.class);
+            if (failNums.getSubscribers() <= failNums.getFailNums()) {
+                //说明该消息对应的主机已经离线,所有实例都没有该主机的ws链接信息,保存发送失败消息
+                websocketMsgService.sendFailMsg(result, msg.getId());
+            }
         }
     }
 

+ 46 - 0
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/websocket/redis/vo/SubscribersAndFailNums.java

@@ -0,0 +1,46 @@
+package com.xunmei.host.websocket.redis.vo;
+
+import com.xunmei.common.core.constant.RedisConstant;
+import com.xunmei.common.redis.utils.RedisUtils;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.redisson.api.RLock;
+import org.redisson.api.RedissonClient;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author jingyuanchao
+ * @date 2024/10/31 14:13
+ */
+@Slf4j
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class SubscribersAndFailNums {
+
+    private Integer subscribers;
+
+    private Integer failNums;
+
+
+    public static void addFailNum(SubscribersAndFailNums failNums, String msgId) {
+        final RedissonClient client = RedisUtils.getClient();
+        final RLock lock = client.getLock(RedisConstant.ADD_SUBSCRIBERS_AND_FAIL_NUMS);
+        try {
+            final boolean tryLock = lock.tryLock(10, 10, TimeUnit.SECONDS);
+            if (tryLock) {
+                failNums.setFailNums(failNums.getFailNums() + 1);
+                RedisUtils.setCacheObject(RedisConstant.TOPIC_SUBSCRIBERS_NUMS + msgId, failNums);
+            }
+        } catch (InterruptedException e) {
+            log.error("增加失败数时,未能获取到锁", e);
+            throw new RuntimeException(e);
+        } finally {
+            if (lock.isHeldByCurrentThread())
+                lock.unlock();
+        }
+    }
+}

+ 2 - 5
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/websocket/utils/WebSocketUtils.java

@@ -12,10 +12,7 @@ import io.netty.util.internal.StringUtil;
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.springframework.web.socket.PongMessage;
-import org.springframework.web.socket.TextMessage;
-import org.springframework.web.socket.WebSocketMessage;
-import org.springframework.web.socket.WebSocketSession;
+import org.springframework.web.socket.*;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -178,7 +175,7 @@ public class WebSocketUtils {
                 final String msg = JacksonUtils.toJSONString(message);
 
 
-                webSocketSession.sendMessage(new PongMessage());
+                webSocketSession.sendMessage(new PingMessage());
 
 
                 webSocketSession.sendMessage(new TextMessage(msg));