Browse Source

1.mq接收消息,逻辑修改

dev-warehouse
zhenghaoyu 2 years ago
parent
commit
d966b6da4d
  1. 5
      blade-biz-common/src/main/java/org/springblade/common/constant/RabbitConstant.java
  2. 19
      blade-service/logpm-factory/src/main/java/com/logpm/factory/config/RabbitMqConfiguration.java
  3. 4
      blade-service/logpm-factory/src/main/java/com/logpm/factory/receiver/ErrorQueueHandler.java
  4. 29
      blade-service/logpm-factory/src/main/java/com/logpm/factory/receiver/OpenOrderHandler.java
  5. 2
      blade-service/logpm-factory/src/main/java/com/logpm/factory/snm/controller/PanFactoryDataController.java
  6. 3
      blade-service/logpm-factory/src/main/resources/application.yml

5
blade-biz-common/src/main/java/org/springblade/common/constant/RabbitConstant.java

@ -23,4 +23,9 @@ public interface RabbitConstant {
String OPEN_ORDER_ROUTING = "open_order_routing";
String DEAL_WITH_QUEUE = "deal_with_queue";
String DEAL_WITH_EXCHANGE = "deal_with_exchange";
String DEAL_WITH_ROUTING = "deal_with_routing";
}

19
blade-service/logpm-factory/src/main/java/com/logpm/factory/config/RabbitMqConfiguration.java

@ -139,4 +139,23 @@ public class RabbitMqConfiguration {
return BindingBuilder.bind(openOrderQueue).to(openOrderExchange).with(RabbitConstant.OPEN_ORDER_ROUTING).noargs();
}
@Bean
public Queue dealWithQueue() {
return new Queue(RabbitConstant.DEAL_WITH_QUEUE, true);
}
@Bean
public CustomExchange dealWithExchange() {
Map<String, Object> args = Maps.newHashMap();
args.put("x-delayed-type", "direct");
return new CustomExchange(RabbitConstant.DEAL_WITH_EXCHANGE, "x-delayed-message", true, false, args);
}
@Bean
public Binding dealWithBinding(Queue dealWithQueue, CustomExchange dealWithExchange) {
return BindingBuilder.bind(dealWithQueue).to(dealWithExchange).with(RabbitConstant.DEAL_WITH_ROUTING).noargs();
}
}

4
blade-service/logpm-factory/src/main/java/com/logpm/factory/receiver/ErrorQueueHandler.java

@ -17,9 +17,9 @@ import java.util.Map;
/**
* 直接队列1 处理器
* 异常消息队列 处理器
*
* @author yangkai.shen
* @author zhy
*/
@Slf4j
@RabbitListener(queues = RabbitConstant.ERROR_QUEUE)

29
blade-service/logpm-factory/src/main/java/com/logpm/factory/receiver/DirectQueueOneHandler.java → blade-service/logpm-factory/src/main/java/com/logpm/factory/receiver/OpenOrderHandler.java

@ -25,43 +25,28 @@ import java.util.Map;
@Slf4j
@RabbitListener(queues = RabbitConstant.OPEN_ORDER_QUEUE)
@Component
public class DirectQueueOneHandler {
public class OpenOrderHandler {
@Autowired
private IPanFactoryDataService panFactoryDataService;
private Integer retryCount = 1;
@RabbitHandler
public void orderStatusHandlerManualAck(Map map, Message message, Channel channel) throws IOException {
public void openOrderStatusHandler(Map map, Message message, Channel channel) throws IOException {
// 如果手动ACK,消息会被监听消费,但是消息在队列中依旧存在,如果 未配置 acknowledge-mode 默认是会在消费完毕后自动ACK掉
final long deliveryTag = message.getMessageProperties().getDeliveryTag();
log.info("##################我进入这里了");
int i = 1/0;
String msg = new String(message.getBody());
log.info("##################我进入这里了 retryCount={}",retryCount++);
OrderInfoDTO orderInfoDTO = (OrderInfoDTO) map.get("messageData");
String orderNo = orderInfoDTO.getOrderNo();
R r = panFactoryDataService.handleData(orderInfoDTO);
int code = r.getCode();
if(code == 400 || code == 200){
log.info("##################orderStatusHandlerManualAck: 该条数据不用处理 orderNo={}",orderNo);
log.info("##################openOrderStatusHandler: 该条数据不用处理 orderNo={}",orderNo);
channel.basicAck(deliveryTag,true);
}else{
throw new CustomerException(code,r.getMsg());
}
// try {
// log.info("直接队列1,手动ACK,接收消息:{}", map);
// OrderInfoDTO orderInfoDTO = (OrderInfoDTO) map.get("messageData");
//
//
//
// channel.basicAck(deliveryTag, false);
// } catch (IOException e) {
// try {
// // 处理失败,重新压入MQ
// channel.basicReject(deliveryTag,true);
// } catch (IOException e1) {
// e1.printStackTrace();
// }
// }
}
}

2
blade-service/logpm-factory/src/main/java/com/logpm/factory/snm/controller/PanFactoryDataController.java

@ -206,6 +206,8 @@ public class PanFactoryDataController extends BladeController {
map.put("createTime",new Date().getTime());
//将消息携带绑定键值
rabbitTemplate.convertAndSend(RabbitConstant.ORDER_STATUS_EXCHANGE, RabbitConstant.ORDER_STATUS_ROUTING, map,new CorrelationData(String.valueOf(panOrderStatusLog.getId())));
rabbitTemplate.convertAndSend(RabbitConstant.DEAL_WITH_EXCHANGE, RabbitConstant.DEAL_WITH_ROUTING, map,new CorrelationData(String.valueOf(panOrderStatusLog.getId())));
// R r = factoryDataService.handleStatusData(orderStatusDTO);
return R.success("调用成功");

3
blade-service/logpm-factory/src/main/resources/application.yml

@ -50,7 +50,8 @@ spring:
# 手动提交消息
listener:
simple:
acknowledge-mode: manual
acknowledge-mode: auto
default-requeue-rejected: false
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000 # 初识的失败等待时长为1秒

Loading…
Cancel
Save