diff --git a/blade-service/logpm-old-project/pom.xml b/blade-service/logpm-old-project/pom.xml index 85610f7c6..4d509a622 100644 --- a/blade-service/logpm-old-project/pom.xml +++ b/blade-service/logpm-old-project/pom.xml @@ -23,6 +23,12 @@ org.springblade blade-starter-swagger + + + org.springframework.boot + spring-boot-starter-amqp + + org.springblade diff --git a/blade-service/logpm-old-project/src/main/java/com/logpm/oldproject/config/RabbitMqConfiguration.java b/blade-service/logpm-old-project/src/main/java/com/logpm/oldproject/config/RabbitMqConfiguration.java new file mode 100644 index 000000000..c736abbd6 --- /dev/null +++ b/blade-service/logpm-old-project/src/main/java/com/logpm/oldproject/config/RabbitMqConfiguration.java @@ -0,0 +1,125 @@ +package com.logpm.oldproject.config; + +import com.alibaba.nacos.shaded.com.google.common.collect.Maps; +import lombok.extern.slf4j.Slf4j; +import org.springblade.common.constant.RabbitConstant; +import org.springframework.amqp.core.*; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.connection.CorrelationData; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.amqp.rabbit.retry.MessageRecoverer; +import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.Map; + +/** + * RabbitMQ配置,主要是配置队列,如果提前存在该队列,可以省略本配置类 + * + * @author yangkai.shen + */ +@Slf4j +@Configuration +public class RabbitMqConfiguration { + + @Bean + public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){ + RabbitTemplate template = new RabbitTemplate(); + template.setConnectionFactory(connectionFactory); + template.setMandatory(true); + template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { + @Override + public void confirm(CorrelationData correlationData, boolean b, String s) { + System.out.println("确认回调-相关数据:"+correlationData); + System.out.println("确认回调-确认情况:"+b); + System.out.println("确认回调-原因:"+s); +// Long id = Long.parseLong(correlationData.getId()); +// UpdateWrapper updateWrapper = new UpdateWrapper<>(); +// if(b){ +// //修改处理日志为已处理 +// updateWrapper.set("status",0) +// .eq("id",id); +// +// }else{ +// //修改处理日志为已处理 +// updateWrapper.set("status",2) +// .eq("id",id); +// } +// panOrderStatusLogService.update(updateWrapper); + } + }); + + template.setReturnsCallback(new RabbitTemplate.ReturnsCallback() { + @Override + public void returnedMessage(ReturnedMessage returnedMessage) { + System.out.println("返回回调-消息:"+returnedMessage.getMessage()); + System.out.println("返回回调-回应码:"+returnedMessage.getReplyCode()); + System.out.println("返回回调-回应信息:"+returnedMessage.getReplyText()); + System.out.println("返回回调-交换机:"+returnedMessage.getExchange()); + System.out.println("返回回调-路由键:"+returnedMessage.getRoutingKey()); + } + }); + return template; + } + + @Bean + public DirectExchange errorMessageExchange(){ + return new DirectExchange(RabbitConstant.ERROR_EXCHANGE); + } + @Bean + public Queue errorQueue(){ + return new Queue(RabbitConstant.ERROR_QUEUE, true); + } + @Bean + public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){ + return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with(RabbitConstant.ERROR_ROUTING); + } + + /** + * 消费失败队列 + * @param rabbitTemplate + * @return + */ + @Bean + public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){ + return new RepublishMessageRecoverer(rabbitTemplate, RabbitConstant.ERROR_EXCHANGE, RabbitConstant.ERROR_ROUTING); + } + + + /** + * 延迟队列 + */ + @Bean + public Queue orderStatusQueue() { + return new Queue(RabbitConstant.ORDER_STATUS_QUEUE, true); + } + + /** + * 延迟队列交换器, x-delayed-type 和 x-delayed-message 固定 + */ + @Bean + public CustomExchange orderStatusExchange() { + Map args = Maps.newHashMap(); + args.put("x-delayed-type", "direct"); + return new CustomExchange(RabbitConstant.ORDER_STATUS_EXCHANGE, "x-delayed-message", true, false, args); + } + + /** + * 延迟队列绑定自定义交换器 + * + * @param orderStatusQueue 队列 + * @param orderStatusExchange 延迟交换器 + */ + @Bean + public Binding orderStatusBinding(Queue orderStatusQueue, CustomExchange orderStatusExchange) { + return BindingBuilder.bind(orderStatusQueue).to(orderStatusExchange).with(RabbitConstant.ORDER_STATUS_ROUTING).noargs(); + } + + + + + + + +}