소스 검색

消息重试回复代码部分提交

jingyuanchao 1 년 전
부모
커밋
f3a977b640

+ 7 - 1
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/alarm/service/impl/IotAlarmRuleServiceImpl.java

@@ -12,6 +12,7 @@ import com.xunmei.host.alarm.mapper.IotAlarmRuleMapper;
 import com.xunmei.host.alarm.service.IotAlarmRuleExpressService;
 import com.xunmei.host.alarm.service.IotAlarmRuleService;
 import com.xunmei.host.alarm.service.IotAlarmRuleSourceService;
+import com.xunmei.host.north.service.IotWebsocketMsgService;
 import com.xunmei.host.websocket.constant.WebSocketConstants;
 import com.xunmei.host.websocket.dto.WebsocketExecuteReq;
 import com.xunmei.host.websocket.dto.WebsocketResult;
@@ -38,6 +39,8 @@ public class IotAlarmRuleServiceImpl extends ServiceImpl<IotAlarmRuleMapper, Iot
     private IotAlarmRuleExpressService ruleExpressService;
     @Resource
     private WebsocketService websocketService;
+    @Resource
+    private IotWebsocketMsgService websocketMsgService;
 
     @Override
     public ProductEnums product() {
@@ -55,7 +58,7 @@ public class IotAlarmRuleServiceImpl extends ServiceImpl<IotAlarmRuleMapper, Iot
     }
 
     @Override
-    @Transactional(rollbackFor = Exception.class,propagation = Propagation.REQUIRES_NEW)
+    @Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW)
     public WebsocketResult execute(WebsocketExecuteReq req) {
         final JSONObject jsb = receiveAndUpdateAlarmRule(req);
         return WebsocketResult.reply(req, jsb);
@@ -75,6 +78,9 @@ public class IotAlarmRuleServiceImpl extends ServiceImpl<IotAlarmRuleMapper, Iot
 
         final String topic = TopicTypeEnums.formatUrl(TopicTypeEnums.PRODUCT_SERVICE_INVOKE.getUrl(), ProductEnums.DETECTION_HOST.getProductName().get(0), ProductEnums.DETECTION_HOST.getProductName().get(1));
         final WebsocketResult result = WebsocketResult.invokeHostServer(topic, new com.alibaba.fastjson.JSONObject(), WebSocketConstants.ALARM_RULE, jsb);
+
+
+        websocketMsgService.sendAndSaveMsg(iotAlarmRule.getIotCode(), result);
         websocketService.sendMsgByTokens(result, iotAlarmRule.getIotCode());
     }
 

+ 12 - 2
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/north/service/IotWebsocketMsgService.java

@@ -15,8 +15,18 @@ import com.xunmei.host.websocket.dto.WebsocketResult;
  */
 public interface IotWebsocketMsgService extends IService<IotWebsocketMsg> {
 
-
-    void saveMsg(WebsocketExecuteReq req,String receiveMsg);
+    /**
+     * 针对主机上报的消息
+     * @param req
+     * @param receiveMsg
+     */
+    void receiveAndSaveMsg(WebsocketExecuteReq req, String receiveMsg);
+    /**
+     * 针对平台下发到主机的消息
+     * @param req
+     * @param res
+     */
+    void sendAndSaveMsg(String iotCode, WebsocketResult res);
     void sendSuccessMsg(WebsocketResult res, String msgId);
     void sendSuccessMsg(String msgId);
     void sendFailMsg(WebsocketResult res, String msgId);

+ 30 - 1
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/north/service/impl/IotWebsocketMsgServiceImpl.java

@@ -10,6 +10,7 @@ import com.xunmei.common.core.utils.JacksonUtils;
 import com.xunmei.common.redis.enums.RedisDelayQueueEnum;
 import com.xunmei.host.north.mapper.IotWebsocketMsgMapper;
 import com.xunmei.host.north.service.IotWebsocketMsgService;
+import com.xunmei.host.server.service.IotServerInfoService;
 import com.xunmei.host.websocket.dto.WebsocketExecuteReq;
 import com.xunmei.host.websocket.dto.WebsocketResult;
 import com.xunmei.host.websocket.redis.delay.RedisDelayQueueHandle;
@@ -42,10 +43,13 @@ public class IotWebsocketMsgServiceImpl extends ServiceImpl<IotWebsocketMsgMappe
     @Resource
     WebsocketService websocketService;
 
+    @Resource
+    IotServerInfoService serverInfoService;
+
     @Override
     @Async(ThreadPoolConfig.HOST_EXECUTOR)
     @Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW)
