|
|
@@ -0,0 +1,78 @@
|
|
|
+package com.xunmei.api.mq.consumer;
|
|
|
+
|
|
|
+import com.alibaba.fastjson2.JSON;
|
|
|
+import com.alibaba.fastjson2.JSONObject;
|
|
|
+import com.rabbitmq.client.Channel;
|
|
|
+import com.xunmei.api.mq.Messaging;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.springframework.amqp.support.AmqpHeaders;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.context.annotation.Bean;
|
|
|
+import org.springframework.http.HttpHeaders;
|
|
|
+import org.springframework.messaging.Message;
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
+import org.springframework.util.MultiValueMap;
|
|
|
+
|
|
|
+import java.time.LocalDateTime;
|
|
|
+import java.util.function.Consumer;
|
|
|
+
|
|
|
+
|
|
|
+/**
|
|
|
+ * RabbitMq消息消费者
|
|
|
+ */
|
|
|
+@Component
|
|
|
+@Slf4j
|
|
|
+public class RabbitMqConsumer {
|
|
|
+
|
|
|
+ /**
|
|
|
+ * mq接收ackMessage消息/手动ack确认
|
|
|
+ * @methodName 配置文件对应
|
|
|
+ **/
|
|
|
+ @Bean
|
|
|
+ Consumer<Message<Messaging>> ackMessage() {
|
|
|
+ log.info("ackMessage-初始化订阅");
|
|
|
+ return obj -> {
|
|
|
+ Channel channel = obj.getHeaders().get(AmqpHeaders.CHANNEL, Channel.class);
|
|
|
+ Long deliveryTag = obj.getHeaders().get(AmqpHeaders.DELIVERY_TAG, Long.class);
|
|
|
+ try {
|
|
|
+ log.info("ackMessage-消息接收成功:" + obj.getPayload());
|
|
|
+ //业务逻辑处理
|
|
|
+ //ack确认
|
|
|
+ channel.basicAck(deliveryTag, false);
|
|
|
+ } catch (Exception e) {
|
|
|
+ //重新回队列-true则重新入队列,否则丢弃或者进入死信队列。
|
|
|
+// channel.basicReject(deliveryTag, true);
|
|
|
+ log.error(e.getMessage());
|
|
|
+ }
|
|
|
+
|
|
|
+ };
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * mq接收normal消息
|
|
|
+ **/
|
|
|
+ @Bean
|
|
|
+ Consumer<Messaging> normal() {
|
|
|
+ log.info("normal-初始化订阅");
|
|
|
+ return obj -> {
|
|
|
+ log.info("normal-消息接收成功:" + obj);
|
|
|
+ //业务逻辑处理
|
|
|
+ };
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * mq接收延时消息
|
|
|
+ * Messaging 发送实体消息接收实体消息
|
|
|
+ **/
|
|
|
+ @Bean
|
|
|
+ Consumer<Message<Messaging>> delay() {
|
|
|
+ log.info("delay-初始化订阅");
|
|
|
+ return obj -> {
|
|
|
+ Messaging payload = obj.getPayload();
|
|
|
+ log.info("delay-消息接收成功:" + LocalDateTime.now() + " " + payload);
|
|
|
+ //业务逻辑处理
|
|
|
+ };
|
|
|
+ }
|
|
|
+}
|
|
|
+
|