Ver Fonte

增加redis延迟队列

jingyuanchao há 1 ano atrás
pai
commit
ac980a889d

+ 80 - 0
soc-common/soc-common-redis/src/main/java/com/xunmei/common/redis/delay/RedisDelayedQueue.java

@@ -0,0 +1,80 @@
+package com.xunmei.common.redis.delay;
+
+
+import lombok.extern.slf4j.Slf4j;
+import org.redisson.api.RBlockingQueue;
+import org.redisson.api.RDelayedQueue;
+import org.redisson.api.RedissonClient;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Redis 延时队列
+ */
+@Component
+@Slf4j
+public class RedisDelayedQueue {
+
+    @Autowired
+    private RedissonClient redissonClient;
+
+    /**
+     * 添加对象进延时队列
+     * @param putInData 添加数据
+     * @param delay     延时时间
+     * @param timeUnit  时间单位
+     * @param queueName 队列名称
+     * @param <T>
+     */
+    private <T> void addQueue(T putInData, long delay, TimeUnit timeUnit, String queueName){
+        log.info("添加延迟队列,监听名称:{},时间:{},时间单位:{},内容:{}" , queueName, delay, timeUnit,putInData);
+        RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(queueName);
+        RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue);
+        delayedQueue.offer(putInData, delay, timeUnit);
+    }
+
+    /**
+     * 添加队列-秒
+     *
+     * @param t     DTO传输类
+     * @param delay 时间数量
+     * @param <T>   泛型
+     */
+    public <T> void addQueueSeconds(T t, long delay, Class<? extends RedisDelayedQueueListener> clazz) {
+        addQueue(t, delay, TimeUnit.SECONDS, clazz.getName());
+    }
+
+    /**
+     * 添加队列-分
+     *
+     * @param t     DTO传输类
+     * @param delay 时间数量
+     * @param <T>   泛型
+     */
+    public <T> void addQueueMinutes(T t, long delay, Class<? extends RedisDelayedQueueListener> clazz) {
+        addQueue(t, delay, TimeUnit.MINUTES, clazz.getName());
+    }
+
+    /**
+     * 添加队列-时
+     *
+     * @param t     DTO传输类
+     * @param delay 时间数量
+     * @param <T>   泛型
+     */
+    public <T> void addQueueHours(T t, long delay, Class<? extends RedisDelayedQueueListener> clazz) {
+        addQueue(t, delay, TimeUnit.HOURS, clazz.getName());
+    }
+    /**
+     * 添加队列-天
+     *
+     * @param t     DTO传输类
+     * @param delay 时间数量
+     * @param <T>   泛型
+     */
+    public <T> void addQueueDays(T t, long delay, Class<? extends RedisDelayedQueueListener> clazz) {
+        addQueue(t, delay, TimeUnit.DAYS, clazz.getName());
+    }
+}

+ 57 - 0
soc-common/soc-common-redis/src/main/java/com/xunmei/common/redis/delay/RedisDelayedQueueInit.java

@@ -0,0 +1,57 @@
+package com.xunmei.common.redis.delay;
+
+
+import com.alibaba.fastjson2.JSON;
+import lombok.extern.slf4j.Slf4j;
+import org.redisson.api.RBlockingQueue;
+import org.redisson.api.RedissonClient;
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+import org.springframework.stereotype.Component;
+
+import java.util.Map;
+
+/**
+ * redis 延时队列初始化
+ */
+@Component
+@Slf4j
+public class RedisDelayedQueueInit implements ApplicationContextAware {
+    @Autowired
+    private RedissonClient redissonClient;
+    @Override
+    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
+        Map<String, RedisDelayedQueueListener> map = applicationContext.getBeansOfType(RedisDelayedQueueListener.class);
+        for (Map.Entry<String, RedisDelayedQueueListener> taskEventListenerEntry : map.entrySet()) {
+            String listenerName = taskEventListenerEntry.getValue().getClass().getName();
+
+            startThread(listenerName, taskEventListenerEntry.getValue());
+        }
+    }
+
+    /**
+     * 启动线程获取队列
+     * @param queueName 队列名称
+     * @param redisDelayedQueueListener 任务回调监听
+     */
+    private <T> void startThread(String queueName, RedisDelayedQueueListener redisDelayedQueueListener) {
+        RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(queueName);
+        //由于此线程需要常驻,可以新建线程,不用交给线程池管理
+        Thread thread = new Thread(() -> {
+            log.info("启动监听队列线程" + queueName);
+            while (true) {
+                try {
+                    T t = blockingFairQueue.take();
+                    log.info("监听队列线程{},获取到值:{}", queueName, JSON.toJSONString(t));
+                    redisDelayedQueueListener.invoke(t);
+                } catch (Exception e) {
+                    log.info("监听队列线程错误,", e);
+                }
+            }
+        });
+        thread.setName(queueName);
+        thread.start();
+    }
+}

