From d966b6da4dce02ac51babe69beebc7025eef4ac9 Mon Sep 17 00:00:00 2001 From: zhenghaoyu Date: Tue, 20 Jun 2023 17:49:42 +0800 Subject: [PATCH] =?UTF-8?q?1.mq=E6=8E=A5=E6=94=B6=E6=B6=88=E6=81=AF?= =?UTF-8?q?=EF=BC=8C=E9=80=BB=E8=BE=91=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/constant/RabbitConstant.java | 5 ++++ .../factory/config/RabbitMqConfiguration.java | 19 ++++++++++++ .../factory/receiver/ErrorQueueHandler.java | 4 +-- ...eOneHandler.java => OpenOrderHandler.java} | 29 +++++-------------- .../controller/PanFactoryDataController.java | 2 ++ .../src/main/resources/application.yml | 3 +- 6 files changed, 37 insertions(+), 25 deletions(-) rename blade-service/logpm-factory/src/main/java/com/logpm/factory/receiver/{DirectQueueOneHandler.java => OpenOrderHandler.java} (67%) diff --git a/blade-biz-common/src/main/java/org/springblade/common/constant/RabbitConstant.java b/blade-biz-common/src/main/java/org/springblade/common/constant/RabbitConstant.java index 7c50d6039..2fc1f0f38 100644 --- a/blade-biz-common/src/main/java/org/springblade/common/constant/RabbitConstant.java +++ b/blade-biz-common/src/main/java/org/springblade/common/constant/RabbitConstant.java @@ -23,4 +23,9 @@ public interface RabbitConstant { String OPEN_ORDER_ROUTING = "open_order_routing"; + String DEAL_WITH_QUEUE = "deal_with_queue"; + String DEAL_WITH_EXCHANGE = "deal_with_exchange"; + String DEAL_WITH_ROUTING = "deal_with_routing"; + + } diff --git a/blade-service/logpm-factory/src/main/java/com/logpm/factory/config/RabbitMqConfiguration.java b/blade-service/logpm-factory/src/main/java/com/logpm/factory/config/RabbitMqConfiguration.java index 4b4ef3eb8..ee86d4d42 100644 --- a/blade-service/logpm-factory/src/main/java/com/logpm/factory/config/RabbitMqConfiguration.java +++ b/blade-service/logpm-factory/src/main/java/com/logpm/factory/config/RabbitMqConfiguration.java @@ -139,4 +139,23 @@ public class RabbitMqConfiguration { return BindingBuilder.bind(openOrderQueue).to(openOrderExchange).with(RabbitConstant.OPEN_ORDER_ROUTING).noargs(); } + + @Bean + public Queue dealWithQueue() { + return new Queue(RabbitConstant.DEAL_WITH_QUEUE, true); + } + + @Bean + public CustomExchange dealWithExchange() { + Map args = Maps.newHashMap(); + args.put("x-delayed-type", "direct"); + return new CustomExchange(RabbitConstant.DEAL_WITH_EXCHANGE, "x-delayed-message", true, false, args); + } + + @Bean + public Binding dealWithBinding(Queue dealWithQueue, CustomExchange dealWithExchange) { + return BindingBuilder.bind(dealWithQueue).to(dealWithExchange).with(RabbitConstant.DEAL_WITH_ROUTING).noargs(); + } + + } diff --git a/blade-service/logpm-factory/src/main/java/com/logpm/factory/receiver/ErrorQueueHandler.java b/blade-service/logpm-factory/src/main/java/com/logpm/factory/receiver/ErrorQueueHandler.java index 8ad34241e..208643ffc 100644 --- a/blade-service/logpm-factory/src/main/java/com/logpm/factory/receiver/ErrorQueueHandler.java +++ b/blade-service/logpm-factory/src/main/java/com/logpm/factory/receiver/ErrorQueueHandler.java @@ -17,9 +17,9 @@ import java.util.Map; /** - * 直接队列1 处理器 + * 异常消息队列 处理器 * - * @author yangkai.shen + * @author zhy */ @Slf4j @RabbitListener(queues = RabbitConstant.ERROR_QUEUE) diff --git a/blade-service/logpm-factory/src/main/java/com/logpm/factory/receiver/DirectQueueOneHandler.java b/blade-service/logpm-factory/src/main/java/com/logpm/factory/receiver/OpenOrderHandler.java similarity index 67% rename from blade-service/logpm-factory/src/main/java/com/logpm/factory/receiver/DirectQueueOneHandler.java rename to blade-service/logpm-factory/src/main/java/com/logpm/factory/receiver/OpenOrderHandler.java index ae470d29c..bb1787d7e 100644 --- a/blade-service/logpm-factory/src/main/java/com/logpm/factory/receiver/DirectQueueOneHandler.java +++ b/blade-service/logpm-factory/src/main/java/com/logpm/factory/receiver/OpenOrderHandler.java @@ -25,43 +25,28 @@ import java.util.Map; @Slf4j @RabbitListener(queues = RabbitConstant.OPEN_ORDER_QUEUE) @Component -public class DirectQueueOneHandler { +public class OpenOrderHandler { @Autowired private IPanFactoryDataService panFactoryDataService; + private Integer retryCount = 1; + @RabbitHandler - public void orderStatusHandlerManualAck(Map map, Message message, Channel channel) throws IOException { + public void openOrderStatusHandler(Map map, Message message, Channel channel) throws IOException { // 如果手动ACK,消息会被监听消费,但是消息在队列中依旧存在,如果 未配置 acknowledge-mode 默认是会在消费完毕后自动ACK掉 final long deliveryTag = message.getMessageProperties().getDeliveryTag(); - log.info("##################我进入这里了"); - - int i = 1/0; + String msg = new String(message.getBody()); + log.info("##################我进入这里了 retryCount={}",retryCount++); OrderInfoDTO orderInfoDTO = (OrderInfoDTO) map.get("messageData"); String orderNo = orderInfoDTO.getOrderNo(); R r = panFactoryDataService.handleData(orderInfoDTO); int code = r.getCode(); if(code == 400 || code == 200){ - log.info("##################orderStatusHandlerManualAck: 该条数据不用处理 orderNo={}",orderNo); + log.info("##################openOrderStatusHandler: 该条数据不用处理 orderNo={}",orderNo); channel.basicAck(deliveryTag,true); }else{ throw new CustomerException(code,r.getMsg()); } - -// try { -// log.info("直接队列1,手动ACK,接收消息:{}", map); -// OrderInfoDTO orderInfoDTO = (OrderInfoDTO) map.get("messageData"); -// -// -// -// channel.basicAck(deliveryTag, false); -// } catch (IOException e) { -// try { -// // 处理失败,重新压入MQ -// channel.basicReject(deliveryTag,true); -// } catch (IOException e1) { -// e1.printStackTrace(); -// } -// } } } diff --git a/blade-service/logpm-factory/src/main/java/com/logpm/factory/snm/controller/PanFactoryDataController.java b/blade-service/logpm-factory/src/main/java/com/logpm/factory/snm/controller/PanFactoryDataController.java index 82773a71e..85ac72f3f 100644 --- a/blade-service/logpm-factory/src/main/java/com/logpm/factory/snm/controller/PanFactoryDataController.java +++ b/blade-service/logpm-factory/src/main/java/com/logpm/factory/snm/controller/PanFactoryDataController.java @@ -206,6 +206,8 @@ public class PanFactoryDataController extends BladeController { map.put("createTime",new Date().getTime()); //将消息携带绑定键值 rabbitTemplate.convertAndSend(RabbitConstant.ORDER_STATUS_EXCHANGE, RabbitConstant.ORDER_STATUS_ROUTING, map,new CorrelationData(String.valueOf(panOrderStatusLog.getId()))); + rabbitTemplate.convertAndSend(RabbitConstant.DEAL_WITH_EXCHANGE, RabbitConstant.DEAL_WITH_ROUTING, map,new CorrelationData(String.valueOf(panOrderStatusLog.getId()))); + // R r = factoryDataService.handleStatusData(orderStatusDTO); return R.success("调用成功"); diff --git a/blade-service/logpm-factory/src/main/resources/application.yml b/blade-service/logpm-factory/src/main/resources/application.yml index 3eb7313f3..cf42bc689 100644 --- a/blade-service/logpm-factory/src/main/resources/application.yml +++ b/blade-service/logpm-factory/src/main/resources/application.yml @@ -50,7 +50,8 @@ spring: # 手动提交消息 listener: simple: - acknowledge-mode: manual + acknowledge-mode: auto + default-requeue-rejected: false retry: enabled: true # 开启消费者失败重试 initial-interval: 1000 # 初识的失败等待时长为1秒