pref_mail@163.com
1 year ago
2 changed files with 131 additions and 0 deletions
@ -0,0 +1,125 @@
|
||||
package com.logpm.oldproject.config; |
||||
|
||||
import com.alibaba.nacos.shaded.com.google.common.collect.Maps; |
||||
import lombok.extern.slf4j.Slf4j; |
||||
import org.springblade.common.constant.RabbitConstant; |
||||
import org.springframework.amqp.core.*; |
||||
import org.springframework.amqp.rabbit.connection.ConnectionFactory; |
||||
import org.springframework.amqp.rabbit.connection.CorrelationData; |
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate; |
||||
import org.springframework.amqp.rabbit.retry.MessageRecoverer; |
||||
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer; |
||||
import org.springframework.context.annotation.Bean; |
||||
import org.springframework.context.annotation.Configuration; |
||||
|
||||
import java.util.Map; |
||||
|
||||
/** |
||||
* RabbitMQ配置,主要是配置队列,如果提前存在该队列,可以省略本配置类 |
||||
* |
||||
* @author yangkai.shen |
||||
*/ |
||||
@Slf4j |
||||
@Configuration |
||||
public class RabbitMqConfiguration { |
||||
|
||||
@Bean |
||||
public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){ |
||||
RabbitTemplate template = new RabbitTemplate(); |
||||
template.setConnectionFactory(connectionFactory); |
||||
template.setMandatory(true); |
||||
template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { |
||||
@Override |
||||
public void confirm(CorrelationData correlationData, boolean b, String s) { |
||||
System.out.println("确认回调-相关数据:"+correlationData); |
||||
System.out.println("确认回调-确认情况:"+b); |
||||
System.out.println("确认回调-原因:"+s); |
||||
// Long id = Long.parseLong(correlationData.getId());
|
||||
// UpdateWrapper<PanOrderStatusLog> updateWrapper = new UpdateWrapper<>();
|
||||
// if(b){
|
||||
// //修改处理日志为已处理
|
||||
// updateWrapper.set("status",0)
|
||||
// .eq("id",id);
|
||||
//
|
||||
// }else{
|
||||
// //修改处理日志为已处理
|
||||
// updateWrapper.set("status",2)
|
||||
// .eq("id",id);
|
||||
// }
|
||||
// panOrderStatusLogService.update(updateWrapper);
|
||||
} |
||||
}); |
||||
|
||||
template.setReturnsCallback(new RabbitTemplate.ReturnsCallback() { |
||||
@Override |
||||
public void returnedMessage(ReturnedMessage returnedMessage) { |
||||
System.out.println("返回回调-消息:"+returnedMessage.getMessage()); |
||||
System.out.println("返回回调-回应码:"+returnedMessage.getReplyCode()); |
||||
System.out.println("返回回调-回应信息:"+returnedMessage.getReplyText()); |
||||
System.out.println("返回回调-交换机:"+returnedMessage.getExchange()); |
||||
System.out.println("返回回调-路由键:"+returnedMessage.getRoutingKey()); |
||||
} |
||||
}); |
||||
return template; |
||||
} |
||||
|
||||
@Bean |
||||
public DirectExchange errorMessageExchange(){ |
||||
return new DirectExchange(RabbitConstant.ERROR_EXCHANGE); |
||||
} |
||||
@Bean |
||||
public Queue errorQueue(){ |
||||
return new Queue(RabbitConstant.ERROR_QUEUE, true); |
||||
} |
||||
@Bean |
||||
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){ |
||||
return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with(RabbitConstant.ERROR_ROUTING); |
||||
} |
||||
|
||||
/** |
||||
* 消费失败队列 |
||||
* @param rabbitTemplate |
||||
* @return |
||||
*/ |
||||
@Bean |
||||
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){ |
||||
return new RepublishMessageRecoverer(rabbitTemplate, RabbitConstant.ERROR_EXCHANGE, RabbitConstant.ERROR_ROUTING); |
||||
} |
||||
|
||||
|
||||
/** |
||||
* 延迟队列 |
||||
*/ |
||||
@Bean |
||||
public Queue orderStatusQueue() { |
||||
return new Queue(RabbitConstant.ORDER_STATUS_QUEUE, true); |
||||
} |
||||
|
||||
/** |
||||
* 延迟队列交换器, x-delayed-type 和 x-delayed-message 固定 |
||||
*/ |
||||
@Bean |
||||
public CustomExchange orderStatusExchange() { |
||||
Map<String, Object> args = Maps.newHashMap(); |
||||
args.put("x-delayed-type", "direct"); |
||||
return new CustomExchange(RabbitConstant.ORDER_STATUS_EXCHANGE, "x-delayed-message", true, false, args); |
||||
} |
||||
|
||||
/** |
||||
* 延迟队列绑定自定义交换器 |
||||
* |
||||
* @param orderStatusQueue 队列 |
||||
* @param orderStatusExchange 延迟交换器 |
||||
*/ |
||||
@Bean |
||||
public Binding orderStatusBinding(Queue orderStatusQueue, CustomExchange orderStatusExchange) { |
||||
return BindingBuilder.bind(orderStatusQueue).to(orderStatusExchange).with(RabbitConstant.ORDER_STATUS_ROUTING).noargs(); |
||||
} |
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
} |
Loading…
Reference in new issue