Jelajahi Sumber

消息重试回复代码部分提交

jingyuanchao 1 tahun lalu
induk
melakukan
bfdef01cac

+ 14 - 8
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/north/service/impl/IotWebsocketMsgServiceImpl.java

@@ -149,22 +149,30 @@ public class IotWebsocketMsgServiceImpl extends ServiceImpl<IotWebsocketMsgMappe
     @Async(ThreadPoolConfig.HOST_EXECUTOR)
     @Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW)
     public void sendFailMsg(WebsocketResult res, String msgId) {
-
         final IotWebsocketMsg websocketMsg = getById(msgId);
         if (websocketMsg == null) {
             return;
         }
         final LocalDateTime now = LocalDateTime.now();
-        final LocalDateTime firTime = websocketMsg.getFireTime() == null ? now : websocketMsg.getFireTime();
+        //final LocalDateTime firTime = websocketMsg.getFireTime() == null ? now : websocketMsg.getFireTime();
         final Integer curRetryTimes = websocketMsg.getCurRetryTimes();
         if (curRetryTimes == 0) {
-            //收到消息处理完成后第一次发送,但是发送失败了
+            //走到这里说明收到了主机发送的消息,且回复消息失败了,开始重试消息
             websocketMsg.setReadySendTime(now);
             websocketMsg.setReplyContent(JacksonUtils.toJSONString(res));
             websocketMsg.setStatus(MessageStatusEnum.RETRYING.getCode());
         }
-        //websocketMsg.setFireTime(now.plusMinutes(websocketMsg.getRetryInterval()));
-        websocketMsg.setFireTime((firTime).plusSeconds(30));
+        if (websocketMsg.getCurRetryTimes() >= websocketMsg.getMaxRetryTimes()) {
+            websocketMsg.setStatus(MessageStatusEnum.RETRY_FAIL.getCode());
+            websocketMsg.setUpdateTime(now);
+            updateById(websocketMsg);
+            return;
+        }
+
+        //设置重试的触发时间与重试次数
+        //websocketMsg.setFireTime((firTime).plusSeconds(30));
+        websocketMsg.setFireTime(now.plusMinutes(websocketMsg.getRetryInterval()));
+        websocketMsg.setCurRetryTimes(curRetryTimes + 1);
         websocketMsg.setUpdateTime(now);
         updateById(websocketMsg);
         RedisDelayedQueueUtil.addDelayQueue(msgId, websocketMsg.getFireTime(), RedisDelayQueueEnum.WEBSOCKET_MSG_RETRY.getCode());
@@ -184,10 +192,8 @@ public class IotWebsocketMsgServiceImpl extends ServiceImpl<IotWebsocketMsgMappe
         final Integer maxRetryTimes = msg.getMaxRetryTimes();
         final Integer curRetryTimes = msg.getCurRetryTimes();
 
-        if (curRetryTimes < maxRetryTimes) {
+        if (curRetryTimes <= maxRetryTimes) {
             //第一次重试, 把消息放到队列中
-            msg.setCurRetryTimes(curRetryTimes + 1);
-            updateById(msg);
             TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
                 @Override
                 public void afterCommit() {

+ 1 - 0
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/websocket/redis/WebsocketPublisher.java

@@ -42,6 +42,7 @@ public class WebsocketPublisher {
     }
 
     private void saveSubscribers(RedisWebsocketMsg message, int subscribers) {
+        //此处记录订阅数量与发送失败数量,是因为在重试时需要所有的监听都发送失败了 才能认为这次消息重试发送失败
         if (message.getContent() instanceof IotWebsocketMsg) {
             IotWebsocketMsg iotWebsocketMsg = (IotWebsocketMsg) message.getContent();
             RedisUtils.setCacheObject(RedisConstant.TOPIC_SUBSCRIBERS_NUMS + iotWebsocketMsg.getId(), new SubscribersAndFailNums(subscribers, 0),1000 * 60 * 60 * 24);

+ 4 - 2
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/websocket/redis/WebsocketSubscriber.java

@@ -52,6 +52,7 @@ public class WebsocketSubscriber implements ApplicationRunner {
                 if (ObjectUtil.hasEmpty(msg.getTokens(), msg.getContent())) {
                     return;
                 }
+                //此处根据 class 来判断哪些需要重试,哪些不需要重试
                 if (msg.getContent() instanceof IotWebsocketMsg) {
                     sendMessage(msg.getTokens(), (IotWebsocketMsg) msg.getContent());
                 }
@@ -67,6 +68,7 @@ public class WebsocketSubscriber implements ApplicationRunner {
         final WebsocketResult result = JSON.parseObject(msg.getReplyContent(), WebsocketResult.class);
         if (session != null && session.isOpen()) {
             try {
+                //此处发送前ping一下时有可能走到if里面后链接断开,此时消息还能发出去,但实际上客户端是收不到这个消息的,ping一下能解决这个问题
                 session.sendMessage(new PingMessage());
                 session.sendMessage(new TextMessage(msg.getReplyContent()));
                 websocketMsgService.sendSuccessMsg(msg.getId());
@@ -74,8 +76,8 @@ public class WebsocketSubscriber implements ApplicationRunner {
                 websocketMsgService.sendFailMsg(result, msg.getId());
             }
         } else {
-            SubscribersAndFailNums failNums = RedisUtils.getCacheObject(RedisConstant.TOPIC_SUBSCRIBERS_NUMS + msg.getId());
-            SubscribersAndFailNums.addFailNum(failNums, msg.getId());
+            //此处判断是否是所有的监听都发送失败了, 增加失败数量需要加锁,读写应该保持原子性
+            final SubscribersAndFailNums failNums = SubscribersAndFailNums.syncAddFailNum(msg.getId());
             if (failNums.getSubscribers() <= failNums.getFailNums()) {
                 //说明该消息对应的主机已经离线,所有实例都没有该主机的ws链接信息,保存发送失败消息
                 websocketMsgService.sendFailMsg(result, msg.getId());

+ 4 - 2
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/websocket/redis/vo/SubscribersAndFailNums.java

@@ -26,21 +26,23 @@ public class SubscribersAndFailNums {
     private Integer failNums;
 
 
-    public static void addFailNum(SubscribersAndFailNums failNums, String msgId) {
+    public static SubscribersAndFailNums syncAddFailNum(String msgId) {
         final RedissonClient client = RedisUtils.getClient();
         final RLock lock = client.getLock(RedisConstant.ADD_SUBSCRIBERS_AND_FAIL_NUMS);
         try {
             final boolean tryLock = lock.tryLock(10, 10, TimeUnit.SECONDS);
             if (tryLock) {
+                final SubscribersAndFailNums failNums = RedisUtils.getCacheObject(RedisConstant.TOPIC_SUBSCRIBERS_NUMS + msgId);
                 failNums.setFailNums(failNums.getFailNums() + 1);
                 RedisUtils.setCacheObject(RedisConstant.TOPIC_SUBSCRIBERS_NUMS + msgId, failNums, 1000 * 60 * 60 * 24);
+                return failNums;
             }
         } catch (InterruptedException e) {
             log.error("增加失败数时,未能获取到锁", e);
-            throw new RuntimeException(e);
         } finally {
             if (lock.isHeldByCurrentThread())
                 lock.unlock();
         }
+        return RedisUtils.getCacheObject(RedisConstant.TOPIC_SUBSCRIBERS_NUMS + msgId);
     }
 }

+ 1 - 0
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/websocket/service/impl/WebsocketServiceImpl.java

@@ -55,6 +55,7 @@ public class WebsocketServiceImpl implements WebsocketService {
      * @param token
      */
     public void sendMsgByTokens(Object obj, String token) {
+        //如果消息需要重试 obj=IotWebsocketMsg.class  如果不需要重试 类似上下班的操作指令 obj=WebsocketResult.class
         RedisWebsocketMsg msg = new RedisWebsocketMsg();
         msg.setTokens(token);
         msg.setContent(obj);