+ 10 - 0
soc-common/soc-common-redis/src/main/java/com/xunmei/common/redis/delay/RedisDelayedQueueListener.java

@@ -0,0 +1,10 @@
+package com.xunmei.common.redis.delay;
+
+public interface RedisDelayedQueueListener<T> {
+
+    /**
+     * 执行方法
+     * @param t
+     */
+    void invoke(T t);
+}

+ 14 - 0
soc-common/soc-common-redis/src/main/java/com/xunmei/common/redis/delay/RegisterBookFileExpirationListener.java

@@ -0,0 +1,14 @@
+package com.xunmei.common.redis.delay;
+
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+@Component
+@Slf4j
+public class RegisterBookFileExpirationListener implements RedisDelayedQueueListener<String> {
+    @Override
+    public void invoke(String key) {
+        log.info(" redis延迟队列监听到消息, key:{}", key);
+    }
+}

+ 2 - 5
soc-modules/soc-modules-core/src/main/java/com/xunmei/core/SocCoreApplication.java

@@ -7,16 +7,13 @@ import com.xunmei.common.swagger.annotation.EnableCustomSwagger2;
 import org.mybatis.spring.annotation.MapperScan;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.ComponentScan;
 import org.springframework.scheduling.annotation.EnableAsync;
 
-import java.text.SimpleDateFormat;
-import java.time.LocalDate;
-import java.util.Calendar;
-import java.util.Date;
-
 
 @MapperScan("com.xunmei.core.**.mapper")
 @EnableCustomConfig
+@ComponentScan(basePackages = {"com.xunmei.core", "com.xunmei.common.redis"})
 @EnableCustomSwagger2
 @EnableSocFeignClients
 @SpringBootApplication

+ 30 - 17
soc-modules/soc-modules-file/src/main/java/com/xunmei/file/service/LocalSysFileServiceImpl.java

@@ -19,6 +19,8 @@ import com.xunmei.common.core.domain.registerbook.vo.PdfToZipTempVo;
 import com.xunmei.common.core.enums.RegisterBookType;
 import com.xunmei.common.core.utils.DateHelper;
 import com.xunmei.common.core.utils.uuid.UUID;
+import com.xunmei.common.redis.delay.RedisDelayedQueue;
+import com.xunmei.common.redis.delay.RegisterBookFileExpirationListener;
 import com.xunmei.common.redis.utils.RedisUtils;
 import com.xunmei.file.utils.FileDownUtils;
 import com.xunmei.file.utils.FileUploadUtils;
