Browse Source

1.提交mq相关的代码

dev-warehouse
zhenghaoyu 2 years ago
parent
commit
c99c25c122
  1. 50
      blade-biz-common/src/main/java/org/springblade/common/constant/RabbitConstant.java
  2. 5
      blade-biz-common/src/main/java/org/springblade/common/utils/CommonUtil.java
  3. 8
      blade-service/logpm-factory/pom.xml
  4. 87
      blade-service/logpm-factory/src/main/java/com/logpm/factory/config/RabbitMqConfiguration.java
  5. 61
      blade-service/logpm-factory/src/main/java/com/logpm/factory/receiver/DirectQueueOneHandler.java
  6. 27
      blade-service/logpm-factory/src/main/java/com/logpm/factory/snm/controller/PanFactoryDataController.java
  7. 3
      blade-service/logpm-factory/src/main/java/com/logpm/factory/snm/dto/OrderInfoDTO.java
  8. 7
      blade-service/logpm-factory/src/main/java/com/logpm/factory/snm/service/impl/PanFactoryDataServiceImpl.java
  9. 25
      blade-service/logpm-factory/src/main/resources/application.yml

50
blade-biz-common/src/main/java/org/springblade/common/constant/RabbitConstant.java

@ -6,53 +6,11 @@ package org.springblade.common.constant;
* @author yangkai.shen * @author yangkai.shen
*/ */
public interface RabbitConstant { public interface RabbitConstant {
/**
* 直接模式1
*/
String DIRECT_MODE_QUEUE_ONE = "queue.direct.1";
/** //订单状态消息队列配置
* 队列2 String ORDER_STATUS_QUEUE = "order_status_queue";
*/ String ORDER_STATUS_EXCHANGE = "order_status_exchange";
String QUEUE_TWO = "queue.2"; String ORDER_STATUS_ROUTING = "order_status_routing";
/**
* 队列3
*/
String QUEUE_THREE = "3.queue";
/**
* 分列模式
*/
String FANOUT_MODE_QUEUE = "fanout.mode";
/**
* 主题模式
*/
String TOPIC_MODE_QUEUE = "topic.mode";
/**
* 路由1
*/
String TOPIC_ROUTING_KEY_ONE = "queue.#";
/**
* 路由2
*/
String TOPIC_ROUTING_KEY_TWO = "*.queue";
/**
* 路由3
*/
String TOPIC_ROUTING_KEY_THREE = "3.queue";
/**
* 延迟队列
*/
String DELAY_QUEUE = "delay.queue";
/**
* 延迟队列交换器
*/
String DELAY_MODE_QUEUE = "delay.mode";
} }

5
blade-biz-common/src/main/java/org/springblade/common/utils/CommonUtil.java

