Pārlūkot izejas kodu

消息重试回复代码部分提交

jingyuanchao 1 gadu atpakaļ
vecāks
revīzija
8477b004b5

+ 3 - 3
soc-common/soc-common-core/src/main/java/com/xunmei/common/core/domain/iot/domain/IotWebsocketMsg.java

@@ -13,7 +13,7 @@ import java.time.LocalDateTime;
 
 /**
  * <p>
- * 
+ *
  * </p>
  *
  * @author jingYuanChao
@@ -22,7 +22,7 @@ import java.time.LocalDateTime;
 @Data
 @EqualsAndHashCode(callSuper = false)
 @TableName("iot_websocket_msg")
-@ApiModel(value="IotWebsocketMsg对象", description="")
+@ApiModel(value = "IotWebsocketMsg对象", description = "")
 public class IotWebsocketMsg implements Serializable {
 
     private static final long serialVersionUID = 1L;
@@ -53,7 +53,7 @@ public class IotWebsocketMsg implements Serializable {
     /**
      * @see com.xunmei.common.core.enums.iot.MessageStatusEnum
      */
-    @ApiModelProperty(value = "消息状态;0:处理中,1:处理完成,2:重试中,3:重试成功,4:重试失败")
+    @ApiModelProperty(value = "消息状态;0:处理中,1:处理完成,2:重试中,3:重试失败")
     @TableField("status")
     private Integer status;
 

+ 3 - 2
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/alarm/service/impl/IotAlarmRuleServiceImpl.java

@@ -5,6 +5,7 @@ import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 import com.xunmei.common.core.domain.iot.domain.IotServerInfo;
+import com.xunmei.common.core.domain.iot.domain.IotWebsocketMsg;
 import com.xunmei.common.core.domain.mediator.domain.IotAlarmRule;
 import com.xunmei.common.core.domain.mediator.domain.IotAlarmRuleExpress;
 import com.xunmei.common.core.domain.mediator.domain.IotAlarmRuleSource;
@@ -80,8 +81,8 @@ public class IotAlarmRuleServiceImpl extends ServiceImpl<IotAlarmRuleMapper, Iot
         final WebsocketResult result = WebsocketResult.invokeHostServer(topic, new com.alibaba.fastjson.JSONObject(), WebSocketConstants.ALARM_RULE, jsb);
 
 