@@ -77,6 +79,11 @@ public class LocalSysFileServiceImpl implements ISysFileService {
 
     @Autowired
     private HttpServletRequest request;
+
+    @Autowired
+    private RedisDelayedQueue delayedQueue;
+
+
     private static PdfFilePathVo getLocalFilePath(String localFilePath, String businessType, String fileName) {
         final String path = File.separator + businessType + File.separator + DateUtil.format(new Date(), "yyyy" + File.separator + "MM" + File.separator + "dd" + File.separator);
         final File file = new File(localFilePath + path);
@@ -608,8 +615,8 @@ public class LocalSysFileServiceImpl implements ISysFileService {
             response.setContentType("application/octet-stream");
             response.setHeader("Content-Disposition", "attachment; filename=" + zipName);
             List<PdfToZipTempVo> pdfToZipTempVoList = registerBookPdfList.parallelStream().map(pdf -> {
-                return resolve(pdf, count);
-            }).filter(Objects::nonNull)
+                        return resolve(pdf, count);
+                    }).filter(Objects::nonNull)
                     .collect(Collectors.toList());
             count.await();
             pdfToZipTempVoList.removeIf(pdfToZipTempVo -> !FileUtil.exist(pdfToZipTempVo.getFile()));
@@ -635,7 +642,7 @@ public class LocalSysFileServiceImpl implements ISysFileService {
     }
 
 
-    public PdfToZipTempVo resolve(CoreRegisterBookPdfPageVo pdf,CountDownLatch count) {
+    public PdfToZipTempVo resolve(CoreRegisterBookPdfPageVo pdf, CountDownLatch count) {
 
         final File temp = new File(TEMP_DIR_NAME);
         if (!temp.exists()) {
@@ -703,20 +710,21 @@ public class LocalSysFileServiceImpl implements ISysFileService {
         fileInputStream.close();
     }
 
-    private List<SysOrgVO> getChildrenList(Long orgId){
+    private List<SysOrgVO> getChildrenList(Long orgId) {
         List<SysOrgVO> cacheList = RedisUtils.getCacheList(CacheConstants.ORG_CACHE_LIST_KEY);
         return cacheList.stream()
                 .filter(org -> ObjectUtil.equal(org.getParentId(), orgId))
                 .collect(Collectors.toList());
     }
 
-    private  SysOrgVO getCurOrg(Long orgId){
+    private SysOrgVO getCurOrg(Long orgId) {
         List<SysOrgVO> cacheList = RedisUtils.getCacheList(CacheConstants.ORG_CACHE_LIST_KEY);
         return cacheList.stream()
                 .filter(org -> ObjectUtil.equal(org.getId(), orgId))
                 .findFirst().get();
     }
 
+
     @Override
     public void cutFileCompress(CoreRegisterBookPdfExportDto pdfDto) {
         SysOrgVO org = getCurOrg(pdfDto.getOrgId());
@@ -735,7 +743,7 @@ public class LocalSysFileServiceImpl implements ISysFileService {
                 zipName = URLEncoder.encode(fileNameStr + ".zip", "UTF-8");
 
                 List<PdfToZipTempVo> pdfToZipTempVoList = list.parallelStream().map(pdf -> {
-                            return resolve(pdf,  count);
+                            return resolve(pdf, count);
                         }).filter(Objects::nonNull)
                         .collect(Collectors.toList());
                 pdfToZipTempVoList.removeIf(pdfToZipTempVo -> !FileUtil.exist(pdfToZipTempVo.getFile()));
@@ -755,17 +763,7 @@ public class LocalSysFileServiceImpl implements ISysFileService {
                 zos.close();
                 fos.close();
                 num++;
-                PdfLocalFileTempVo pdfLocalFileTempVo = new PdfLocalFileTempVo();
-                pdfLocalFileTempVo.setOrgId(pdfDto.getOrgId());
-                pdfLocalFileTempVo.setOrgName(org.getName());
-                pdfLocalFileTempVo.setOrgPath(org.getPath());
-                pdfLocalFileTempVo.setLocalFileName(localFileName);
-                pdfLocalFileTempVo.setZipName(URLDecoder.decode(zipName, "UTF-8"));
-                pdfLocalFileTempVo.setFileSize(changeUnit(fileSize));
-                pdfLocalFileTempVo.setDownLoadTime(DateUtil.format(date, "yyyy-MM-dd HH:mm:ss"));
-                pdfLocalFileTempVo.setIsRegisterBookPage(pdfDto.getIsRegisterBookPage());
-                pdfLocalFileTempVo.setCreateTime(new Date());
-                RedisUtils.setCacheObject(URLDecoder.decode(localFileName, "UTF-8"), JSON.toJSONString(pdfLocalFileTempVo));
+                saveFileDataToRedis(org, date, zipName, localFileName, fileSize, pdfDto);
             } catch (Throwable e) {
                 throw new RuntimeException(e);
             } finally {
@@ -779,6 +777,21 @@ public class LocalSysFileServiceImpl implements ISysFileService {
 
     }
 
+    private void saveFileDataToRedis(SysOrgVO org, Date date, String zipName, String localFileName, long fileSize, CoreRegisterBookPdfExportDto pdfDto) throws UnsupportedEncodingException {
+        PdfLocalFileTempVo pdfLocalFileTempVo = new PdfLocalFileTempVo();
+        pdfLocalFileTempVo.setOrgId(pdfDto.getOrgId());
+        pdfLocalFileTempVo.setOrgName(org.getName());
+        pdfLocalFileTempVo.setOrgPath(org.getPath());
+        pdfLocalFileTempVo.setLocalFileName(localFileName);
+        pdfLocalFileTempVo.setZipName(URLDecoder.decode(zipName, "UTF-8"));
+        pdfLocalFileTempVo.setFileSize(changeUnit(fileSize));
+        pdfLocalFileTempVo.setDownLoadTime(DateUtil.format(date, "yyyy-MM-dd HH:mm:ss"));
+        pdfLocalFileTempVo.setIsRegisterBookPage(pdfDto.getIsRegisterBookPage());
+        pdfLocalFileTempVo.setCreateTime(new Date());
+        RedisUtils.setCacheObject(URLDecoder.decode(localFileName, "UTF-8"), JSON.toJSONString(pdfLocalFileTempVo));
+        delayedQueue.addQueueHours(pdfLocalFileTempVo.getLocalFileName(), 1, RegisterBookFileExpirationListener.class);
+    }
+
     private List<List<CoreRegisterBookPdfPageVo>> checkSubList(CoreRegisterBookPdfExportDto pdfDto) {
         List<List<CoreRegisterBookPdfPageVo>> list = new ArrayList<>();
         List<CoreRegisterBookPdfPageVo> registerBookPdfList = pdfDto.getDataList();