-    public void saveMsg(WebsocketExecuteReq req, String receiveMsg) {
+    public void receiveAndSaveMsg(WebsocketExecuteReq req, String receiveMsg) {
         //刚刚收到消息,还没开始处理业务逻辑,先保存消息数据
         final IotServerInfo serverInfo = req.getServerInfo();
         final IotWebsocketMsg msg = new IotWebsocketMsg();
@@ -67,6 +71,31 @@ public class IotWebsocketMsgServiceImpl extends ServiceImpl<IotWebsocketMsgMappe
     }
 
     @Override
+    public void sendAndSaveMsg(String iotCode, WebsocketResult res) {
+        //刚刚收到消息,还没开始处理业务逻辑,先保存消息数据
+
+        final IotServerInfo serverInfo = serverInfoService.selectByToken(iotCode);
+        final IotWebsocketMsg msg = new IotWebsocketMsg();
+        final LocalDateTime now = LocalDateTime.now();
+        msg.setId(res.getId());
+        msg.setEvent(res.getServiceName());
+        msg.setOrgId(serverInfo.getOrgId());
+        msg.setOrgName(serverInfo.getOrgName());
+        msg.setIotCode(serverInfo.getIotCode());
+        msg.setServerName(serverInfo.getIotName());
+        msg.setStatus(MessageStatusEnum.PROCESSING.getCode());
+        msg.setReplyContent(JacksonUtils.toJSONString(res));
+        msg.setReadySendTime(now);
+        msg.setMaxRetryTimes(3);
+        msg.setCurRetryTimes(0);
+        msg.setRetryInterval(10);
+        msg.setCreateTime(now);
+        msg.setUpdateTime(now);
+        save(msg);
+        LogUtils.WS_MSG_RETRY_LOG.info("开始下发消息到主机,msg: {}", JacksonUtils.toJSONString(msg));
+    }
+
+    @Override
     @Async(ThreadPoolConfig.HOST_EXECUTOR)
     @Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW)
     public void sendSuccessMsg(String msgId) {

+ 14 - 5
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/websocket/dto/WebsocketResult.java

@@ -13,7 +13,6 @@ import lombok.NoArgsConstructor;
 
 import java.io.Serializable;
 import java.util.Date;
-import java.util.HashMap;
 import java.util.UUID;
 
 /**
@@ -104,10 +103,12 @@ public class WebsocketResult implements Serializable {
         websocketResult.setTimestamp(DateUtil.format(new Date(), Constants.UTC_FORMAT));
         websocketResult.setTopic(topic);
         websocketResult.setHeaders(headers);
-        HashMap<String, Object> hashMap = new HashMap<>();
-        hashMap.put(WebSocketConstants.SERVICE, service);
-        hashMap.put(WebSocketConstants.ARGS, args);
-        websocketResult.setPayload(hashMap);
+
+        JSONObject payload = new JSONObject();
+        payload.put(WebSocketConstants.SERVICE, service);
+        payload.put(WebSocketConstants.ARGS, args);
+
+        websocketResult.setPayload(payload);
         return websocketResult;
     }
 
@@ -122,5 +123,13 @@ public class WebsocketResult implements Serializable {
         }
         return object;
     }
+
+    public String getServiceName() {
+        final Object resultPayload = this.payload;
+        if (resultPayload instanceof JSONObject) {
+            return ((JSONObject) resultPayload).getString(WebSocketConstants.SERVICE);
+        }
+        return StringUtil.EMPTY_STRING;
+    }
 }
 

+ 1 - 2
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/websocket/handler/SocWebSocketHandler.java

@@ -207,7 +207,6 @@ public class SocWebSocketHandler extends AbstractWebSocketHandler {
 
 
     private void handleRequestBasedOnType(WebsocketExecuteReq executeReq, TopicTypeEnums typeEnums, WebSocketSession session, String payload) {
-        final String iotCode = executeReq.getServerInfo().getIotCode();
         WebsocketResult result;
         switch (typeEnums) {
             case PRODUCT_EVENT_NOTICE:
@@ -217,7 +216,7 @@ public class SocWebSocketHandler extends AbstractWebSocketHandler {
                     // WebSocketUtils.sendMessage(session, WebsocketResult.replyError(executeReq, ErrorMsgConstants.ERROR_TOPIC));
                     return;
                 }
-                websocketMsgService.saveMsg(executeReq, payload);
+                websocketMsgService.receiveAndSaveMsg(executeReq, payload);
                 result = routeService.execute(executeReq);
                 break;
             default:

+ 20 - 14
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/websocket/redis/WebsocketSubscriber.java

@@ -17,7 +17,7 @@ import org.redisson.api.listener.MessageListener;
 import org.redisson.codec.SerializationCodec;
 import org.springframework.boot.ApplicationArguments;
 import org.springframework.boot.ApplicationRunner;
-import org.springframework.core.Ordered;
+import org.springframework.core.annotation.Order;
 import org.springframework.stereotype.Component;
 import org.springframework.web.socket.PingMessage;
 import org.springframework.web.socket.TextMessage;
@@ -33,8 +33,9 @@ import javax.annotation.Resource;
  * @date 2024/7/913:47
  */
 @Slf4j
+@Order(0)
 @Component
-public class WebsocketSubscriber implements ApplicationRunner, Ordered {
+public class WebsocketSubscriber implements ApplicationRunner {
 
     @Resource
     IotWebsocketMsgService websocketMsgService;
@@ -51,19 +52,17 @@ public class WebsocketSubscriber implements ApplicationRunner, Ordered {
                 if (ObjectUtil.hasEmpty(msg.getTokens(), msg.getContent())) {
                     return;
                 }
-                String contentJson = JSON.toJSONString(msg.getContent());
-                log.info("host-websocket 接收到订阅消息:{}", JSON.toJSONString(msg));
-                sendMessage(msg.getTokens(), contentJson);
+                if (msg.getContent() instanceof IotWebsocketMsg) {
+                    sendMessage(msg.getTokens(), (IotWebsocketMsg) msg.getContent());
+                }
+                if (msg.getContent() instanceof WebsocketResult) {
+                    sendMessage(msg.getTokens(), (WebsocketResult) msg.getContent());
+                }
             }
         });
     }
 
-    private void sendMessage(String token, String message) {
-        if (ObjectUtil.isEmpty(token)) {
-            return;
-        }
-
-        IotWebsocketMsg msg = JSON.parseObject(message, IotWebsocketMsg.class);
+    private void sendMessage(String token, IotWebsocketMsg msg) {
         WebSocketSession session = WebSocketSessionHolder.getSessions(token);
         final WebsocketResult result = JSON.parseObject(msg.getReplyContent(), WebsocketResult.class);
         if (session != null && session.isOpen()) {
@@ -84,8 +83,15 @@ public class WebsocketSubscriber implements ApplicationRunner, Ordered {
         }
     }
 
-    @Override
-    public int getOrder() {
-        return 0;
+    private void sendMessage(String token, WebsocketResult result) {
+        WebSocketSession session = WebSocketSessionHolder.getSessions(token);
+        if (session != null && session.isOpen()) {
+            try {
+                session.sendMessage(new PingMessage());
+                session.sendMessage(new TextMessage(JSON.toJSONString(result)));
+            } catch (Exception e) {
+                log.error("发送消息失败", e);
+            }
+        }
     }
 }