瀏覽代碼

代码优化

jingyuanchao 11 月之前
父節點
當前提交
abcd37cb6e

+ 6 - 6
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/websocket/handler/SocWebSocketHandler.java

@@ -213,22 +213,22 @@ public class SocWebSocketHandler extends AbstractWebSocketHandler {
         switch (typeEnums) {
             case PRODUCT_EVENT_NOTICE:
             case PRODUCT_SERVICE_REPLY:
+                final IotWebsocketMsg saveMsg = websocketMsgService.receiveAndSaveMsg(executeReq, payload);
+                if (saveMsg.getSendBy() == 1) {
+                    //主机回复的消息, 不再继续重试的逻辑
+                    return;
+                }
                 RouterService routeService = RouterServiceHandler.getRouteService(typeEnums.getProductName(), executeReq.getEvent());
                 if (routeService == null) {
                     return;
                 }
-                final IotWebsocketMsg saveMsg = websocketMsgService.receiveAndSaveMsg(executeReq, payload);
-                sendBy = saveMsg.getSendBy();
                 result = routeService.execute(executeReq);
                 break;
             default:
                 result = WebsocketResult.replyError(executeReq, ErrorMsgConstants.ERROR_TOPIC);
                 break;
         }
-        if (sendBy == 1) {
-            //主机回复的消息, 不再继续重试的逻辑
-            return;
-        }
+
         final boolean sendFlag = WebSocketUtils.sendMessage(session, result);
         if (sendFlag) {
             websocketMsgService.sendSuccessMsg(result, executeReq.getId());

+ 33 - 23
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/websocket/redis/delay/RedisDelayQueueRunner.java

@@ -6,9 +6,10 @@ import com.xunmei.common.core.utils.JacksonUtils;
 import com.xunmei.common.redis.enums.RedisDelayQueueEnum;
 import com.xunmei.system.api.util.LogUtils;
 import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.DisposableBean;
+import org.springframework.beans.factory.InitializingBean;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.boot.CommandLineRunner;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.stereotype.Component;
 
@@ -16,39 +17,48 @@ import java.util.concurrent.TimeUnit;
 
 @Slf4j
 @Component
-public class RedisDelayQueueRunner implements CommandLineRunner {
+public class RedisDelayQueueRunner implements  Runnable, InitializingBean, DisposableBean  {
 
+    private Thread thread;
 
     @Autowired
     @Qualifier(ThreadPoolConfig.HOST_EXECUTOR)
     private ThreadPoolTaskExecutor ptask;
 
     @Override
-    public void run(String... args) throws Exception {
-        Thread thread = new Thread(() -> {
-            while (!Thread.currentThread().isInterrupted()) {
-                try {
-                    RedisDelayQueueEnum[] queueEnums = RedisDelayQueueEnum.values();
-                    for (RedisDelayQueueEnum queueEnum : queueEnums) {
-                        Object value = RedisDelayedQueueUtil.getDelayQueueData(queueEnum.getCode());
-                        if (value != null) {
-                            LogUtils.WS_MSG_RETRY_LOG.info("{} 延迟队列有数据,开始处理,消息内容:{}", queueEnum.getName(), JacksonUtils.toJSONString(value));
-                            RedisDelayQueueHandle<Object> redisDelayQueueHandle = (RedisDelayQueueHandle<Object>) SpringUtil.getBean(queueEnum.getBeanId());
-                            ptask.execute(() -> {
-                                redisDelayQueueHandle.execute(value);
-                            });
-                        }
+    public void run() {
+        while (!Thread.currentThread().isInterrupted()) {
+            try {
+                RedisDelayQueueEnum[] queueEnums = RedisDelayQueueEnum.values();
+                for (RedisDelayQueueEnum queueEnum : queueEnums) {
+                    Object value = RedisDelayedQueueUtil.getDelayQueueData(queueEnum.getCode());
+                    if (value != null) {
+                        LogUtils.WS_MSG_RETRY_LOG.info("{} 延迟队列有数据,开始处理,消息内容:{}", queueEnum.getName(), JacksonUtils.toJSONString(value));
+                        RedisDelayQueueHandle<Object> redisDelayQueueHandle = (RedisDelayQueueHandle<Object>) SpringUtil.getBean(queueEnum.getBeanId());
+                        ptask.execute(() -> {
+                            redisDelayQueueHandle.execute(value);
+                        });
                     }
-                    TimeUnit.MILLISECONDS.sleep(500);
-                } catch (InterruptedException e) {
-                    LogUtils.WS_MSG_RETRY_LOG.error("(Redission延迟队列监测异常中断) {}", e.getMessage());
                 }
+                TimeUnit.MILLISECONDS.sleep(500);
+            } catch (InterruptedException e) {
+                LogUtils.WS_MSG_RETRY_LOG.error("(Redission延迟队列监测异常中断) {}", e.getMessage());
             }
-        });
-        thread.setDaemon(true);
-        thread.setName("websocketMsg-delay—retry");
+        }
+    }
 
-        thread.start();
+    @Override
+    public void afterPropertiesSet() throws Exception {
+        this.thread = new Thread(this);
+        this.thread.setName("websocketMsg-delay—retry" + thread.getId());
+        this.thread.setDaemon(true);
         LogUtils.WS_MSG_RETRY_LOG.info("Redission延迟队列监测启动!");
+        this.thread.start();
+    }
+
+    @Override
+    public void destroy() throws Exception {
+        this.thread.interrupt();
     }
+
 }

+ 0 - 1
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/websocket/redis/delay/RedisDelayedQueueUtil.java

@@ -14,7 +14,6 @@ import java.time.LocalDateTime;
 import java.util.concurrent.TimeUnit;
 
 @Slf4j
-
 public class RedisDelayedQueueUtil {
 
     private static final RedissonClient CLIENT = SpringUtils.getBean(RedissonClient.class);