diff --git a/blade-biz-common/src/main/java/org/springblade/common/constant/RabbitConstant.java b/blade-biz-common/src/main/java/org/springblade/common/constant/RabbitConstant.java index 7c50d6039..2fc1f0f38 100644 --- a/blade-biz-common/src/main/java/org/springblade/common/constant/RabbitConstant.java +++ b/blade-biz-common/src/main/java/org/springblade/common/constant/RabbitConstant.java @@ -23,4 +23,9 @@ public interface RabbitConstant { String OPEN_ORDER_ROUTING = "open_order_routing"; + String DEAL_WITH_QUEUE = "deal_with_queue"; + String DEAL_WITH_EXCHANGE = "deal_with_exchange"; + String DEAL_WITH_ROUTING = "deal_with_routing"; + + } diff --git a/blade-service/logpm-factory/src/main/java/com/logpm/factory/config/RabbitMqConfiguration.java b/blade-service/logpm-factory/src/main/java/com/logpm/factory/config/RabbitMqConfiguration.java index 4b4ef3eb8..ee86d4d42 100644 --- a/blade-service/logpm-factory/src/main/java/com/logpm/factory/config/RabbitMqConfiguration.java +++ b/blade-service/logpm-factory/src/main/java/com/logpm/factory/config/RabbitMqConfiguration.java @@ -139,4 +139,23 @@ public class RabbitMqConfiguration { return BindingBuilder.bind(openOrderQueue).to(openOrderExchange).with(RabbitConstant.OPEN_ORDER_ROUTING).noargs(); } + + @Bean + public Queue dealWithQueue() { + return new Queue(RabbitConstant.DEAL_WITH_QUEUE, true); + } + + @Bean + public CustomExchange dealWithExchange() { + Map args = Maps.newHashMap(); + args.put("x-delayed-type", "direct"); + return new CustomExchange(RabbitConstant.DEAL_WITH_EXCHANGE, "x-delayed-message", true, false, args); + } + + @Bean + public Binding dealWithBinding(Queue dealWithQueue, CustomExchange dealWithExchange) { + return BindingBuilder.bind(dealWithQueue).to(dealWithExchange).with(RabbitConstant.DEAL_WITH_ROUTING).noargs(); + } + + } diff --git a/blade-service/logpm-factory/src/main/java/com/logpm/factory/receiver/ErrorQueueHandler.java b/blade-service/logpm-factory/src/main/java/com/logpm/factory/receiver/ErrorQueueHandler.java index 8ad34241e..208643ffc 100644 --- a/blade-service/logpm-factory/src/main/java/com/logpm/factory/receiver/ErrorQueueHandler.java +++ b/blade-service/logpm-factory/src/main/java/com/logpm/factory/receiver/ErrorQueueHandler.java @@ -17,9 +17,9 @@ import java.util.Map; /** - * 直接队列1 处理器 + * 异常消息队列 处理器 * - * @author yangkai.shen + * @author zhy */ @Slf4j @RabbitListener(queues = RabbitConstant.ERROR_QUEUE) diff --git a/blade-service/logpm-factory/src/main/java/com/logpm/factory/receiver/DirectQueueOneHandler.java b/blade-service/logpm-factory/src/main/java/com/logpm/factory/receiver/OpenOrderHandler.java similarity index 67% rename from blade-service/logpm-factory/src/main/java/com/logpm/factory/receiver/DirectQueueOneHandler.java rename to blade-service/logpm-factory/src/main/java/com/logpm/factory/receiver/OpenOrderHandler.java index ae470d29c..bb1787d7e 100644 --- a/blade-service/logpm-factory/src/main/java/com/logpm/factory/receiver/DirectQueueOneHandler.java +++ b/blade-service/logpm-factory/src/main/java/com/logpm/factory/receiver/OpenOrderHandler.java @@ -25,43 +25,28 @@ import java.util.Map; @Slf4j @RabbitListener(queues = RabbitConstant.OPEN_ORDER_QUEUE) @Component -public class DirectQueueOneHandler { +public class OpenOrderHandler { @Autowired private IPanFactoryDataService panFactoryDataService; + private Integer retryCount = 1; + @RabbitHandler - public void orderStatusHandlerManualAck(Map map, Message message, Channel channel) throws IOException { + public void openOrderStatusHandler(Map map, Message message, Channel channel) throws IOException { // 如果手动ACK,消息会被监听消费,但是消息在队列中依旧存在,如果 未配置 acknowledge-mode 默认是会在消费完毕后自动ACK掉 final long deliveryTag = message.getMessageProperties().getDeliveryTag(); - log.info("##################我进入这里了"); - - int i = 1/0; + String msg = new String(message.getBody()); + log.info("##################我进入这里了 retryCount={}",retryCount++); OrderInfoDTO orderInfoDTO = (OrderInfoDTO) map.get("messageData"); String orderNo = orderInfoDTO.getOrderNo(); R r = panFactoryDataService.handleData(orderInfoDTO); int code = r.getCode(); if(code == 400 || code == 200){ - log.info("##################orderStatusHandlerManualAck: 该条数据不用处理 orderNo={}",orderNo); + log.info("##################openOrderStatusHandler: 该条数据不用处理 orderNo={}",orderNo); channel.basicAck(deliveryTag,true); }else{ throw new CustomerException(code,r.getMsg()); } - -// try { -// log.info("直接队列1,手动ACK,接收消息:{}", map); -// OrderInfoDTO orderInfoDTO = (OrderInfoDTO) map.get("messageData"); -// -// -// -// channel.basicAck(deliveryTag, false); -// } catch (IOException e) { -// try { -// // 处理失败,重新压入MQ -// channel.basicReject(deliveryTag,true); -// } catch (IOException e1) { -// e1.printStackTrace(); -// } -// } } } diff --git a/blade-service/logpm-factory/src/main/java/com/logpm/factory/snm/controller/PanFactoryDataController.java b/blade-service/logpm-factory/src/main/java/com/logpm/factory/snm/controller/PanFactoryDataController.java index 82773a71e..85ac72f3f 100644 --- a/blade-service/logpm-factory/src/main/java/com/logpm/factory/snm/controller/PanFactoryDataController.java +++ b/blade-service/logpm-factory/src/main/java/com/logpm/factory/snm/controller/PanFactoryDataController.java @@ -206,6 +206,8 @@ public class PanFactoryDataController extends BladeController { map.put("createTime",new Date().getTime()); //将消息携带绑定键值 rabbitTemplate.convertAndSend(RabbitConstant.ORDER_STATUS_EXCHANGE, RabbitConstant.ORDER_STATUS_ROUTING, map,new CorrelationData(String.valueOf(panOrderStatusLog.getId()))); + rabbitTemplate.convertAndSend(RabbitConstant.DEAL_WITH_EXCHANGE, RabbitConstant.DEAL_WITH_ROUTING, map,new CorrelationData(String.valueOf(panOrderStatusLog.getId()))); + // R r = factoryDataService.handleStatusData(orderStatusDTO); return R.success("调用成功"); diff --git a/blade-service/logpm-factory/src/main/resources/application.yml b/blade-service/logpm-factory/src/main/resources/application.yml index 3eb7313f3..cf42bc689 100644 --- a/blade-service/logpm-factory/src/main/resources/application.yml +++ b/blade-service/logpm-factory/src/main/resources/application.yml @@ -50,7 +50,8 @@ spring: # 手动提交消息 listener: simple: - acknowledge-mode: manual + acknowledge-mode: auto + default-requeue-rejected: false retry: enabled: true # 开启消费者失败重试 initial-interval: 1000 # 初识的失败等待时长为1秒