Browse Source

1.测试延迟队列

single_db
zhenghaoyu 1 year ago
parent
commit
bc688374c8
  1. 17
      blade-service/logpm-factory/src/main/java/com/logpm/factory/config/RabbitMqConfiguration.java
  2. 42
      blade-service/logpm-factory/src/main/java/com/logpm/factory/jp/jobhandler/TestJob.java
  3. 34
      blade-service/logpm-factory/src/main/java/com/logpm/factory/receiver/TestDelayedQueueHandler.java

17
blade-service/logpm-factory/src/main/java/com/logpm/factory/config/RabbitMqConfiguration.java

@ -289,5 +289,22 @@ public class RabbitMqConfiguration {
}
@Bean
public Queue normalQueue() {
return new Queue("normal_queue", true);
}
@Bean
public CustomExchange normalExchange() {
Map<String, Object> args = Maps.newHashMap();
args.put("x-delayed-type", "direct");
return new CustomExchange("normal_exchange", "x-delayed-message", true, false, args);
}
@Bean
public Binding normalBinding(Queue normalQueue, CustomExchange normalExchange) {
return BindingBuilder.bind(normalQueue).to(normalExchange).with("normal_routerkey").noargs();
}
}

42
blade-service/logpm-factory/src/main/java/com/logpm/factory/jp/jobhandler/TestJob.java

@ -0,0 +1,42 @@
package com.logpm.factory.jp.jobhandler;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springblade.common.utils.CommonUtil;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
@Slf4j
@AllArgsConstructor
@Component
public class TestJob {
private final RabbitTemplate rabbitTemplate;
@XxlJob("testDelayedQueue")
public ReturnT<String> testDelayedQueue(String param) {
log.info("############testDelayedQueue: 测试延迟队列 param={}", param);
Map<String, Object> map = new HashMap<>();
map.put("messageId", CommonUtil.getUUID());
map.put("logId", CommonUtil.getUUID());
map.put("messageData", "body{[aaaaaaaaaaaaaaaaaaaa]}");
map.put("createTime", new Date().getTime());
map.put("flagType", "OrderStatusLog");
log.info("###############TestData11111111111{}", CommonUtil.dateToStringGeneral(new Date()));
//将消息携带绑定键值
rabbitTemplate.convertAndSend("normal_exchange", "normal_routerkey", map, message->{
message.getMessageProperties().setHeader("x-delay",10000);
return message;
});
return ReturnT.SUCCESS;
}
}

34
blade-service/logpm-factory/src/main/java/com/logpm/factory/receiver/TestDelayedQueueHandler.java

@ -0,0 +1,34 @@
package com.logpm.factory.receiver;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springblade.common.utils.CommonUtil;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.Map;
/**
* 异常消息队列 处理器
*
* @author zhy
*/
@Slf4j
@RabbitListener(queues = "normal_queue")
@Component
public class TestDelayedQueueHandler {
@RabbitHandler
public void orderStatusHandlerManualAck(Map map, Message message, Channel channel) {
log.info("###############TestData22222222222222{}", CommonUtil.dateToStringGeneral(new Date()));
System.out.println(11111111);
}
}
Loading…
Cancel
Save