From c99c25c1227d836a6d6bde825aa1fb3899838616 Mon Sep 17 00:00:00 2001 From: zhenghaoyu Date: Sat, 17 Jun 2023 14:48:36 +0800 Subject: [PATCH] =?UTF-8?q?1.=E6=8F=90=E4=BA=A4mq=E7=9B=B8=E5=85=B3?= =?UTF-8?q?=E7=9A=84=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/constant/RabbitConstant.java | 50 +---------- .../springblade/common/utils/CommonUtil.java | 5 ++ blade-service/logpm-factory/pom.xml | 8 +- .../factory/config/RabbitMqConfiguration.java | 87 +++++++++++++------ .../receiver/DirectQueueOneHandler.java | 61 ++++++++----- .../controller/PanFactoryDataController.java | 27 +++++- .../logpm/factory/snm/dto/OrderInfoDTO.java | 3 +- .../impl/PanFactoryDataServiceImpl.java | 7 +- .../src/main/resources/application.yml | 25 ++++-- 9 files changed, 158 insertions(+), 115 deletions(-) diff --git a/blade-biz-common/src/main/java/org/springblade/common/constant/RabbitConstant.java b/blade-biz-common/src/main/java/org/springblade/common/constant/RabbitConstant.java index 6d22799d6..a11b5bd4b 100644 --- a/blade-biz-common/src/main/java/org/springblade/common/constant/RabbitConstant.java +++ b/blade-biz-common/src/main/java/org/springblade/common/constant/RabbitConstant.java @@ -6,53 +6,11 @@ package org.springblade.common.constant; * @author yangkai.shen */ public interface RabbitConstant { - /** - * 直接模式1 - */ - String DIRECT_MODE_QUEUE_ONE = "queue.direct.1"; - /** - * 队列2 - */ - String QUEUE_TWO = "queue.2"; + //订单状态消息队列配置 + String ORDER_STATUS_QUEUE = "order_status_queue"; + String ORDER_STATUS_EXCHANGE = "order_status_exchange"; + String ORDER_STATUS_ROUTING = "order_status_routing"; - /** - * 队列3 - */ - String QUEUE_THREE = "3.queue"; - /** - * 分列模式 - */ - String FANOUT_MODE_QUEUE = "fanout.mode"; - - /** - * 主题模式 - */ - String TOPIC_MODE_QUEUE = "topic.mode"; - - /** - * 路由1 - */ - String TOPIC_ROUTING_KEY_ONE = "queue.#"; - - /** - * 路由2 - */ - String TOPIC_ROUTING_KEY_TWO = "*.queue"; - - /** - * 路由3 - */ - String TOPIC_ROUTING_KEY_THREE = "3.queue"; - - /** - * 延迟队列 - */ - String DELAY_QUEUE = "delay.queue"; - - /** - * 延迟队列交换器 - */ - String DELAY_MODE_QUEUE = "delay.mode"; } diff --git a/blade-biz-common/src/main/java/org/springblade/common/utils/CommonUtil.java b/blade-biz-common/src/main/java/org/springblade/common/utils/CommonUtil.java index 3bd699c93..9432d8745 100644 --- a/blade-biz-common/src/main/java/org/springblade/common/utils/CommonUtil.java +++ b/blade-biz-common/src/main/java/org/springblade/common/utils/CommonUtil.java @@ -25,6 +25,7 @@ import java.time.format.DateTimeFormatter; import java.util.Date; import java.util.Random; import java.util.TimeZone; +import java.util.UUID; /** * 通用工具类 @@ -91,4 +92,8 @@ public class CommonUtil { } + public static String getUUID(){ + return UUID.randomUUID().toString().replace("-", ""); + } + } diff --git a/blade-service/logpm-factory/pom.xml b/blade-service/logpm-factory/pom.xml index 5677d0d6f..9e830d9f2 100644 --- a/blade-service/logpm-factory/pom.xml +++ b/blade-service/logpm-factory/pom.xml @@ -76,10 +76,10 @@ xxl-job-core - - - - + + org.springframework.boot + spring-boot-starter-amqp + diff --git a/blade-service/logpm-factory/src/main/java/com/logpm/factory/config/RabbitMqConfiguration.java b/blade-service/logpm-factory/src/main/java/com/logpm/factory/config/RabbitMqConfiguration.java index 41615f918..3a0b5bc9f 100644 --- a/blade-service/logpm-factory/src/main/java/com/logpm/factory/config/RabbitMqConfiguration.java +++ b/blade-service/logpm-factory/src/main/java/com/logpm/factory/config/RabbitMqConfiguration.java @@ -1,43 +1,70 @@ package com.logpm.factory.config; +import com.alibaba.nacos.shaded.com.google.common.collect.Maps; +import lombok.extern.slf4j.Slf4j; +import org.springblade.common.constant.RabbitConstant; +import org.springframework.amqp.core.*; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.connection.CorrelationData; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.Map; + /** * RabbitMQ配置,主要是配置队列,如果提前存在该队列,可以省略本配置类 * * @author yangkai.shen */ -//@Slf4j -//@Configuration(proxyBeanMethods = false) +@Slf4j +@Configuration public class RabbitMqConfiguration { -// @Bean -// public RabbitTemplate rabbitTemplate() { -// CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); -// connectionFactory.setPublisherConfirms(true); -// connectionFactory.setPublisherReturns(true); -// RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); -// rabbitTemplate.setMandatory(true); -// rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> log.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause)); -// rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message)); -// return rabbitTemplate; -// } + @Bean + public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){ + RabbitTemplate template = new RabbitTemplate(); + template.setConnectionFactory(connectionFactory); + template.setMandatory(true); + template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { + @Override + public void confirm(CorrelationData correlationData, boolean b, String s) { + System.out.println("确认回调-相关数据:"+correlationData); + System.out.println("确认回调-确认情况:"+b); + System.out.println("确认回调-原因:"+s); + } + }); + + template.setReturnsCallback(new RabbitTemplate.ReturnsCallback() { + @Override + public void returnedMessage(ReturnedMessage returnedMessage) { + System.out.println("返回回调-消息:"+returnedMessage.getMessage()); + System.out.println("返回回调-回应码:"+returnedMessage.getReplyCode()); + System.out.println("返回回调-回应信息:"+returnedMessage.getReplyText()); + System.out.println("返回回调-交换机:"+returnedMessage.getExchange()); + System.out.println("返回回调-路由键:"+returnedMessage.getRoutingKey()); + } + }); + return template; + } /** * 延迟队列 */ -// @Bean -// public Queue delayQueue() { -// return new Queue(RabbitConstant.DIRECT_MODE_QUEUE_ONE, true); -// } + @Bean + public Queue delayQueue() { + return new Queue(RabbitConstant.ORDER_STATUS_QUEUE, true); + } /** * 延迟队列交换器, x-delayed-type 和 x-delayed-message 固定 */ -// @Bean -// public CustomExchange delayExchange() { -// Map args = Maps.newHashMap(); -// args.put("x-delayed-type", "direct"); -// return new CustomExchange("TestDirectExchange", "x-delayed-message", true, false, args); -// } + @Bean + public CustomExchange delayExchange() { + Map args = Maps.newHashMap(); + args.put("x-delayed-type", "direct"); + return new CustomExchange(RabbitConstant.ORDER_STATUS_EXCHANGE, "x-delayed-message", true, false, args); + } /** * 延迟队列绑定自定义交换器 @@ -45,9 +72,15 @@ public class RabbitMqConfiguration { * @param delayQueue 队列 * @param delayExchange 延迟交换器 */ -// @Bean -// public Binding delayBinding(Queue delayQueue, CustomExchange delayExchange) { -// return BindingBuilder.bind(delayQueue).to(delayExchange).with("TestDirectRouting").noargs(); -// } + @Bean + public Binding delayBinding(Queue delayQueue, CustomExchange delayExchange) { + return BindingBuilder.bind(delayQueue).to(delayExchange).with(RabbitConstant.ORDER_STATUS_ROUTING).noargs(); + } + + + @Bean + DirectExchange lonelyDirectExchange() { + return new DirectExchange("lonelyDirectExchange"); + } } diff --git a/blade-service/logpm-factory/src/main/java/com/logpm/factory/receiver/DirectQueueOneHandler.java b/blade-service/logpm-factory/src/main/java/com/logpm/factory/receiver/DirectQueueOneHandler.java index 9c644acab..a4d353a97 100644 --- a/blade-service/logpm-factory/src/main/java/com/logpm/factory/receiver/DirectQueueOneHandler.java +++ b/blade-service/logpm-factory/src/main/java/com/logpm/factory/receiver/DirectQueueOneHandler.java @@ -1,30 +1,51 @@ package com.logpm.factory.receiver; +import com.logpm.factory.snm.dto.OrderInfoDTO; +import com.logpm.factory.snm.service.IPanFactoryDataService; +import com.rabbitmq.client.Channel; +import lombok.extern.slf4j.Slf4j; +import org.springblade.common.constant.RabbitConstant; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.RabbitHandler; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.io.IOException; +import java.util.Map; + + /** * 直接队列1 处理器 * * @author yangkai.shen */ -//@Slf4j -//@RabbitListener(queues = RabbitConstant.DIRECT_MODE_QUEUE_ONE) -//@Component +@Slf4j +@RabbitListener(queues = RabbitConstant.ORDER_STATUS_QUEUE) +@Component public class DirectQueueOneHandler { -// @RabbitHandler -// public void directHandlerManualAck(PanFactoryOrderDTO factoryOrderDTO, Message message, Channel channel) { -// // 如果手动ACK,消息会被监听消费,但是消息在队列中依旧存在,如果 未配置 acknowledge-mode 默认是会在消费完毕后自动ACK掉 -// final long deliveryTag = message.getMessageProperties().getDeliveryTag(); -// try { -// log.info("直接队列1,手动ACK,接收消息:{}", factoryOrderDTO); -// //通知 MQ 消息已被成功消费,可以ACK了 -// channel.basicAck(deliveryTag, false); -// } catch (IOException e) { -// try { -// // 处理失败,重新压入MQ -// channel.basicRecover(); -// } catch (IOException e1) { -// e1.printStackTrace(); -// } -// } -// } + @Autowired + private IPanFactoryDataService panFactoryDataService; + + @RabbitHandler + public void directHandlerManualAck(Map map, Message message, Channel channel) { + // 如果手动ACK,消息会被监听消费,但是消息在队列中依旧存在,如果 未配置 acknowledge-mode 默认是会在消费完毕后自动ACK掉 + final long deliveryTag = message.getMessageProperties().getDeliveryTag(); + try { + log.info("直接队列1,手动ACK,接收消息:{}", map); + OrderInfoDTO orderInfoDTO = (OrderInfoDTO) map.get("messageData"); + + panFactoryDataService.handleData(orderInfoDTO); + + channel.basicAck(deliveryTag, false); + } catch (IOException e) { + try { + // 处理失败,重新压入MQ + channel.basicRecover(); + } catch (IOException e1) { + e1.printStackTrace(); + } + } + } } diff --git a/blade-service/logpm-factory/src/main/java/com/logpm/factory/snm/controller/PanFactoryDataController.java b/blade-service/logpm-factory/src/main/java/com/logpm/factory/snm/controller/PanFactoryDataController.java index bb83523fd..7aa4b7f23 100644 --- a/blade-service/logpm-factory/src/main/java/com/logpm/factory/snm/controller/PanFactoryDataController.java +++ b/blade-service/logpm-factory/src/main/java/com/logpm/factory/snm/controller/PanFactoryDataController.java @@ -30,15 +30,17 @@ import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import lombok.AllArgsConstructor; import lombok.extern.log4j.Log4j2; +import org.springblade.common.constant.RabbitConstant; import org.springblade.common.exception.CustomerException; +import org.springblade.common.utils.CommonUtil; import org.springblade.core.boot.ctrl.BladeController; import org.springblade.core.tool.api.R; +import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.validation.annotation.Validated; import org.springframework.web.bind.annotation.*; import javax.servlet.http.HttpServletRequest; -import java.util.ArrayList; -import java.util.List; +import java.util.*; /** * 皮阿诺数据 控制器 @@ -59,6 +61,7 @@ public class PanFactoryDataController extends BladeController { private final IPanFactoryDataService factoryDataService; private final IAsyncDataService syncDataService; private final IPanOrderStatusLogService panOrderStatusLogService; + private RabbitTemplate rabbitTemplate; // @ResponseBody // @PostMapping("/token") @@ -152,8 +155,24 @@ public class PanFactoryDataController extends BladeController { public R sendOrders(@Validated @RequestBody OrderInfoDTO orderInfoDTO) { log.info("############sendOrders: 请求参数{}",orderInfoDTO); try{ - R r = factoryDataService.handleData(orderInfoDTO); - return r; + + //先保存原始请求数据 + PanOrderStatusLog panOrderStatusLog = new PanOrderStatusLog(); + panOrderStatusLog.setArgs(JSONObject.toJSONString(orderInfoDTO)); + panOrderStatusLog.setStatus(1); + panOrderStatusLog.setType(1); + panOrderStatusLogService.save(panOrderStatusLog); + + Map map=new HashMap<>(); + map.put("messageId", CommonUtil.getUUID()); + map.put("messageData",orderInfoDTO); + map.put("createTime",new Date().getTime()); + //将消息携带绑定键值:TestDirectRouting 发送到交换机TestDirectExchange + rabbitTemplate.convertAndSend(RabbitConstant.ORDER_STATUS_EXCHANGE, RabbitConstant.ORDER_STATUS_ROUTING, map); + + +// R r = factoryDataService.handleData(orderInfoDTO); + return R.success("调用成功"); }catch (CustomerException e){ log.error(e.message,e); return R.fail(e.code,e.message); diff --git a/blade-service/logpm-factory/src/main/java/com/logpm/factory/snm/dto/OrderInfoDTO.java b/blade-service/logpm-factory/src/main/java/com/logpm/factory/snm/dto/OrderInfoDTO.java index f3b519335..365f29449 100644 --- a/blade-service/logpm-factory/src/main/java/com/logpm/factory/snm/dto/OrderInfoDTO.java +++ b/blade-service/logpm-factory/src/main/java/com/logpm/factory/snm/dto/OrderInfoDTO.java @@ -20,6 +20,7 @@ import lombok.Data; import org.springblade.core.tool.utils.StringUtil; import javax.validation.constraints.NotEmpty; +import java.io.Serializable; /** * PHP传过来的OrderInfo @@ -28,7 +29,7 @@ import javax.validation.constraints.NotEmpty; * @since 2023-06-12 */ @Data -public class OrderInfoDTO { +public class OrderInfoDTO implements Serializable { private static final long serialVersionUID = 1L; @NotEmpty(message = "客户订单号不能为空") diff --git a/blade-service/logpm-factory/src/main/java/com/logpm/factory/snm/service/impl/PanFactoryDataServiceImpl.java b/blade-service/logpm-factory/src/main/java/com/logpm/factory/snm/service/impl/PanFactoryDataServiceImpl.java index c7afdf565..bf87967a0 100644 --- a/blade-service/logpm-factory/src/main/java/com/logpm/factory/snm/service/impl/PanFactoryDataServiceImpl.java +++ b/blade-service/logpm-factory/src/main/java/com/logpm/factory/snm/service/impl/PanFactoryDataServiceImpl.java @@ -95,12 +95,7 @@ public class PanFactoryDataServiceImpl implements IPanFactoryDataService { @Override public R handleData(OrderInfoDTO orderInfoDTO) throws CustomerException { - //先保存原始请求数据 - PanOrderStatusLog panOrderStatusLog = new PanOrderStatusLog(); - panOrderStatusLog.setArgs(JSONObject.toJSONString(orderInfoDTO)); - panOrderStatusLog.setStatus(1); - panOrderStatusLog.setType(1); - panOrderStatusLogService.save(panOrderStatusLog); + String orderNo = orderInfoDTO.getOrderNo(); diff --git a/blade-service/logpm-factory/src/main/resources/application.yml b/blade-service/logpm-factory/src/main/resources/application.yml index ed6606aa2..d9fdd481a 100644 --- a/blade-service/logpm-factory/src/main/resources/application.yml +++ b/blade-service/logpm-factory/src/main/resources/application.yml @@ -35,14 +35,25 @@ logging: spring: main: allow-circular-references: true + #rabbitmq配置 -# rabbitmq: -# host: 192.168.2.100 -# port: 5672 -# username: admin -# password: admin -# #虚拟host 可以不设置,使用server默认host -# virtual-host: / + rabbitmq: + host: 192.168.2.100 + port: 5672 + username: admin + password: admin + #虚拟host 可以不设置,使用server默认host + virtual-host: / + #确认消息已发送到队列(Queue) + publisher-returns: true + publisher-confirm-type: correlated + # 手动提交消息 + listener: + simple: + acknowledge-mode: manual + direct: + acknowledge-mode: manual + xxl: job: accessToken: ''