@ -25,6 +25,7 @@ import java.time.format.DateTimeFormatter;
import java.util.Date; import java.util.Date;
import java.util.Random; import java.util.Random;
import java.util.TimeZone; import java.util.TimeZone;
import java.util.UUID;
/** /**
* 通用工具类 * 通用工具类
@ -91,4 +92,8 @@ public class CommonUtil {
} }
public static String getUUID(){
return UUID.randomUUID().toString().replace("-", "");
}
} }

8
blade-service/logpm-factory/pom.xml

@ -76,10 +76,10 @@
<artifactId>xxl-job-core</artifactId> <artifactId>xxl-job-core</artifactId>
</dependency> </dependency>
<!--mq--> <!--mq-->
<!-- <dependency>--> <dependency>
<!-- <groupId>org.springframework.boot</groupId>--> <groupId>org.springframework.boot</groupId>
<!-- <artifactId>spring-boot-starter-amqp</artifactId>--> <artifactId>spring-boot-starter-amqp</artifactId>
<!-- </dependency>--> </dependency>
</dependencies> </dependencies>
<build> <build>

87
blade-service/logpm-factory/src/main/java/com/logpm/factory/config/RabbitMqConfiguration.java

@ -1,43 +1,70 @@
package com.logpm.factory.config; package com.logpm.factory.config;
import com.alibaba.nacos.shaded.com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.springblade.common.constant.RabbitConstant;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.Map;
/** /**
* RabbitMQ配置主要是配置队列如果提前存在该队列可以省略本配置类 * RabbitMQ配置主要是配置队列如果提前存在该队列可以省略本配置类
* *
* @author yangkai.shen * @author yangkai.shen
*/ */
//@Slf4j @Slf4j
//@Configuration(proxyBeanMethods = false) @Configuration
public class RabbitMqConfiguration { public class RabbitMqConfiguration {
// @Bean @Bean
// public RabbitTemplate rabbitTemplate() { public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){
// CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); RabbitTemplate template = new RabbitTemplate();
// connectionFactory.setPublisherConfirms(true); template.setConnectionFactory(connectionFactory);
// connectionFactory.setPublisherReturns(true); template.setMandatory(true);
// RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
// rabbitTemplate.setMandatory(true); @Override
// rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> log.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause)); public void confirm(CorrelationData correlationData, boolean b, String s) {
// rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message)); System.out.println("确认回调-相关数据:"+correlationData);
// return rabbitTemplate; System.out.println("确认回调-确认情况:"+b);
// } System.out.println("确认回调-原因:"+s);
}
});
template.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
System.out.println("返回回调-消息:"+returnedMessage.getMessage());
System.out.println("返回回调-回应码:"+returnedMessage.getReplyCode());
System.out.println("返回回调-回应信息:"+returnedMessage.getReplyText());
System.out.println("返回回调-交换机:"+returnedMessage.getExchange());
System.out.println("返回回调-路由键:"+returnedMessage.getRoutingKey());
}
});
return template;
}
/** /**
* 延迟队列 * 延迟队列
*/ */
// @Bean @Bean
// public Queue delayQueue() { public Queue delayQueue() {
// return new Queue(RabbitConstant.DIRECT_MODE_QUEUE_ONE, true); return new Queue(RabbitConstant.ORDER_STATUS_QUEUE, true);
// } }
/** /**
* 延迟队列交换器, x-delayed-type x-delayed-message 固定 * 延迟队列交换器, x-delayed-type x-delayed-message 固定
*/ */
// @Bean @Bean
// public CustomExchange delayExchange() { public CustomExchange delayExchange() {
// Map<String, Object> args = Maps.newHashMap(); Map<String, Object> args = Maps.newHashMap();
// args.put("x-delayed-type", "direct"); args.put("x-delayed-type", "direct");
// return new CustomExchange("TestDirectExchange", "x-delayed-message", true, false, args); return new CustomExchange(RabbitConstant.ORDER_STATUS_EXCHANGE, "x-delayed-message", true, false, args);
// } }
/** /**
* 延迟队列绑定自定义交换器 * 延迟队列绑定自定义交换器
@ -45,9 +72,15 @@ public class RabbitMqConfiguration {
* @param delayQueue 队列 * @param delayQueue 队列
* @param delayExchange 延迟交换器 * @param delayExchange 延迟交换器
*/ */
// @Bean @Bean
// public Binding delayBinding(Queue delayQueue, CustomExchange delayExchange) { public Binding delayBinding(Queue delayQueue, CustomExchange delayExchange) {
// return BindingBuilder.bind(delayQueue).to(delayExchange).with("TestDirectRouting").noargs(); return BindingBuilder.bind(delayQueue).to(delayExchange).with(RabbitConstant.ORDER_STATUS_ROUTING).noargs();
// } }
@Bean
DirectExchange lonelyDirectExchange() {
return new DirectExchange("lonelyDirectExchange");
}
} }

61
blade-service/logpm-factory/src/main/java/com/logpm/factory/receiver/DirectQueueOneHandler.java

