|
|
@@ -0,0 +1,89 @@
|
|
|
+package com.xunmei.mediator.api.service;
|
|
|
+
|
|
|
+import cn.hutool.core.date.DateUtil;
|
|
|
+import com.alibaba.fastjson.JSON;
|
|
|
+import com.rabbitmq.client.Channel;
|
|
|
+import com.xunmei.mediator.domain.dto.north.NorthMsgId;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.springframework.amqp.core.AmqpTemplate;
|
|
|
+import org.springframework.amqp.core.Message;
|
|
|
+import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.context.annotation.Bean;
|
|
|
+import org.springframework.scheduling.annotation.Async;
|
|
|
+import org.springframework.scheduling.annotation.EnableAsync;
|
|
|
+import org.springframework.stereotype.Service;
|
|
|
+import org.springframework.transaction.annotation.Transactional;
|
|
|
+
|
|
|
+import java.io.IOException;
|
|
|
+import java.util.Date;
|
|
|
+
|
|
|
+@Service
|
|
|
+@Slf4j
|
|
|
+@EnableAsync
|
|
|
+public class RabbitMsgIdService {
|
|
|
+
|
|
|
+ public static final String QUEUE = "msg_id_brand_id_1.v1";
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 创建消息队列
|
|
|
+ **/
|
|
|
+ @Bean
|
|
|
+ public org.springframework.amqp.core.Queue getQueue() {
|
|
|
+ return new org.springframework.amqp.core.Queue(QUEUE);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * rabbitmq服务
|
|
|
+ */
|
|
|
+ @Autowired
|
|
|
+ private AmqpTemplate rabbitTemplate;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private INorthMsgIdService northMsgIdService;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 给mq发送消息
|
|
|
+ *
|
|
|
+ * @param msgId
|
|
|
+ * @param interfacePath
|
|
|
+ * @param interfaceName
|
|
|
+ */
|
|
|
+ @Async("asyncExecutor_list")
|
|
|
+ public void sendMsgId(String msgId, String interfacePath, String interfaceName) {
|
|
|
+ Date date = new Date();
|
|
|
+ String format = DateUtil.format(date, "yyyy-MM-dd");
|
|
|
+ NorthMsgId msg = new NorthMsgId();
|
|
|
+ msg.setMsgId(msgId);
|
|
|
+ msg.setInterfaceName(interfaceName);
|
|
|
+ msg.setInterfacePath(interfacePath);
|
|
|
+ msg.setYmd(format);
|
|
|
+ msg.setMsgDate(date);
|
|
|
+ log.info("发送消息:{}", JSON.toJSON(msg));
|
|
|
+ this.rabbitTemplate.convertAndSend(QUEUE, msg);
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 通过监听的方式接收消息(注意这里方法是 final ,因为Spring开启代码,如果不设置final 会报异常)
|
|
|
+ *
|
|
|
+ * @param message 未折封的消息对象(非必需要)
|
|
|
+ */
|
|
|
+ @RabbitListener(queues = QUEUE)
|
|
|
+ @Transactional(rollbackFor = Exception.class)
|
|
|
+ public void processss(Message message, Channel channel) throws IOException {
|
|
|
+ try {
|
|
|
+ channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
|
|
|
+ } catch (Exception e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ String content = new String(message.getBody());
|
|
|
+ try {
|
|
|
+ NorthMsgId northMsgId = JSON.parseObject(content, NorthMsgId.class);
|
|
|
+ northMsgIdService.saveOrUpdate(northMsgId);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("消费失败原因:主键重复");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+}
|