Browse Source

1.mq接收消息,消费者重试三次,三次失败消息写入错误队列,错误队列记录消息处理状态

dev-warehouse
zhenghaoyu 2 years ago
parent
commit
06a06d8ef1
  1. 10
      blade-biz-common/src/main/java/org/springblade/common/constant/RabbitConstant.java
  2. 82
      blade-service/logpm-factory/src/main/java/com/logpm/factory/config/RabbitMqConfiguration.java
  3. 48
      blade-service/logpm-factory/src/main/java/com/logpm/factory/receiver/DirectQueueOneHandler.java
  4. 56
      blade-service/logpm-factory/src/main/java/com/logpm/factory/receiver/ErrorQueueHandler.java
  5. 26
      blade-service/logpm-factory/src/main/java/com/logpm/factory/snm/controller/PanFactoryDataController.java
  6. 3
      blade-service/logpm-factory/src/main/java/com/logpm/factory/snm/dto/OrderStatusDTO.java
  7. 30
      blade-service/logpm-factory/src/main/java/com/logpm/factory/snm/service/impl/PanFactoryDataServiceImpl.java
  8. 8
      blade-service/logpm-factory/src/main/resources/application.yml

10
blade-biz-common/src/main/java/org/springblade/common/constant/RabbitConstant.java