@ -1,30 +1,51 @@
package com.logpm.factory.receiver; package com.logpm.factory.receiver;
import com.logpm.factory.snm.dto.OrderInfoDTO;
import com.logpm.factory.snm.service.IPanFactoryDataService;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springblade.common.constant.RabbitConstant;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Map;
/** /**
* 直接队列1 处理器 * 直接队列1 处理器
* *
* @author yangkai.shen * @author yangkai.shen
*/ */
//@Slf4j @Slf4j
//@RabbitListener(queues = RabbitConstant.DIRECT_MODE_QUEUE_ONE) @RabbitListener(queues = RabbitConstant.ORDER_STATUS_QUEUE)
//@Component @Component
public class DirectQueueOneHandler { public class DirectQueueOneHandler {
// @RabbitHandler @Autowired
// public void directHandlerManualAck(PanFactoryOrderDTO factoryOrderDTO, Message message, Channel channel) { private IPanFactoryDataService panFactoryDataService;
// // 如果手动ACK,消息会被监听消费,但是消息在队列中依旧存在,如果 未配置 acknowledge-mode 默认是会在消费完毕后自动ACK掉
// final long deliveryTag = message.getMessageProperties().getDeliveryTag(); @RabbitHandler
// try { public void directHandlerManualAck(Map map, Message message, Channel channel) {
// log.info("直接队列1,手动ACK,接收消息:{}", factoryOrderDTO); // 如果手动ACK,消息会被监听消费,但是消息在队列中依旧存在,如果 未配置 acknowledge-mode 默认是会在消费完毕后自动ACK掉
// //通知 MQ 消息已被成功消费,可以ACK了 final long deliveryTag = message.getMessageProperties().getDeliveryTag();
// channel.basicAck(deliveryTag, false); try {
// } catch (IOException e) { log.info("直接队列1,手动ACK,接收消息:{}", map);
// try { OrderInfoDTO orderInfoDTO = (OrderInfoDTO) map.get("messageData");
// // 处理失败,重新压入MQ
// channel.basicRecover(); panFactoryDataService.handleData(orderInfoDTO);
// } catch (IOException e1) {
// e1.printStackTrace(); channel.basicAck(deliveryTag, false);
// } } catch (IOException e) {
// } try {
// } // 处理失败,重新压入MQ
channel.basicRecover();
} catch (IOException e1) {
e1.printStackTrace();
}
}
}
} }

27
blade-service/logpm-factory/src/main/java/com/logpm/factory/snm/controller/PanFactoryDataController.java