-        websocketMsgService.sendAndSaveMsg(iotAlarmRule.getIotCode(), result);
-        websocketService.sendMsgByTokens(result, iotAlarmRule.getIotCode());
+        final IotWebsocketMsg saveMsg = websocketMsgService.sendAndSaveMsg(iotAlarmRule.getIotCode(), result);
+        websocketService.sendMsgByTokens(saveMsg, iotAlarmRule.getIotCode());
     }
 
     private JSONObject receiveAndUpdateAlarmRule(WebsocketExecuteReq req) {

+ 1 - 1
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/north/service/IotWebsocketMsgService.java

@@ -26,7 +26,7 @@ public interface IotWebsocketMsgService extends IService<IotWebsocketMsg> {
      * @param req
      * @param res
      */
-    void sendAndSaveMsg(String iotCode, WebsocketResult res);
+    IotWebsocketMsg sendAndSaveMsg(String iotCode, WebsocketResult res);
     void sendSuccessMsg(WebsocketResult res, String msgId);
     void sendSuccessMsg(String msgId);
     void sendFailMsg(WebsocketResult res, String msgId);

+ 16 - 8
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/north/service/impl/IotWebsocketMsgServiceImpl.java

@@ -18,6 +18,7 @@ import com.xunmei.host.websocket.redis.delay.RedisDelayedQueueUtil;
 import com.xunmei.host.websocket.service.WebsocketService;
 import com.xunmei.system.api.util.LogUtils;
 import lombok.SneakyThrows;
+import org.springframework.context.annotation.Lazy;
 import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Propagation;
@@ -41,11 +42,17 @@ import java.util.List;
 public class IotWebsocketMsgServiceImpl extends ServiceImpl<IotWebsocketMsgMapper, IotWebsocketMsg> implements IotWebsocketMsgService, RedisDelayQueueHandle {
 
     @Resource
+    @Lazy
     WebsocketService websocketService;
 
     @Resource
     IotServerInfoService serverInfoService;
 
+
+    public static final Integer maxRetryTimes = 3;
+
+    public static final Integer retryInterval = 1;
+
     @Override
     @Async(ThreadPoolConfig.HOST_EXECUTOR)
     @Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW)
@@ -61,9 +68,9 @@ public class IotWebsocketMsgServiceImpl extends ServiceImpl<IotWebsocketMsgMappe
         msg.setServerName(serverInfo.getIotName());
         msg.setStatus(MessageStatusEnum.PROCESSING.getCode());
         msg.setReceiveContent(receiveMsg);
-        msg.setMaxRetryTimes(3);
+        msg.setMaxRetryTimes(maxRetryTimes);
         msg.setCurRetryTimes(0);
-        msg.setRetryInterval(10);
+        msg.setRetryInterval(retryInterval);
         msg.setCreateTime(LocalDateTime.now());
         msg.setUpdateTime(LocalDateTime.now());
         save(msg);
@@ -71,7 +78,7 @@ public class IotWebsocketMsgServiceImpl extends ServiceImpl<IotWebsocketMsgMappe
     }
 
     @Override
-    public void sendAndSaveMsg(String iotCode, WebsocketResult res) {
+    public IotWebsocketMsg sendAndSaveMsg(String iotCode, WebsocketResult res) {
         //刚刚收到消息,还没开始处理业务逻辑,先保存消息数据
 
         final IotServerInfo serverInfo = serverInfoService.selectByToken(iotCode);
@@ -86,13 +93,14 @@ public class IotWebsocketMsgServiceImpl extends ServiceImpl<IotWebsocketMsgMappe
         msg.setStatus(MessageStatusEnum.PROCESSING.getCode());
         msg.setReplyContent(JacksonUtils.toJSONString(res));
         msg.setReadySendTime(now);
-        msg.setMaxRetryTimes(3);
+        msg.setMaxRetryTimes(maxRetryTimes);
         msg.setCurRetryTimes(0);
-        msg.setRetryInterval(10);
+        msg.setRetryInterval(retryInterval);
         msg.setCreateTime(now);
         msg.setUpdateTime(now);
         save(msg);
         LogUtils.WS_MSG_RETRY_LOG.info("开始下发消息到主机,msg: {}", JacksonUtils.toJSONString(msg));
+        return msg;
     }
 
     @Override
@@ -147,7 +155,7 @@ public class IotWebsocketMsgServiceImpl extends ServiceImpl<IotWebsocketMsgMappe
             return;
         }
         final LocalDateTime now = LocalDateTime.now();
-        //final LocalDateTime firTime = websocketMsg.getFireTime() == null ? now : websocketMsg.getFireTime();
+        final LocalDateTime firTime = websocketMsg.getFireTime() == null ? now : websocketMsg.getFireTime();
         final Integer curRetryTimes = websocketMsg.getCurRetryTimes();
         if (curRetryTimes == 0) {
             //收到消息处理完成后第一次发送,但是发送失败了
@@ -155,8 +163,8 @@ public class IotWebsocketMsgServiceImpl extends ServiceImpl<IotWebsocketMsgMappe
             websocketMsg.setReplyContent(JacksonUtils.toJSONString(res));
             websocketMsg.setStatus(MessageStatusEnum.RETRYING.getCode());
         }
-        websocketMsg.setFireTime(now.plusMinutes(websocketMsg.getRetryInterval()));
-        //websocketMsg.setFireTime((firTime).plusSeconds(60));
+        //websocketMsg.setFireTime(now.plusMinutes(websocketMsg.getRetryInterval()));
+        websocketMsg.setFireTime((firTime).plusSeconds(30));
         websocketMsg.setUpdateTime(now);
         updateById(websocketMsg);
         RedisDelayedQueueUtil.addDelayQueue(msgId, websocketMsg.getFireTime(), RedisDelayQueueEnum.WEBSOCKET_MSG_RETRY.getCode());

+ 1 - 1
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/websocket/redis/WebsocketPublisher.java

@@ -44,7 +44,7 @@ public class WebsocketPublisher {
     private void saveSubscribers(RedisWebsocketMsg message, int subscribers) {
         if (message.getContent() instanceof IotWebsocketMsg) {
             IotWebsocketMsg iotWebsocketMsg = (IotWebsocketMsg) message.getContent();
-            RedisUtils.setCacheObject(RedisConstant.TOPIC_SUBSCRIBERS_NUMS + iotWebsocketMsg.getId(), new SubscribersAndFailNums(subscribers, 0));
+            RedisUtils.setCacheObject(RedisConstant.TOPIC_SUBSCRIBERS_NUMS + iotWebsocketMsg.getId(), new SubscribersAndFailNums(subscribers, 0),1000 * 60 * 60 * 24);
         }
     }
 }

+ 1 - 0
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/websocket/redis/WebsocketSubscriber.java

@@ -89,6 +89,7 @@ public class WebsocketSubscriber implements ApplicationRunner {
             try {
                 session.sendMessage(new PingMessage());
                 session.sendMessage(new TextMessage(JSON.toJSONString(result)));
+                websocketMsgService.sendSuccessMsg(result.getId());
             } catch (Exception e) {
                 log.error("发送消息失败", e);
             }

+ 1 - 1
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/websocket/redis/vo/SubscribersAndFailNums.java

@@ -33,7 +33,7 @@ public class SubscribersAndFailNums {
             final boolean tryLock = lock.tryLock(10, 10, TimeUnit.SECONDS);
             if (tryLock) {
                 failNums.setFailNums(failNums.getFailNums() + 1);
-                RedisUtils.setCacheObject(RedisConstant.TOPIC_SUBSCRIBERS_NUMS + msgId, failNums);
+                RedisUtils.setCacheObject(RedisConstant.TOPIC_SUBSCRIBERS_NUMS + msgId, failNums, 1000 * 60 * 60 * 24);
             }
         } catch (InterruptedException e) {
             log.error("增加失败数时,未能获取到锁", e);

+ 28 - 3
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/websocket/service/impl/WebsocketServiceImpl.java

@@ -1,7 +1,9 @@
 package com.xunmei.host.websocket.service.impl;
 
 import com.alibaba.fastjson.JSONObject;
+import com.xunmei.common.core.domain.iot.domain.IotWebsocketMsg;
 import com.xunmei.common.redis.utils.RedisUtils;
+import com.xunmei.host.north.service.IotWebsocketMsgService;
 import com.xunmei.host.websocket.constant.WebSocketConstants;
 import com.xunmei.host.websocket.dto.WebsocketResult;
 import com.xunmei.host.websocket.enums.TopicTypeEnums;
@@ -14,8 +16,10 @@ import org.redisson.api.RedissonClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Lazy;
 import org.springframework.stereotype.Service;
 
+import javax.annotation.Resource;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Set;
@@ -25,12 +29,22 @@ public class WebsocketServiceImpl implements WebsocketService {
     private static final Logger log = LoggerFactory.getLogger(WebsocketServiceImpl.class);
     @Autowired
     private WebsocketPublisher websocketPublisher;
+    @Resource
+    @Lazy
+    private IotWebsocketMsgService websocketMsgService;
 
     @Override
     public void getDevices() {
-        final String topic = TopicTypeEnums.formatUrl(TopicTypeEnums.PRODUCT_SERVICE_INVOKE.getUrl(),WebSocketConstants.DETECTION_HOST, WebSocketConstants.DETECTION_HOST_DEVICE);
+        final String topic = TopicTypeEnums.formatUrl(TopicTypeEnums.PRODUCT_SERVICE_INVOKE.getUrl(), WebSocketConstants.DETECTION_HOST, WebSocketConstants.DETECTION_HOST_DEVICE);
         final WebsocketResult result = WebsocketResult.invokeHostServer(topic, new JSONObject(), WebSocketConstants.GET_DEVICES_SERVICES, new JSONObject());
-        sendMgsToAll(result);
+        final Set<String> set = getAllIotCode();
+        if (set.isEmpty()) {
+            return;
+        }
+        for (String token : set) {
+            final IotWebsocketMsg iotWebsocketMsg = websocketMsgService.sendAndSaveMsg(token, result);
+            sendMsgByTokens(iotWebsocketMsg, token);
+        }
     }
 
 
@@ -54,6 +68,16 @@ public class WebsocketServiceImpl implements WebsocketService {
      * @throws Exception
      */
     public void sendMgsToAll(Object obj) {
+        final Set<String> codeList = getAllIotCode();
+        if (codeList.isEmpty()) {
+            return;
+        }
+        for (String token : codeList) {
+            sendMsgByTokens(obj, token);
+        }
+    }
+
+    private static Set<String> getAllIotCode() {
         RedissonClient client = RedisUtils.getClient();
         RKeys keys = client.getKeys();
         Iterable<String> keyTokens = keys.getKeysByPattern(WebSocketSessionHolder.REDIS_TOPIC_WEBSOCKET_TOKEN + "*");
@@ -63,8 +87,9 @@ public class WebsocketServiceImpl implements WebsocketService {
             String keyToken = (String) kts.next();
             String token = (String) RedisUtils.getCacheObject(keyToken);
             tokens.add(token);
-            sendMsgByTokens(obj,token);
         }
+        //tokens.add("d690deb4-5bf8-4d63-b7a7-dd698559910c");
+        return tokens;
     }