@ -6,6 +6,11 @@ package org.springblade.common.constant;
* @author yangkai.shen
*/
public interface RabbitConstant {
//消费失败队列
String ERROR_QUEUE = "error_queue";
String ERROR_EXCHANGE = "error_exchange";
String ERROR_ROUTING = "error_routing";
//订单状态消息队列配置
String ORDER_STATUS_QUEUE = "order_status_queue";
@ -13,4 +18,9 @@ public interface RabbitConstant {
String ORDER_STATUS_ROUTING = "order_status_routing";
String OPEN_ORDER_QUEUE = "open_order_queue";
String OPEN_ORDER_EXCHANGE = "open_order_exchange";
String OPEN_ORDER_ROUTING = "open_order_routing";
}

82
blade-service/logpm-factory/src/main/java/com/logpm/factory/config/RabbitMqConfiguration.java

@ -1,16 +1,14 @@
package com.logpm.factory.config;
import com.alibaba.nacos.shaded.com.google.common.collect.Maps;
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.logpm.factory.snm.entity.PanOrderStatusLog;
import com.logpm.factory.snm.service.IPanOrderStatusLogService;
import lombok.extern.slf4j.Slf4j;
import org.springblade.common.constant.RabbitConstant;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@ -25,9 +23,6 @@ import java.util.Map;
@Configuration
public class RabbitMqConfiguration {
@Autowired
private IPanOrderStatusLogService panOrderStatusLogService;
@Bean
public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate template = new RabbitTemplate();
@ -39,13 +34,19 @@ public class RabbitMqConfiguration {
System.out.println("确认回调-相关数据:"+correlationData);
System.out.println("确认回调-确认情况:"+b);
System.out.println("确认回调-原因:"+s);
//修改处理日志为已处理
Long id = Long.parseLong(correlationData.getId());
UpdateWrapper<PanOrderStatusLog> updateWrapper = new UpdateWrapper<>();
updateWrapper.set("status",0)
.eq("id",id);
panOrderStatusLogService.update(updateWrapper);
// Long id = Long.parseLong(correlationData.getId());
// UpdateWrapper<PanOrderStatusLog> updateWrapper = new UpdateWrapper<>();
// if(b){
// //修改处理日志为已处理
// updateWrapper.set("status",0)
// .eq("id",id);
//
// }else{
// //修改处理日志为已处理
// updateWrapper.set("status",2)
// .eq("id",id);
// }
// panOrderStatusLogService.update(updateWrapper);
}
});
@ -62,11 +63,35 @@ public class RabbitMqConfiguration {
return template;
}
@Bean
public DirectExchange errorMessageExchange(){
return new DirectExchange(RabbitConstant.ERROR_EXCHANGE);
}
@Bean
public Queue errorQueue(){
return new Queue(RabbitConstant.ERROR_QUEUE, true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with(RabbitConstant.ERROR_ROUTING);
}
/**
* 消费失败队列
* @param rabbitTemplate
* @return
*/
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, RabbitConstant.ERROR_EXCHANGE, RabbitConstant.ERROR_ROUTING);
}
/**
* 延迟队列
*/
@Bean
public Queue delayQueue() {
public Queue orderStatusQueue() {
return new Queue(RabbitConstant.ORDER_STATUS_QUEUE, true);
}
@ -74,7 +99,7 @@ public class RabbitMqConfiguration {
* 延迟队列交换器, x-delayed-type x-delayed-message 固定
*/
@Bean
public CustomExchange delayExchange() {
public CustomExchange orderStatusExchange() {
Map<String, Object> args = Maps.newHashMap();
args.put("x-delayed-type", "direct");
return new CustomExchange(RabbitConstant.ORDER_STATUS_EXCHANGE, "x-delayed-message", true, false, args);
@ -83,12 +108,12 @@ public class RabbitMqConfiguration {
/**
* 延迟队列绑定自定义交换器
*
* @param delayQueue 队列
* @param delayExchange 延迟交换器
* @param orderStatusQueue 队列
* @param orderStatusExchange 延迟交换器
*/
@Bean
public Binding delayBinding(Queue delayQueue, CustomExchange delayExchange) {
return BindingBuilder.bind(delayQueue).to(delayExchange).with(RabbitConstant.ORDER_STATUS_ROUTING).noargs();
public Binding orderStatusBinding(Queue orderStatusQueue, CustomExchange orderStatusExchange) {
return BindingBuilder.bind(orderStatusQueue).to(orderStatusExchange).with(RabbitConstant.ORDER_STATUS_ROUTING).noargs();
}
@ -97,4 +122,21 @@ public class RabbitMqConfiguration {
return new DirectExchange("lonelyDirectExchange");
}
@Bean
public Queue openOrderQueue() {
return new Queue(RabbitConstant.OPEN_ORDER_QUEUE, true);
}
@Bean
public CustomExchange openOrderExchange() {
Map<String, Object> args = Maps.newHashMap();
args.put("x-delayed-type", "direct");
return new CustomExchange(RabbitConstant.OPEN_ORDER_EXCHANGE, "x-delayed-message", true, false, args);
}
@Bean
public Binding openOrderBinding(Queue openOrderQueue, CustomExchange openOrderExchange) {
return BindingBuilder.bind(openOrderQueue).to(openOrderExchange).with(RabbitConstant.OPEN_ORDER_ROUTING).noargs();
}
}

48
blade-service/logpm-factory/src/main/java/com/logpm/factory/receiver/DirectQueueOneHandler.java

@ -5,6 +5,8 @@ import com.logpm.factory.snm.service.IPanFactoryDataService;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springblade.common.constant.RabbitConstant;
import org.springblade.common.exception.CustomerException;
import org.springblade.core.tool.api.R;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
@ -21,7 +23,7 @@ import java.util.Map;
* @author yangkai.shen
*/
@Slf4j
@RabbitListener(queues = RabbitConstant.ORDER_STATUS_QUEUE)
@RabbitListener(queues = RabbitConstant.OPEN_ORDER_QUEUE)
@Component
public class DirectQueueOneHandler {
@ -29,23 +31,37 @@ public class DirectQueueOneHandler {
private IPanFactoryDataService panFactoryDataService;
@RabbitHandler
public void directHandlerManualAck(Map map, Message message, Channel channel) {
public void orderStatusHandlerManualAck(Map map, Message message, Channel channel) throws IOException {
// 如果手动ACK,消息会被监听消费,但是消息在队列中依旧存在,如果 未配置 acknowledge-mode 默认是会在消费完毕后自动ACK掉
final long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
log.info("直接队列1,手动ACK,接收消息:{}", map);
OrderInfoDTO orderInfoDTO = (OrderInfoDTO) map.get("messageData");
panFactoryDataService.handleData(orderInfoDTO);
channel.basicAck(deliveryTag, false);
} catch (IOException e) {
try {
// 处理失败,重新压入MQ
channel.basicReject(deliveryTag,true);
} catch (IOException e1) {
e1.printStackTrace();
}
log.info("##################我进入这里了");
int i = 1/0;
OrderInfoDTO orderInfoDTO = (OrderInfoDTO) map.get("messageData");
String orderNo = orderInfoDTO.getOrderNo();
R r = panFactoryDataService.handleData(orderInfoDTO);
int code = r.getCode();
if(code == 400 || code == 200){
log.info("##################orderStatusHandlerManualAck: 该条数据不用处理 orderNo={}",orderNo);
channel.basicAck(deliveryTag,true);
}else{
throw new CustomerException(code,r.getMsg());
}
// try {
// log.info("直接队列1,手动ACK,接收消息:{}", map);
// OrderInfoDTO orderInfoDTO = (OrderInfoDTO) map.get("messageData");
//
//
//
// channel.basicAck(deliveryTag, false);
// } catch (IOException e) {
// try {
// // 处理失败,重新压入MQ
// channel.basicReject(deliveryTag,true);
// } catch (IOException e1) {
// e1.printStackTrace();
// }
// }
}
}

56
blade-service/logpm-factory/src/main/java/com/logpm/factory/receiver/ErrorQueueHandler.java

@ -0,0 +1,56 @@
package com.logpm.factory.receiver;
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.logpm.factory.snm.entity.PanOrderStatusLog;
import com.logpm.factory.snm.service.IPanOrderStatusLogService;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springblade.common.constant.RabbitConstant;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Map;
/**
* 直接队列1 处理器
*
* @author yangkai.shen
*/
@Slf4j
@RabbitListener(queues = RabbitConstant.ERROR_QUEUE)
@Component
public class ErrorQueueHandler {
@Autowired
private IPanOrderStatusLogService panOrderStatusLogService;
@RabbitHandler
public void orderStatusHandlerManualAck(Map map, Message message, Channel channel) {
// 如果手动ACK,消息会被监听消费,但是消息在队列中依旧存在,如果 未配置 acknowledge-mode 默认是会在消费完毕后自动ACK掉
final long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
Long logId = (Long) map.get("logId");
UpdateWrapper<PanOrderStatusLog> updateWrapper = new UpdateWrapper<>();
updateWrapper.set("status",2)
.eq("id",logId);
boolean updateFlag = panOrderStatusLogService.update(updateWrapper);
if(updateFlag){
channel.basicAck(deliveryTag, false);
}else{
channel.basicReject(deliveryTag,true);
}
} catch (IOException e) {
try {
// 处理失败,重新压入MQ
channel.basicReject(deliveryTag,true);
} catch (IOException e1) {
e1.printStackTrace();
}
}
}
}

26
blade-service/logpm-factory/src/main/java/com/logpm/factory/snm/controller/PanFactoryDataController.java

@ -166,10 +166,11 @@ public class PanFactoryDataController extends BladeController {
Map<String,Object> map=new HashMap<>();
map.put("messageId", CommonUtil.getUUID());
map.put("logId", panOrderStatusLog.getId());
map.put("messageData",orderInfoDTO);
map.put("createTime",new Date().getTime());
//将消息携带绑定键值:TestDirectRouting 发送到交换机TestDirectExchange
rabbitTemplate.convertAndSend("lonelyDirectExchange", RabbitConstant.ORDER_STATUS_ROUTING, map,new CorrelationData(String.valueOf(panOrderStatusLog.getId())));
//将消息携带绑定键值
rabbitTemplate.convertAndSend(RabbitConstant.OPEN_ORDER_EXCHANGE, RabbitConstant.OPEN_ORDER_ROUTING, map,new CorrelationData(String.valueOf(panOrderStatusLog.getId())));
// R r = factoryDataService.handleData(orderInfoDTO);
return R.success("调用成功");
@ -190,9 +191,24 @@ public class PanFactoryDataController extends BladeController {
public R sendOrderStatus(@RequestBody OrderStatusDTO orderStatusDTO) {
log.info("############sendOrderStatus: 请求参数{}",orderStatusDTO);
try{
log.info("新分支");
R r = factoryDataService.handleStatusData(orderStatusDTO);
return r;
//先保存原始请求数据
PanOrderStatusLog panOrderStatusLog = new PanOrderStatusLog();
panOrderStatusLog.setArgs(JSONObject.toJSONString(orderStatusDTO));
panOrderStatusLog.setStatus(1);
panOrderStatusLog.setType(2);
panOrderStatusLogService.save(panOrderStatusLog);
Map<String,Object> map=new HashMap<>();
map.put("messageId", CommonUtil.getUUID());
map.put("logId", panOrderStatusLog.getId());
map.put("messageData",orderStatusDTO);
map.put("createTime",new Date().getTime());
//将消息携带绑定键值
rabbitTemplate.convertAndSend(RabbitConstant.ORDER_STATUS_EXCHANGE, RabbitConstant.ORDER_STATUS_ROUTING, map,new CorrelationData(String.valueOf(panOrderStatusLog.getId())));
// R r = factoryDataService.handleStatusData(orderStatusDTO);
return R.success("调用成功");
}catch (CustomerException e){
log.error(e.message,e);
return R.fail(e.code,e.message);

3
blade-service/logpm-factory/src/main/java/com/logpm/factory/snm/dto/OrderStatusDTO.java

@ -20,6 +20,7 @@ import lombok.Data;
import org.springblade.core.tool.utils.StringUtil;
import javax.validation.constraints.NotEmpty;
import java.io.Serializable;
/**
* OrderStatus
@ -28,7 +29,7 @@ import javax.validation.constraints.NotEmpty;
* @since 2023-06-12
*/
@Data
public class OrderStatusDTO {
public class OrderStatusDTO implements Serializable {
private static final long serialVersionUID = 1L;
@NotEmpty(message = "派车单号不能为空")

30
blade-service/logpm-factory/src/main/java/com/logpm/factory/snm/service/impl/PanFactoryDataServiceImpl.java

@ -10,7 +10,6 @@ import com.logpm.factory.snm.bean.Resp;
import com.logpm.factory.snm.dto.OrderInfoDTO;
import com.logpm.factory.snm.dto.OrderStatusDTO;
import com.logpm.factory.snm.entity.PanFactoryOrder;
import com.logpm.factory.snm.entity.PanOrderStatusLog;
import com.logpm.factory.snm.entity.PanPackageInfo;
import com.logpm.factory.snm.service.IPanFactoryDataService;
import com.logpm.factory.snm.service.IPanFactoryOrderService;
@ -25,6 +24,7 @@ import org.springblade.common.exception.CustomerException;
import org.springblade.core.tool.api.R;
import org.springblade.core.tool.utils.StringUtil;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.HashMap;
import java.util.Map;
@ -91,12 +91,10 @@ public class PanFactoryDataServiceImpl implements IPanFactoryDataService {
// return factoryTokenVO;
// }
@Transactional
@Override
public R handleData(OrderInfoDTO orderInfoDTO) throws CustomerException {
String orderNo = orderInfoDTO.getOrderNo();
//根据客户订单号去查询WMS装车清单号
@ -115,7 +113,7 @@ public class PanFactoryDataServiceImpl implements IPanFactoryDataService {
//判断数据是都已存在
if(!orderInfoDTO.verifyData()){
logger.info("#############handleData: 数据不齐全 orderInfoDTO={}",orderInfoDTO);
return R.fail(400,"数据不齐全");
return R.fail(405,"数据不齐全");
}
//拼接参数
@ -148,26 +146,20 @@ public class PanFactoryDataServiceImpl implements IPanFactoryDataService {
if(code.equals(1)){
logger.info("##########handleData: 物流状态传递成功");
}else{
return R.fail(400,message);
return R.fail(405,message);
}
}else{
return R.fail(400,"返回格式有误:"+result);
return R.fail(405,"返回格式有误:"+result);
}
return Resp.success("SUCCESS");
}
@Transactional
@Override
public R handleStatusData(OrderStatusDTO orderStatusDTO) throws CustomerException {
//先保存原始请求数据
PanOrderStatusLog panOrderStatusLog = new PanOrderStatusLog();
panOrderStatusLog.setArgs(JSONObject.toJSONString(orderStatusDTO));
panOrderStatusLog.setStatus(1);
panOrderStatusLog.setType(2);
panOrderStatusLogService.save(panOrderStatusLog);
String status = orderStatusDTO.getStatus();
if("2".equals(status)||"4".equals(status)||"5".equals(status)||"8".equals(status)||"9".equals(status)){
if("2".equals(status)||"5".equals(status)||"8".equals(status)||"9".equals(status)){
logger.info("#############handleStatusData: 当前数据的状态不推送 status={}",status);
return Resp.fail(400,"当前数据的状态不推送");
}
@ -201,7 +193,7 @@ public class PanFactoryDataServiceImpl implements IPanFactoryDataService {
//判断数据是都已存在
if(!orderStatusDTO.verifyData()){
logger.info("#############handleStatusData: 数据不齐全 orderStatusDTO={}",orderStatusDTO);
return Resp.fail(400,"数据不齐全");
return Resp.fail(405,"数据不齐全");
}
//拼接参数
@ -233,13 +225,11 @@ public class PanFactoryDataServiceImpl implements IPanFactoryDataService {
String message = payload.getString("data");
if(code.equals(1)){
logger.info("##########handleStatusData: 物流状态传递成功");
panOrderStatusLog.setStatus(0);
panOrderStatusLogService.saveOrUpdate(panOrderStatusLog);
}else{
return Resp.fail(400,message);
return Resp.fail(405,message);
}
}else{
return Resp.fail(400,"返回格式有误:"+result);
return Resp.fail(405,"返回格式有误:"+result);
}
return Resp.success("SUCCESS");
}

8
blade-service/logpm-factory/src/main/resources/application.yml

@ -51,8 +51,16 @@ spring:
listener:
simple:
acknowledge-mode: manual
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000 # 初识的失败等待时长为1秒
multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts: 3 # 最大重试次数
stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
direct:
acknowledge-mode: manual
template:
mandatory: true
xxl:
job:

Loading…
Cancel
Save