Selaa lähdekoodia

主机平台调试消息重试代码修改提交

jingyuanchao 11 kuukautta sitten
vanhempi
commit
b5df43c8e4

+ 19 - 18
project_data/sql/1.0.11/soc.sql

@@ -1,22 +1,23 @@
 
 drop table if exists `iot_websocket_msg`;
 CREATE TABLE `iot_websocket_msg` (
-         `id` varchar(125) COLLATE utf8mb4_general_ci NOT NULL COMMENT '消息id',
-         `event` varchar(125) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '事件类型',
-         `org_id` bigint DEFAULT NULL COMMENT '机构id',
-         `org_name` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '机构名称',
-         `iot_code` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '目标主机',
-         `server_name` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '主机名称',
-         `status` int DEFAULT NULL COMMENT '消息状态;0:处理中,1:处理完成,2:重试中,3:重试失败',
-         `receive_content` longtext CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci COMMENT '收到消息内容',
-         `reply_content` text COLLATE utf8mb4_general_ci COMMENT '回复内容',
-         `ready_send_time` datetime DEFAULT NULL COMMENT '准备发送时间',
-         `really_send_time` datetime DEFAULT NULL COMMENT '重试成功时间',
-         `fire_time` datetime DEFAULT NULL COMMENT '触发时间',
-         `max_retry_times` int DEFAULT NULL COMMENT '最大重试次数',
-         `cur_retry_times` int DEFAULT NULL COMMENT '当前重试次数',
-         `retry_interval` int DEFAULT NULL COMMENT '重试间隔,分钟',
-         `create_time` datetime DEFAULT NULL COMMENT '创建时间',
-         `update_time` datetime DEFAULT NULL COMMENT '最后一次修改时间',
-         PRIMARY KEY (`id`)
+     `id` varchar(125) COLLATE utf8mb4_general_ci NOT NULL COMMENT '消息id',
+     `send_by` int DEFAULT NULL COMMENT '发送方;0:主机,1:平台',
+     `event` varchar(125) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '事件类型',
+     `org_id` bigint DEFAULT NULL COMMENT '机构id',
+     `org_name` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '机构名称',
+     `iot_code` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '目标主机',
+     `server_name` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '主机名称',
+     `status` int DEFAULT NULL COMMENT '消息状态;0:处理中,1:处理完成,2:重试中,3:重试失败',
+     `receive_content` longtext CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci COMMENT '收到消息内容',
+     `reply_content` text COLLATE utf8mb4_general_ci COMMENT '回复内容',
+     `ready_send_time` datetime DEFAULT NULL COMMENT '准备发送时间',
+     `really_send_time` datetime DEFAULT NULL COMMENT '重试成功时间',
+     `fire_time` datetime DEFAULT NULL COMMENT '触发时间',
+     `max_retry_times` int DEFAULT NULL COMMENT '最大重试次数',
+     `cur_retry_times` int DEFAULT NULL COMMENT '当前重试次数',
+     `retry_interval` int DEFAULT NULL COMMENT '重试间隔,分钟',
+     `create_time` datetime DEFAULT NULL COMMENT '创建时间',
+     `update_time` datetime DEFAULT NULL COMMENT '最后一次修改时间',
+     PRIMARY KEY (`id`)
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;

+ 4 - 0
soc-common/soc-common-core/src/main/java/com/xunmei/common/core/domain/iot/domain/IotWebsocketMsg.java

@@ -31,6 +31,10 @@ public class IotWebsocketMsg implements Serializable {
     @TableId("id")
     private String id;
 
+    @ApiModelProperty(value = "发送方;0:主机,1:平台")
+    @TableField("send_by")
+    private int sendBy;
+
     @ApiModelProperty(value = "事件类型")
     @TableField("event")
     private String event;

+ 1 - 1
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/alarm/service/impl/IotAlarmRuleServiceImpl.java

@@ -81,7 +81,7 @@ public class IotAlarmRuleServiceImpl extends ServiceImpl<IotAlarmRuleMapper, Iot
         final WebsocketResult result = WebsocketResult.invokeHostServer(topic, new com.alibaba.fastjson.JSONObject(), WebSocketConstants.ALARM_RULE_TO_HOST, jsb);
 
 
-        final IotWebsocketMsg saveMsg = websocketMsgService.sendAndSaveMsg(iotAlarmRule.getIotCode(), result);
+        final IotWebsocketMsg saveMsg = websocketMsgService.proactiveSendAndSaveMsg(iotAlarmRule.getIotCode(), result);
         websocketService.sendMsgByTokens(saveMsg, iotAlarmRule.getIotCode());
     }
 

+ 5 - 12
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/north/service/IotWebsocketMsgService.java

@@ -15,18 +15,11 @@ import com.xunmei.host.websocket.dto.WebsocketResult;
  */
 public interface IotWebsocketMsgService extends IService<IotWebsocketMsg> {
 
-    /**
-     * 针对主机上报的消息
-     * @param req
-     * @param receiveMsg
-     */
-    void receiveAndSaveMsg(WebsocketExecuteReq req, String receiveMsg);
-    /**
-     * 针对平台下发到主机的消息
-     * @param req
-     * @param res
-     */
-    IotWebsocketMsg sendAndSaveMsg(String iotCode, WebsocketResult res);
+    //针对主机上报或者主机回复的消息
+    IotWebsocketMsg receiveAndSaveMsg(WebsocketExecuteReq req, String receiveMsg);
+
+    //针对平台主动下发到主机的消息
+    IotWebsocketMsg proactiveSendAndSaveMsg(String iotCode, WebsocketResult res);
     void sendSuccessMsg(WebsocketResult res, String msgId);
     void sendSuccessMsg(String msgId);
     void sendFailMsg(WebsocketResult res, String msgId);

+ 12 - 6
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/north/service/impl/IotWebsocketMsgServiceImpl.java

@@ -54,12 +54,18 @@ public class IotWebsocketMsgServiceImpl extends ServiceImpl<IotWebsocketMsgMappe
     public static final Integer retryInterval = 1;
 
     @Override
-    @Async(ThreadPoolConfig.HOST_EXECUTOR)
     @Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW)
-    public void receiveAndSaveMsg(WebsocketExecuteReq req, String receiveMsg) {
+    public IotWebsocketMsg receiveAndSaveMsg(WebsocketExecuteReq req, String receiveMsg) {
         //刚刚收到消息,还没开始处理业务逻辑,先保存消息数据
         final IotServerInfo serverInfo = req.getServerInfo();
-        final IotWebsocketMsg msg = new IotWebsocketMsg();
+        IotWebsocketMsg msg = getById(req.getId());
+        if (msg != null) {
+            msg.setReceiveContent(receiveMsg);
+            updateById(msg);
+            return msg;
+        }
+        msg = new IotWebsocketMsg();
+        msg.setSendBy(0);
         msg.setId(req.getId());
         msg.setEvent(req.getEvent());
         msg.setOrgId(serverInfo.getOrgId());
@@ -75,12 +81,11 @@ public class IotWebsocketMsgServiceImpl extends ServiceImpl<IotWebsocketMsgMappe
         msg.setUpdateTime(LocalDateTime.now());
         save(msg);
         LogUtils.WS_MSG_RETRY_LOG.info("收到主机消息,msg: {}, Thread-Name: {}", JacksonUtils.toJSONString(msg), Thread.currentThread().getName());
+        return msg;
     }
 
     @Override
-    public IotWebsocketMsg sendAndSaveMsg(String iotCode, WebsocketResult res) {
-        //刚刚收到消息,还没开始处理业务逻辑,先保存消息数据
-
+    public IotWebsocketMsg proactiveSendAndSaveMsg(String iotCode, WebsocketResult res) {
         final IotServerInfo serverInfo = serverInfoService.selectByToken(iotCode);
         final IotWebsocketMsg msg = new IotWebsocketMsg();
         final LocalDateTime now = LocalDateTime.now();
@@ -94,6 +99,7 @@ public class IotWebsocketMsgServiceImpl extends ServiceImpl<IotWebsocketMsgMappe
         msg.setReplyContent(JacksonUtils.toJSONString(res));
         msg.setReadySendTime(now);
         msg.setMaxRetryTimes(maxRetryTimes);
+        msg.setSendBy(1);
         msg.setCurRetryTimes(0);
         msg.setRetryInterval(retryInterval);
         msg.setCreateTime(now);

+ 3 - 1
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/websocket/dto/WebsocketPayloadResolve.java

@@ -20,6 +20,7 @@ public class WebsocketPayloadResolve {
     private String routingKey;
     private JSONObject header;
     private Object data;
+    private String id;
 
 
     public WebsocketPayloadResolve(WebsocketResult result) {
@@ -51,9 +52,10 @@ public class WebsocketPayloadResolve {
 
 
         this.routingKey = getRoutingKey(payloadToExtract);
-        this.header = payloadToExtract.getJSONObject(WebSocketConstants.HEADER);
+        this.header = payloadToExtract.getJSONObject(WebSocketConstants.HEADER) == null ? result.getHeaders() : payloadToExtract.getJSONObject(WebSocketConstants.HEADER);
         this.data = payloadToExtract.get(WebSocketConstants.ARGS) != null ? payloadToExtract.get(WebSocketConstants.ARGS) : payloadToExtract.get(WebSocketConstants.DATA);
         this.topic = isPassThrough ? ((JSONObject) parentPayload.get(WebSocketConstants.ARGS)).toJavaObject(WebsocketResult.class).getTopic() : result.getTopic();
+        this.id = this.header.getString(WebSocketConstants.REPLY_ID) == null ? result.getId() : this.header.getString(WebSocketConstants.REPLY_ID);
     }
 
     private String getRoutingKey(JSONObject payload) {

+ 9 - 3
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/websocket/handler/SocWebSocketHandler.java

@@ -7,6 +7,7 @@ import com.xunmei.common.core.constant.Constants;
 import com.xunmei.common.core.constant.ErrorMsgConstants;
 import com.xunmei.common.core.constant.SecurityConstants;
 import com.xunmei.common.core.domain.iot.domain.IotServerInfo;
+import com.xunmei.common.core.domain.iot.domain.IotWebsocketMsg;
 import com.xunmei.common.core.enums.iot.IotServerConnectStatus;
 import com.xunmei.common.core.utils.JacksonUtils;
 import com.xunmei.common.core.utils.StringUtils;
@@ -194,7 +195,7 @@ public class SocWebSocketHandler extends AbstractWebSocketHandler {
 
             //上报消息内容
             final Object args = payloadResolve.getData();
-            WebsocketExecuteReq executeReq = new WebsocketExecuteReq(payloadResolve.getRoutingKey(), args, serverInfo, websocketResult.getId(), topic, payloadResolve.getHeader(), typeEnums.getProductName(), typeEnums.getDeviceName());
+            WebsocketExecuteReq executeReq = new WebsocketExecuteReq(payloadResolve.getRoutingKey(), args, serverInfo, payloadResolve.getId(), topic, payloadResolve.getHeader(), typeEnums.getProductName(), typeEnums.getDeviceName());
             handleRequestBasedOnType(executeReq, typeEnums, session, payload);
         } catch (Exception e) {
             logErrorAndReturn("消息处理失败", ip, e);
@@ -208,21 +209,26 @@ public class SocWebSocketHandler extends AbstractWebSocketHandler {
 
     private void handleRequestBasedOnType(WebsocketExecuteReq executeReq, TopicTypeEnums typeEnums, WebSocketSession session, String payload) {
         WebsocketResult result;
+        int sendBy = 0;
         switch (typeEnums) {
             case PRODUCT_EVENT_NOTICE:
             case PRODUCT_SERVICE_REPLY:
                 RouterService routeService = RouterServiceHandler.getRouteService(typeEnums.getProductName(), executeReq.getEvent());
                 if (routeService == null) {
-                    // WebSocketUtils.sendMessage(session, WebsocketResult.replyError(executeReq, ErrorMsgConstants.ERROR_TOPIC));
                     return;
                 }
-                websocketMsgService.receiveAndSaveMsg(executeReq, payload);
+                final IotWebsocketMsg saveMsg = websocketMsgService.receiveAndSaveMsg(executeReq, payload);
+                sendBy = saveMsg.getSendBy();
                 result = routeService.execute(executeReq);
                 break;
             default:
                 result = WebsocketResult.replyError(executeReq, ErrorMsgConstants.ERROR_TOPIC);
                 break;
         }
+        if (sendBy == 1) {
+            //主机回复的消息, 不再继续重试的逻辑
+            return;
+        }
         final boolean sendFlag = WebSocketUtils.sendMessage(session, result);
         if (sendFlag) {
             websocketMsgService.sendSuccessMsg(result, executeReq.getId());

+ 1 - 1
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/websocket/service/impl/WebsocketServiceImpl.java

@@ -42,7 +42,7 @@ public class WebsocketServiceImpl implements WebsocketService {
             return;
         }
         for (String token : set) {
-            final IotWebsocketMsg iotWebsocketMsg = websocketMsgService.sendAndSaveMsg(token, result);
+            final IotWebsocketMsg iotWebsocketMsg = websocketMsgService.proactiveSendAndSaveMsg(token, result);
             sendMsgByTokens(iotWebsocketMsg, token);
         }
     }