From 6ce5be92aa3cedf7c352f64fe02bbb9810cb626c Mon Sep 17 00:00:00 2001
From: "pref_mail@163.com" <123456>
Date: Fri, 3 Nov 2023 14:56:56 +0800
Subject: [PATCH] =?UTF-8?q?=E5=90=AF=E7=94=A8=E6=AC=A7=E6=B4=BE=E6=95=B0?=
=?UTF-8?q?=E6=8D=AE=E9=80=80=E9=80=81?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
blade-service/logpm-old-project/pom.xml | 6 +
.../config/RabbitMqConfiguration.java | 125 ++++++++++++++++++
2 files changed, 131 insertions(+)
create mode 100644 blade-service/logpm-old-project/src/main/java/com/logpm/oldproject/config/RabbitMqConfiguration.java
diff --git a/blade-service/logpm-old-project/pom.xml b/blade-service/logpm-old-project/pom.xml
index 85610f7c6..4d509a622 100644
--- a/blade-service/logpm-old-project/pom.xml
+++ b/blade-service/logpm-old-project/pom.xml
@@ -23,6 +23,12 @@
org.springblade
blade-starter-swagger
+
+
+ org.springframework.boot
+ spring-boot-starter-amqp
+
+
org.springblade
diff --git a/blade-service/logpm-old-project/src/main/java/com/logpm/oldproject/config/RabbitMqConfiguration.java b/blade-service/logpm-old-project/src/main/java/com/logpm/oldproject/config/RabbitMqConfiguration.java
new file mode 100644
index 000000000..c736abbd6
--- /dev/null
+++ b/blade-service/logpm-old-project/src/main/java/com/logpm/oldproject/config/RabbitMqConfiguration.java
@@ -0,0 +1,125 @@
+package com.logpm.oldproject.config;
+
+import com.alibaba.nacos.shaded.com.google.common.collect.Maps;
+import lombok.extern.slf4j.Slf4j;
+import org.springblade.common.constant.RabbitConstant;
+import org.springframework.amqp.core.*;
+import org.springframework.amqp.rabbit.connection.ConnectionFactory;
+import org.springframework.amqp.rabbit.connection.CorrelationData;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.amqp.rabbit.retry.MessageRecoverer;
+import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import java.util.Map;
+
+/**
+ * RabbitMQ配置,主要是配置队列,如果提前存在该队列,可以省略本配置类
+ *
+ * @author yangkai.shen
+ */
+@Slf4j
+@Configuration
+public class RabbitMqConfiguration {
+
+ @Bean
+ public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){
+ RabbitTemplate template = new RabbitTemplate();
+ template.setConnectionFactory(connectionFactory);
+ template.setMandatory(true);
+ template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
+ @Override
+ public void confirm(CorrelationData correlationData, boolean b, String s) {
+ System.out.println("确认回调-相关数据:"+correlationData);
+ System.out.println("确认回调-确认情况:"+b);
+ System.out.println("确认回调-原因:"+s);
+// Long id = Long.parseLong(correlationData.getId());
+// UpdateWrapper updateWrapper = new UpdateWrapper<>();
+// if(b){
+// //修改处理日志为已处理
+// updateWrapper.set("status",0)
+// .eq("id",id);
+//
+// }else{
+// //修改处理日志为已处理
+// updateWrapper.set("status",2)
+// .eq("id",id);
+// }
+// panOrderStatusLogService.update(updateWrapper);
+ }
+ });
+
+ template.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
+ @Override
+ public void returnedMessage(ReturnedMessage returnedMessage) {
+ System.out.println("返回回调-消息:"+returnedMessage.getMessage());
+ System.out.println("返回回调-回应码:"+returnedMessage.getReplyCode());
+ System.out.println("返回回调-回应信息:"+returnedMessage.getReplyText());
+ System.out.println("返回回调-交换机:"+returnedMessage.getExchange());
+ System.out.println("返回回调-路由键:"+returnedMessage.getRoutingKey());
+ }
+ });
+ return template;
+ }
+
+ @Bean
+ public DirectExchange errorMessageExchange(){
+ return new DirectExchange(RabbitConstant.ERROR_EXCHANGE);
+ }
+ @Bean
+ public Queue errorQueue(){
+ return new Queue(RabbitConstant.ERROR_QUEUE, true);
+ }
+ @Bean
+ public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
+ return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with(RabbitConstant.ERROR_ROUTING);
+ }
+
+ /**
+ * 消费失败队列
+ * @param rabbitTemplate
+ * @return
+ */
+ @Bean
+ public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
+ return new RepublishMessageRecoverer(rabbitTemplate, RabbitConstant.ERROR_EXCHANGE, RabbitConstant.ERROR_ROUTING);
+ }
+
+
+ /**
+ * 延迟队列
+ */
+ @Bean
+ public Queue orderStatusQueue() {
+ return new Queue(RabbitConstant.ORDER_STATUS_QUEUE, true);
+ }
+
+ /**
+ * 延迟队列交换器, x-delayed-type 和 x-delayed-message 固定
+ */
+ @Bean
+ public CustomExchange orderStatusExchange() {
+ Map args = Maps.newHashMap();
+ args.put("x-delayed-type", "direct");
+ return new CustomExchange(RabbitConstant.ORDER_STATUS_EXCHANGE, "x-delayed-message", true, false, args);
+ }
+
+ /**
+ * 延迟队列绑定自定义交换器
+ *
+ * @param orderStatusQueue 队列
+ * @param orderStatusExchange 延迟交换器
+ */
+ @Bean
+ public Binding orderStatusBinding(Queue orderStatusQueue, CustomExchange orderStatusExchange) {
+ return BindingBuilder.bind(orderStatusQueue).to(orderStatusExchange).with(RabbitConstant.ORDER_STATUS_ROUTING).noargs();
+ }
+
+
+
+
+
+
+
+}