@ -30,15 +30,17 @@ import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiOperation;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
import org.springblade.common.constant.RabbitConstant;
import org.springblade.common.exception.CustomerException; import org.springblade.common.exception.CustomerException;
import org.springblade.common.utils.CommonUtil;
import org.springblade.core.boot.ctrl.BladeController; import org.springblade.core.boot.ctrl.BladeController;
import org.springblade.core.tool.api.R; import org.springblade.core.tool.api.R;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.validation.annotation.Validated; import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*; import org.springframework.web.bind.annotation.*;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
import java.util.ArrayList; import java.util.*;
import java.util.List;
/** /**
* 皮阿诺数据 控制器 * 皮阿诺数据 控制器
@ -59,6 +61,7 @@ public class PanFactoryDataController extends BladeController {
private final IPanFactoryDataService factoryDataService; private final IPanFactoryDataService factoryDataService;
private final IAsyncDataService syncDataService; private final IAsyncDataService syncDataService;
private final IPanOrderStatusLogService panOrderStatusLogService; private final IPanOrderStatusLogService panOrderStatusLogService;
private RabbitTemplate rabbitTemplate;
// @ResponseBody // @ResponseBody
// @PostMapping("/token") // @PostMapping("/token")
@ -152,8 +155,24 @@ public class PanFactoryDataController extends BladeController {
public R sendOrders(@Validated @RequestBody OrderInfoDTO orderInfoDTO) { public R sendOrders(@Validated @RequestBody OrderInfoDTO orderInfoDTO) {
log.info("############sendOrders: 请求参数{}",orderInfoDTO); log.info("############sendOrders: 请求参数{}",orderInfoDTO);
try{ try{
R r = factoryDataService.handleData(orderInfoDTO);
return r; //先保存原始请求数据
PanOrderStatusLog panOrderStatusLog = new PanOrderStatusLog();
panOrderStatusLog.setArgs(JSONObject.toJSONString(orderInfoDTO));
panOrderStatusLog.setStatus(1);
panOrderStatusLog.setType(1);
panOrderStatusLogService.save(panOrderStatusLog);
Map<String,Object> map=new HashMap<>();
map.put("messageId", CommonUtil.getUUID());
map.put("messageData",orderInfoDTO);
map.put("createTime",new Date().getTime());
//将消息携带绑定键值:TestDirectRouting 发送到交换机TestDirectExchange
rabbitTemplate.convertAndSend(RabbitConstant.ORDER_STATUS_EXCHANGE, RabbitConstant.ORDER_STATUS_ROUTING, map);
// R r = factoryDataService.handleData(orderInfoDTO);
return R.success("调用成功");
}catch (CustomerException e){ }catch (CustomerException e){
log.error(e.message,e); log.error(e.message,e);
return R.fail(e.code,e.message); return R.fail(e.code,e.message);

3
blade-service/logpm-factory/src/main/java/com/logpm/factory/snm/dto/OrderInfoDTO.java

@ -20,6 +20,7 @@ import lombok.Data;
import org.springblade.core.tool.utils.StringUtil; import org.springblade.core.tool.utils.StringUtil;
import javax.validation.constraints.NotEmpty; import javax.validation.constraints.NotEmpty;
import java.io.Serializable;
/** /**
* PHP传过来的OrderInfo * PHP传过来的OrderInfo
@ -28,7 +29,7 @@ import javax.validation.constraints.NotEmpty;
* @since 2023-06-12 * @since 2023-06-12
*/ */
@Data @Data
public class OrderInfoDTO { public class OrderInfoDTO implements Serializable {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
@NotEmpty(message = "客户订单号不能为空") @NotEmpty(message = "客户订单号不能为空")

7
blade-service/logpm-factory/src/main/java/com/logpm/factory/snm/service/impl/PanFactoryDataServiceImpl.java

@ -95,12 +95,7 @@ public class PanFactoryDataServiceImpl implements IPanFactoryDataService {
@Override @Override
public R handleData(OrderInfoDTO orderInfoDTO) throws CustomerException { public R handleData(OrderInfoDTO orderInfoDTO) throws CustomerException {
//先保存原始请求数据
PanOrderStatusLog panOrderStatusLog = new PanOrderStatusLog();
panOrderStatusLog.setArgs(JSONObject.toJSONString(orderInfoDTO));
panOrderStatusLog.setStatus(1);
panOrderStatusLog.setType(1);
panOrderStatusLogService.save(panOrderStatusLog);
String orderNo = orderInfoDTO.getOrderNo(); String orderNo = orderInfoDTO.getOrderNo();

25
blade-service/logpm-factory/src/main/resources/application.yml

@ -35,14 +35,25 @@ logging:
spring: spring:
main: main:
allow-circular-references: true allow-circular-references: true
#rabbitmq配置 #rabbitmq配置
# rabbitmq: rabbitmq:
# host: 192.168.2.100 host: 192.168.2.100
# port: 5672 port: 5672
# username: admin username: admin
# password: admin password: admin
# #虚拟host 可以不设置,使用server默认host #虚拟host 可以不设置,使用server默认host
# virtual-host: / virtual-host: /
#确认消息已发送到队列(Queue)
publisher-returns: true
publisher-confirm-type: correlated
# 手动提交消息
listener:
simple:
acknowledge-mode: manual
direct:
acknowledge-mode: manual
xxl: xxl:
job: job:
accessToken: '' accessToken: ''

Loading…
Cancel
Save