Selaa lähdekoodia

redis延迟队列bug处理

jingyuanchao 10 kuukautta sitten
vanhempi
commit
bb8aa4d32d

+ 19 - 0
soc-common/soc-common-core/src/main/java/com/xunmei/common/core/thread/ThreadPoolConfig.java

@@ -18,6 +18,7 @@ public class ThreadPoolConfig implements AsyncConfigurer {
      */
     public static final String SOC_EXECUTOR = "socExecutor";
     public static final String HOST_EXECUTOR = "hostExecutor";
+    public static final String DELAY_EXECUTOR = "delayExecutor";
 
 
     @Override
@@ -60,4 +61,22 @@ public class ThreadPoolConfig implements AsyncConfigurer {
         executor.initialize();
         return executor;
     }
+
+    @Bean(DELAY_EXECUTOR)
+    @Primary
+    public ThreadPoolTaskExecutor delayExecutor() {
+        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
+        //优雅停机
+        executor.setWaitForTasksToCompleteOnShutdown(true);
+        executor.setCorePoolSize(2);
+        executor.setMaxPoolSize(2);
+        executor.setQueueCapacity(200);
+        executor.setThreadNamePrefix("delay-executor-");
+        //拒绝策略,由投递者执行
+        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
+        //装饰器,用于异步线程内捕获异常
+        executor.setThreadFactory(new SocThreadFactory(executor));
+        executor.initialize();
+        return executor;
+    }
 }

+ 3 - 3
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/iot/service/impl/IotDeviceInfoServiceImpl.java

@@ -368,7 +368,7 @@ public class IotDeviceInfoServiceImpl extends ServiceImpl<IotDeviceInfoMapper, I
                 //平台下发设备后,主机上报该设备保存结果
                 case WebSocketConstants.INCREMENT_DEVICES_RESULT_EVENT:
                     dealPushDeviceResult(data);
-                    break;
+                    return null;
                 default:
                     return WebsocketResult.replyError(req, ErrorMsgConstants.ERROR_ROUTE);
             }
@@ -1286,13 +1286,13 @@ public class IotDeviceInfoServiceImpl extends ServiceImpl<IotDeviceInfoMapper, I
         }
         final String topic = TopicTypeEnums.formatUrl(TopicTypeEnums.PRODUCT_SERVICE_INVOKE.getUrl(), WebSocketConstants.DETECTION_HOST, WebSocketConstants.DETECTION_HOST_DEVICE);
         final WebsocketResult result = WebsocketResult.invokeHostServer(topic, new JSONObject(), WebSocketConstants.PUSH_DEVICES_SERVICES, param);
-        websocketService.sendMsgByTokens(result, deviceInfo.getIotToken());
         monitorSyncStatus(deviceInfo);
+        websocketService.sendMsgByTokens(result, deviceInfo.getIotToken());
         LogUtils.SYNC_DEVICE.info("同步设备[ {} ]至主机 [ {} ],设备类型:{},设备参数:{}", deviceInfo.getDeviceName(), deviceInfo.getIotToken(), deviceTypeEnum.getDesc(), param);
     }
 
     private void monitorSyncStatus(IotDeviceInfo deviceInfo) {
-        final LocalDateTime time = LocalDateTime.now().plusMinutes(5);
+        final LocalDateTime time = LocalDateTime.now().plusSeconds(30);
         RedisDelayedQueueUtil.addDelayQueue(deviceInfo.getId(), time, RedisDelayQueueEnum.DEVICE_SYNC_STATUS_MONITOR.getCode());
     }
 

+ 15 - 17
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/websocket/redis/delay/RedisDelayQueueRunner.java

@@ -13,36 +13,34 @@ import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.stereotype.Component;
 
-import java.util.concurrent.TimeUnit;
-
 @Slf4j
 @Component
-public class RedisDelayQueueRunner implements  Runnable, InitializingBean, DisposableBean  {
+public class RedisDelayQueueRunner implements Runnable, InitializingBean, DisposableBean {
 
     private Thread thread;
 
     @Autowired
-    @Qualifier(ThreadPoolConfig.HOST_EXECUTOR)
+    @Qualifier(ThreadPoolConfig.DELAY_EXECUTOR)
     private ThreadPoolTaskExecutor ptask;
 
     @Override
     public void run() {
         while (!Thread.currentThread().isInterrupted()) {
-            try {
-                RedisDelayQueueEnum[] queueEnums = RedisDelayQueueEnum.values();
-                for (RedisDelayQueueEnum queueEnum : queueEnums) {
-                    Object value = RedisDelayedQueueUtil.getDelayQueueData(queueEnum.getCode());
-                    if (value != null) {
-                        LogUtils.WS_MSG_RETRY_LOG.info("{} 延迟队列有数据,开始处理,消息内容:{}", queueEnum.getName(), JacksonUtils.toJSONString(value));
-                        RedisDelayQueueHandle<Object> redisDelayQueueHandle = (RedisDelayQueueHandle<Object>) SpringUtil.getBean(queueEnum.getBeanId());
-                        ptask.execute(() -> {
+            RedisDelayQueueEnum[] queueEnums = RedisDelayQueueEnum.values();
+            for (RedisDelayQueueEnum queueEnum : queueEnums) {
+                ptask.execute(() -> {
+                    try {
+                        Object value = RedisDelayedQueueUtil.getDelayQueueData(queueEnum.getCode());
+                        if (value != null) {
+                            LogUtils.WS_MSG_RETRY_LOG.info("{} 延迟队列有数据,开始处理,消息内容:{}", queueEnum.getName(), JacksonUtils.toJSONString(value));
+                            RedisDelayQueueHandle<Object> redisDelayQueueHandle = (RedisDelayQueueHandle<Object>) SpringUtil.getBean(queueEnum.getBeanId());
                             redisDelayQueueHandle.execute(value);
-                        });
+                        }
+                    } catch (Exception e) {
+                        LogUtils.WS_MSG_RETRY_LOG.error("延迟队列处理异常", e);
+                        throw new RuntimeException(e);
                     }
-                }
-                TimeUnit.MILLISECONDS.sleep(500);
-            } catch (InterruptedException e) {
-                LogUtils.WS_MSG_RETRY_LOG.error("(Redission延迟队列监测异常中断) {}", e.getMessage());
+                });
             }
         }
     }

+ 4 - 9
soc-modules/soc-modules-host/src/main/java/com/xunmei/host/websocket/redis/delay/RedisDelayedQueueUtil.java

@@ -63,15 +63,10 @@ public class RedisDelayedQueueUtil {
      * @throws InterruptedException
      */
     public static <T> T getDelayQueueData(String queueCode) throws InterruptedException {
-        try {
-            RBlockingDeque<T> blockingDeque = CLIENT.getBlockingDeque(queueCode);
-            //避免消息伪丢失(应用重启未消费),官网推荐
-            CLIENT.getDelayedQueue(blockingDeque);
-            return (T) blockingDeque.take();
-        } catch (InterruptedException e) {
-            LogUtils.WS_MSG_RETRY_LOG.error("获取延时队列失败,队列键:{}, 失败原因:{}", queueCode, e.getCause());
-            throw new RuntimeException(e);
-        }
+        RBlockingDeque<T> blockingDeque = CLIENT.getBlockingDeque(queueCode);
+        //避免消息伪丢失(应用重启未消费),官网推荐
+        CLIENT.getDelayedQueue(blockingDeque);
+        return (T) blockingDeque.take();
     }