diff --git a/blade-common/src/main/java/org/springblade/common/constant/LauncherConstant.java b/blade-common/src/main/java/org/springblade/common/constant/LauncherConstant.java
index 0806b1736..8af3af585 100644
--- a/blade-common/src/main/java/org/springblade/common/constant/LauncherConstant.java
+++ b/blade-common/src/main/java/org/springblade/common/constant/LauncherConstant.java
@@ -87,6 +87,21 @@ public interface LauncherConstant {
*/
String APPLICATION_EASYPOI_NAME = APPLICATION_NAME_PREFIX + "easypoi";
+ /**
+ * kafka
+ */
+ String APPLICATION_KAFKA_NAME = APPLICATION_NAME_PREFIX + "kafka";
+
+ /**
+ * rabbit
+ */
+ String APPLICATION_RABBIT_NAME = APPLICATION_NAME_PREFIX + "rabbit";
+
+ /**
+ * rocket
+ */
+ String APPLICATION_ROCKET_NAME = APPLICATION_NAME_PREFIX + "rocket";
+
/**
* 动态获取nacos地址
*
diff --git a/blade-example/blade-mq-kafka/pom.xml b/blade-example/blade-mq-kafka/pom.xml
new file mode 100644
index 000000000..6c0e7525c
--- /dev/null
+++ b/blade-example/blade-mq-kafka/pom.xml
@@ -0,0 +1,33 @@
+
+
+
+ blade-example
+ org.springblade
+ 2.2.2.RELEASE
+
+ 4.0.0
+
+ blade-mq-kafka
+ ${project.artifactId}
+ ${bladex.project.version}
+ jar
+
+
+
+ org.springblade
+ blade-core-launch
+
+
+ org.springblade
+ blade-core-test
+ test
+
+
+ org.springframework.kafka
+ spring-kafka
+
+
+
+
diff --git a/blade-example/blade-mq-kafka/src/main/java/org/springblade/example/mq/kafka/KafkaApplication.java b/blade-example/blade-mq-kafka/src/main/java/org/springblade/example/mq/kafka/KafkaApplication.java
new file mode 100644
index 000000000..96e8ad6e7
--- /dev/null
+++ b/blade-example/blade-mq-kafka/src/main/java/org/springblade/example/mq/kafka/KafkaApplication.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) 2018-2028, Chill Zhuang All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * Neither the name of the dreamlu.net developer nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ * Author: Chill 庄骞 (smallchill@163.com)
+ */
+package org.springblade.example.mq.kafka;
+
+import org.springblade.common.constant.LauncherConstant;
+import org.springblade.core.launch.BladeApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+/**
+ * KafkaApplication
+ *
+ * @author Chill
+ */
+@SpringBootApplication
+public class KafkaApplication {
+
+ public static void main(String[] args) {
+ BladeApplication.run(LauncherConstant.APPLICATION_KAFKA_NAME, KafkaApplication.class, args);
+ }
+
+}
diff --git a/blade-example/blade-mq-kafka/src/main/java/org/springblade/example/mq/kafka/config/KafkaConfiguration.java b/blade-example/blade-mq-kafka/src/main/java/org/springblade/example/mq/kafka/config/KafkaConfiguration.java
new file mode 100644
index 000000000..d98416993
--- /dev/null
+++ b/blade-example/blade-mq-kafka/src/main/java/org/springblade/example/mq/kafka/config/KafkaConfiguration.java
@@ -0,0 +1,60 @@
+package org.springblade.example.mq.kafka.config;
+
+import lombok.AllArgsConstructor;
+import org.springblade.example.mq.kafka.constant.KafkaConstant;
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.core.*;
+import org.springframework.kafka.listener.ContainerProperties;
+
+/**
+ * kafka配置类
+ *
+ * @author yangkai.shen
+ */
+@Configuration
+@EnableConfigurationProperties({KafkaProperties.class})
+@EnableKafka
+@AllArgsConstructor
+public class KafkaConfiguration {
+ private final KafkaProperties kafkaProperties;
+
+ @Bean
+ public KafkaTemplate kafkaTemplate() {
+ return new KafkaTemplate<>(producerFactory());
+ }
+
+ @Bean
+ public ProducerFactory producerFactory() {
+ return new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties());
+ }
+
+ @Bean
+ public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
+ ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
+ factory.setConsumerFactory(consumerFactory());
+ factory.setConcurrency(KafkaConstant.DEFAULT_PARTITION_NUM);
+ factory.setBatchListener(true);
+ factory.getContainerProperties().setPollTimeout(3000);
+ return factory;
+ }
+
+ @Bean
+ public ConsumerFactory consumerFactory() {
+ return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());
+ }
+
+ @Bean("ackContainerFactory")
+ public ConcurrentKafkaListenerContainerFactory ackContainerFactory() {
+ ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
+ factory.setConsumerFactory(consumerFactory());
+ factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
+ factory.setConcurrency(KafkaConstant.DEFAULT_PARTITION_NUM);
+ return factory;
+ }
+
+}
diff --git a/blade-example/blade-mq-kafka/src/main/java/org/springblade/example/mq/kafka/constant/KafkaConstant.java b/blade-example/blade-mq-kafka/src/main/java/org/springblade/example/mq/kafka/constant/KafkaConstant.java
new file mode 100644
index 000000000..d14e184ba
--- /dev/null
+++ b/blade-example/blade-mq-kafka/src/main/java/org/springblade/example/mq/kafka/constant/KafkaConstant.java
@@ -0,0 +1,18 @@
+package org.springblade.example.mq.kafka.constant;
+
+/**
+ * kafka 常量池
+ *
+ * @author yangkai.shen
+ */
+public interface KafkaConstant {
+ /**
+ * 默认分区大小
+ */
+ Integer DEFAULT_PARTITION_NUM = 3;
+
+ /**
+ * Topic 名称
+ */
+ String TOPIC_TEST = "test";
+}
diff --git a/blade-example/blade-mq-kafka/src/main/java/org/springblade/example/mq/kafka/handler/MessageHandler.java b/blade-example/blade-mq-kafka/src/main/java/org/springblade/example/mq/kafka/handler/MessageHandler.java
new file mode 100644
index 000000000..c3a0b7ef1
--- /dev/null
+++ b/blade-example/blade-mq-kafka/src/main/java/org/springblade/example/mq/kafka/handler/MessageHandler.java
@@ -0,0 +1,31 @@
+package org.springblade.example.mq.kafka.handler;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.springblade.example.mq.kafka.constant.KafkaConstant;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.support.Acknowledgment;
+import org.springframework.stereotype.Component;
+
+/**
+ * 消息处理器
+ *
+ * @author yangkai.shen
+ */
+@Component
+@Slf4j
+public class MessageHandler {
+
+ @KafkaListener(topics = KafkaConstant.TOPIC_TEST, containerFactory = "ackContainerFactory")
+ public void handleMessage(ConsumerRecord record, Acknowledgment acknowledgment) {
+ try {
+ String message = (String) record.value();
+ log.info("收到消息: {}", message);
+ } catch (Exception e) {
+ log.error(e.getMessage(), e);
+ } finally {
+ // 手动提交 offset
+ acknowledgment.acknowledge();
+ }
+ }
+}
diff --git a/blade-example/blade-mq-kafka/src/main/resources/application.yml b/blade-example/blade-mq-kafka/src/main/resources/application.yml
new file mode 100644
index 000000000..ec03d9895
--- /dev/null
+++ b/blade-example/blade-mq-kafka/src/main/resources/application.yml
@@ -0,0 +1,25 @@
+server:
+ port: 8601
+spring:
+ kafka:
+ bootstrap-servers: localhost:9092
+ producer:
+ retries: 0
+ batch-size: 16384
+ buffer-memory: 33554432
+ key-serializer: org.apache.kafka.common.serialization.StringSerializer
+ value-serializer: org.apache.kafka.common.serialization.StringSerializer
+ consumer:
+ group-id: blade-kafka
+ # 手动提交
+ enable-auto-commit: false
+ auto-offset-reset: latest
+ key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+ value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+ properties:
+ session.timeout.ms: 60000
+ listener:
+ log-container-config: false
+ concurrency: 5
+ # 手动提交
+ ack-mode: manual_immediate
diff --git a/blade-example/blade-mq-kafka/src/test/java/org/springblade/example/mq/kafka/KafkaTest.java b/blade-example/blade-mq-kafka/src/test/java/org/springblade/example/mq/kafka/KafkaTest.java
new file mode 100644
index 000000000..a5f5da521
--- /dev/null
+++ b/blade-example/blade-mq-kafka/src/test/java/org/springblade/example/mq/kafka/KafkaTest.java
@@ -0,0 +1,28 @@
+package org.springblade.example.mq.kafka;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springblade.core.test.BladeBootTest;
+import org.springblade.core.test.BladeSpringRunner;
+import org.springblade.example.mq.kafka.constant.KafkaConstant;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.kafka.core.KafkaTemplate;
+
+@RunWith(BladeSpringRunner.class)
+@SpringBootTest(classes = KafkaApplication.class)
+@BladeBootTest(appName = "blade-kafka", profile = "test", enableLoader = true)
+public class KafkaTest {
+ @Autowired
+ private KafkaTemplate kafkaTemplate;
+
+ /**
+ * 测试发送消息
+ */
+ @Test
+ public void testSend() {
+ kafkaTemplate.send(KafkaConstant.TOPIC_TEST, "hello,kafka...");
+ }
+
+}
+
diff --git a/blade-example/blade-mq-rabbit/pom.xml b/blade-example/blade-mq-rabbit/pom.xml
new file mode 100644
index 000000000..1e3ff2139
--- /dev/null
+++ b/blade-example/blade-mq-rabbit/pom.xml
@@ -0,0 +1,38 @@
+
+
+
+ blade-example
+ org.springblade
+ 2.2.2.RELEASE
+
+ 4.0.0
+
+ blade-mq-rabbit
+ ${project.artifactId}
+ ${bladex.project.version}
+ jar
+
+
+
+ org.springblade
+ blade-core-launch
+
+
+ org.springblade
+ blade-core-tool
+
+
+ org.springblade
+ blade-core-test
+ test
+
+
+ org.springframework.boot
+ spring-boot-starter-amqp
+
+
+
+
+
diff --git a/blade-example/blade-mq-rabbit/src/main/java/org/springblade/example/mq/rabbit/RabbitApplication.java b/blade-example/blade-mq-rabbit/src/main/java/org/springblade/example/mq/rabbit/RabbitApplication.java
new file mode 100644
index 000000000..dedd974fe
--- /dev/null
+++ b/blade-example/blade-mq-rabbit/src/main/java/org/springblade/example/mq/rabbit/RabbitApplication.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) 2018-2028, Chill Zhuang All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * Neither the name of the dreamlu.net developer nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ * Author: Chill 庄骞 (smallchill@163.com)
+ */
+package org.springblade.example.mq.rabbit;
+
+import org.springblade.common.constant.LauncherConstant;
+import org.springblade.core.launch.BladeApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+/**
+ * RabbitApplication
+ *
+ * @author Chill
+ */
+@SpringBootApplication
+public class RabbitApplication {
+
+ public static void main(String[] args) {
+ BladeApplication.run(LauncherConstant.APPLICATION_RABBIT_NAME, RabbitApplication.class, args);
+ }
+
+}
diff --git a/blade-example/blade-mq-rabbit/src/main/java/org/springblade/example/mq/rabbit/config/RabbitMqConfiguration.java b/blade-example/blade-mq-rabbit/src/main/java/org/springblade/example/mq/rabbit/config/RabbitMqConfiguration.java
new file mode 100644
index 000000000..874dc01ce
--- /dev/null
+++ b/blade-example/blade-mq-rabbit/src/main/java/org/springblade/example/mq/rabbit/config/RabbitMqConfiguration.java
@@ -0,0 +1,162 @@
+package org.springblade.example.mq.rabbit.config;
+
+import com.google.common.collect.Maps;
+import lombok.extern.slf4j.Slf4j;
+import org.springblade.example.mq.rabbit.constant.RabbitConstant;
+import org.springframework.amqp.core.*;
+import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import java.util.Map;
+
+/**
+ * RabbitMQ配置,主要是配置队列,如果提前存在该队列,可以省略本配置类
+ *
+ * @author yangkai.shen
+ */
+@Slf4j
+@Configuration
+public class RabbitMqConfiguration {
+
+ @Bean
+ public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
+ connectionFactory.setPublisherConfirms(true);
+ connectionFactory.setPublisherReturns(true);
+ RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
+ rabbitTemplate.setMandatory(true);
+ rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> log.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause));
+ rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message));
+ return rabbitTemplate;
+ }
+
+ /**
+ * 直接模式队列1
+ */
+ @Bean
+ public Queue directOneQueue() {
+ return new Queue(RabbitConstant.DIRECT_MODE_QUEUE_ONE);
+ }
+
+ /**
+ * 队列2
+ */
+ @Bean
+ public Queue queueTwo() {
+ return new Queue(RabbitConstant.QUEUE_TWO);
+ }
+
+ /**
+ * 队列3
+ */
+ @Bean
+ public Queue queueThree() {
+ return new Queue(RabbitConstant.QUEUE_THREE);
+ }
+
+ /**
+ * 分列模式队列
+ */
+ @Bean
+ public FanoutExchange fanoutExchange() {
+ return new FanoutExchange(RabbitConstant.FANOUT_MODE_QUEUE);
+ }
+
+ /**
+ * 分列模式绑定队列1
+ *
+ * @param directOneQueue 绑定队列1
+ * @param fanoutExchange 分列模式交换器
+ */
+ @Bean
+ public Binding fanoutBinding1(Queue directOneQueue, FanoutExchange fanoutExchange) {
+ return BindingBuilder.bind(directOneQueue).to(fanoutExchange);
+ }
+
+ /**
+ * 分列模式绑定队列2
+ *
+ * @param queueTwo 绑定队列2
+ * @param fanoutExchange 分列模式交换器
+ */
+ @Bean
+ public Binding fanoutBinding2(Queue queueTwo, FanoutExchange fanoutExchange) {
+ return BindingBuilder.bind(queueTwo).to(fanoutExchange);
+ }
+
+ /**
+ * 主题模式队列
+ * 路由格式必须以 . 分隔,比如 user.email 或者 user.aaa.email
+ * 通配符 * ,代表一个占位符,或者说一个单词,比如路由为 user.*,那么 user.email 可以匹配,但是 user.aaa.email 就匹配不了
+ * 通配符 # ,代表一个或多个占位符,或者说一个或多个单词,比如路由为 user.#,那么 user.email 可以匹配,user.aaa.email 也可以匹配
+ */
+ @Bean
+ public TopicExchange topicExchange() {
+ return new TopicExchange(RabbitConstant.TOPIC_MODE_QUEUE);
+ }
+
+
+ /**
+ * 主题模式绑定分列模式
+ *
+ * @param fanoutExchange 分列模式交换器
+ * @param topicExchange 主题模式交换器
+ */
+ @Bean
+ public Binding topicBinding1(FanoutExchange fanoutExchange, TopicExchange topicExchange) {
+ return BindingBuilder.bind(fanoutExchange).to(topicExchange).with(RabbitConstant.TOPIC_ROUTING_KEY_ONE);
+ }
+
+ /**
+ * 主题模式绑定队列2
+ *
+ * @param queueTwo 队列2
+ * @param topicExchange 主题模式交换器
+ */
+ @Bean
+ public Binding topicBinding2(Queue queueTwo, TopicExchange topicExchange) {
+ return BindingBuilder.bind(queueTwo).to(topicExchange).with(RabbitConstant.TOPIC_ROUTING_KEY_TWO);
+ }
+
+ /**
+ * 主题模式绑定队列3
+ *
+ * @param queueThree 队列3
+ * @param topicExchange 主题模式交换器
+ */
+ @Bean
+ public Binding topicBinding3(Queue queueThree, TopicExchange topicExchange) {
+ return BindingBuilder.bind(queueThree).to(topicExchange).with(RabbitConstant.TOPIC_ROUTING_KEY_THREE);
+ }
+
+ /**
+ * 延迟队列
+ */
+ @Bean
+ public Queue delayQueue() {
+ return new Queue(RabbitConstant.DELAY_QUEUE, true);
+ }
+
+ /**
+ * 延迟队列交换器, x-delayed-type 和 x-delayed-message 固定
+ */
+ @Bean
+ public CustomExchange delayExchange() {
+ Map args = Maps.newHashMap();
+ args.put("x-delayed-type", "direct");
+ return new CustomExchange(RabbitConstant.DELAY_MODE_QUEUE, "x-delayed-message", true, false, args);
+ }
+
+ /**
+ * 延迟队列绑定自定义交换器
+ *
+ * @param delayQueue 队列
+ * @param delayExchange 延迟交换器
+ */
+ @Bean
+ public Binding delayBinding(Queue delayQueue, CustomExchange delayExchange) {
+ return BindingBuilder.bind(delayQueue).to(delayExchange).with(RabbitConstant.DELAY_QUEUE).noargs();
+ }
+
+}
diff --git a/blade-example/blade-mq-rabbit/src/main/java/org/springblade/example/mq/rabbit/constant/RabbitConstant.java b/blade-example/blade-mq-rabbit/src/main/java/org/springblade/example/mq/rabbit/constant/RabbitConstant.java
new file mode 100644
index 000000000..34b560f54
--- /dev/null
+++ b/blade-example/blade-mq-rabbit/src/main/java/org/springblade/example/mq/rabbit/constant/RabbitConstant.java
@@ -0,0 +1,58 @@
+package org.springblade.example.mq.rabbit.constant;
+
+/**
+ * RabbitMQ常量池
+ *
+ * @author yangkai.shen
+ */
+public interface RabbitConstant {
+ /**
+ * 直接模式1
+ */
+ String DIRECT_MODE_QUEUE_ONE = "queue.direct.1";
+
+ /**
+ * 队列2
+ */
+ String QUEUE_TWO = "queue.2";
+
+ /**
+ * 队列3
+ */
+ String QUEUE_THREE = "3.queue";
+
+ /**
+ * 分列模式
+ */
+ String FANOUT_MODE_QUEUE = "fanout.mode";
+
+ /**
+ * 主题模式
+ */
+ String TOPIC_MODE_QUEUE = "topic.mode";
+
+ /**
+ * 路由1
+ */
+ String TOPIC_ROUTING_KEY_ONE = "queue.#";
+
+ /**
+ * 路由2
+ */
+ String TOPIC_ROUTING_KEY_TWO = "*.queue";
+
+ /**
+ * 路由3
+ */
+ String TOPIC_ROUTING_KEY_THREE = "3.queue";
+
+ /**
+ * 延迟队列
+ */
+ String DELAY_QUEUE = "delay.queue";
+
+ /**
+ * 延迟队列交换器
+ */
+ String DELAY_MODE_QUEUE = "delay.mode";
+}
diff --git a/blade-example/blade-mq-rabbit/src/main/java/org/springblade/example/mq/rabbit/handler/DelayQueueHandler.java b/blade-example/blade-mq-rabbit/src/main/java/org/springblade/example/mq/rabbit/handler/DelayQueueHandler.java
new file mode 100644
index 000000000..53e5db5d0
--- /dev/null
+++ b/blade-example/blade-mq-rabbit/src/main/java/org/springblade/example/mq/rabbit/handler/DelayQueueHandler.java
@@ -0,0 +1,42 @@
+package org.springblade.example.mq.rabbit.handler;
+
+import com.rabbitmq.client.Channel;
+import lombok.extern.slf4j.Slf4j;
+import org.springblade.core.tool.jackson.JsonUtil;
+import org.springblade.example.mq.rabbit.constant.RabbitConstant;
+import org.springblade.example.mq.rabbit.message.MessageStruct;
+import org.springframework.amqp.core.Message;
+import org.springframework.amqp.rabbit.annotation.RabbitHandler;
+import org.springframework.amqp.rabbit.annotation.RabbitListener;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+
+/**
+ * 延迟队列处理器
+ *
+ * @author yangkai.shen
+ */
+@Slf4j
+@Component
+@RabbitListener(queues = RabbitConstant.DELAY_QUEUE)
+public class DelayQueueHandler {
+
+ @RabbitHandler
+ public void directHandlerManualAck(MessageStruct messageStruct, Message message, Channel channel) {
+ // 如果手动ACK,消息会被监听消费,但是消息在队列中依旧存在,如果 未配置 acknowledge-mode 默认是会在消费完毕后自动ACK掉
+ final long deliveryTag = message.getMessageProperties().getDeliveryTag();
+ try {
+ log.info("延迟队列,手动ACK,接收消息:{}", JsonUtil.toJson(messageStruct));
+ // 通知 MQ 消息已被成功消费,可以ACK了
+ channel.basicAck(deliveryTag, false);
+ } catch (IOException e) {
+ try {
+ // 处理失败,重新压入MQ
+ channel.basicRecover();
+ } catch (IOException e1) {
+ e1.printStackTrace();
+ }
+ }
+ }
+}
diff --git a/blade-example/blade-mq-rabbit/src/main/java/org/springblade/example/mq/rabbit/handler/DirectQueueOneHandler.java b/blade-example/blade-mq-rabbit/src/main/java/org/springblade/example/mq/rabbit/handler/DirectQueueOneHandler.java
new file mode 100644
index 000000000..2cdac098f
--- /dev/null
+++ b/blade-example/blade-mq-rabbit/src/main/java/org/springblade/example/mq/rabbit/handler/DirectQueueOneHandler.java
@@ -0,0 +1,50 @@
+package org.springblade.example.mq.rabbit.handler;
+
+import com.rabbitmq.client.Channel;
+import lombok.extern.slf4j.Slf4j;
+import org.springblade.core.tool.jackson.JsonUtil;
+import org.springblade.example.mq.rabbit.constant.RabbitConstant;
+import org.springblade.example.mq.rabbit.message.MessageStruct;
+import org.springframework.amqp.core.Message;
+import org.springframework.amqp.rabbit.annotation.RabbitHandler;
+import org.springframework.amqp.rabbit.annotation.RabbitListener;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+
+/**
+ * 直接队列1 处理器
+ *
+ * @author yangkai.shen
+ */
+@Slf4j
+@RabbitListener(queues = RabbitConstant.DIRECT_MODE_QUEUE_ONE)
+@Component
+public class DirectQueueOneHandler {
+
+ /**
+ * 如果 spring.rabbitmq.listener.direct.acknowledge-mode: auto,则可以用这个方式,会自动ack
+ */
+ // @RabbitHandler
+ public void directHandlerAutoAck(MessageStruct message) {
+ log.info("直接队列处理器,接收消息:{}", JsonUtil.toJson(message));
+ }
+
+ @RabbitHandler
+ public void directHandlerManualAck(MessageStruct messageStruct, Message message, Channel channel) {
+ // 如果手动ACK,消息会被监听消费,但是消息在队列中依旧存在,如果 未配置 acknowledge-mode 默认是会在消费完毕后自动ACK掉
+ final long deliveryTag = message.getMessageProperties().getDeliveryTag();
+ try {
+ log.info("直接队列1,手动ACK,接收消息:{}", JsonUtil.toJson(messageStruct));
+ // 通知 MQ 消息已被成功消费,可以ACK了
+ channel.basicAck(deliveryTag, false);
+ } catch (IOException e) {
+ try {
+ // 处理失败,重新压入MQ
+ channel.basicRecover();
+ } catch (IOException e1) {
+ e1.printStackTrace();
+ }
+ }
+ }
+}
diff --git a/blade-example/blade-mq-rabbit/src/main/java/org/springblade/example/mq/rabbit/handler/QueueThreeHandler.java b/blade-example/blade-mq-rabbit/src/main/java/org/springblade/example/mq/rabbit/handler/QueueThreeHandler.java
new file mode 100644
index 000000000..f9ee789a9
--- /dev/null
+++ b/blade-example/blade-mq-rabbit/src/main/java/org/springblade/example/mq/rabbit/handler/QueueThreeHandler.java
@@ -0,0 +1,42 @@
+package org.springblade.example.mq.rabbit.handler;
+
+import com.rabbitmq.client.Channel;
+import lombok.extern.slf4j.Slf4j;
+import org.springblade.core.tool.jackson.JsonUtil;
+import org.springblade.example.mq.rabbit.constant.RabbitConstant;
+import org.springblade.example.mq.rabbit.message.MessageStruct;
+import org.springframework.amqp.core.Message;
+import org.springframework.amqp.rabbit.annotation.RabbitHandler;
+import org.springframework.amqp.rabbit.annotation.RabbitListener;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+
+/**
+ * 队列2 处理器
+ *
+ * @author yangkai.shen
+ */
+@Slf4j
+@RabbitListener(queues = RabbitConstant.QUEUE_THREE)
+@Component
+public class QueueThreeHandler {
+
+ @RabbitHandler
+ public void directHandlerManualAck(MessageStruct messageStruct, Message message, Channel channel) {
+ // 如果手动ACK,消息会被监听消费,但是消息在队列中依旧存在,如果 未配置 acknowledge-mode 默认是会在消费完毕后自动ACK掉
+ final long deliveryTag = message.getMessageProperties().getDeliveryTag();
+ try {
+ log.info("队列3,手动ACK,接收消息:{}", JsonUtil.toJson(messageStruct));
+ // 通知 MQ 消息已被成功消费,可以ACK了
+ channel.basicAck(deliveryTag, false);
+ } catch (IOException e) {
+ try {
+ // 处理失败,重新压入MQ
+ channel.basicRecover();
+ } catch (IOException e1) {
+ e1.printStackTrace();
+ }
+ }
+ }
+}
diff --git a/blade-example/blade-mq-rabbit/src/main/java/org/springblade/example/mq/rabbit/handler/QueueTwoHandler.java b/blade-example/blade-mq-rabbit/src/main/java/org/springblade/example/mq/rabbit/handler/QueueTwoHandler.java
new file mode 100644
index 000000000..22f61480c
--- /dev/null
+++ b/blade-example/blade-mq-rabbit/src/main/java/org/springblade/example/mq/rabbit/handler/QueueTwoHandler.java
@@ -0,0 +1,42 @@
+package org.springblade.example.mq.rabbit.handler;
+
+import com.rabbitmq.client.Channel;
+import lombok.extern.slf4j.Slf4j;
+import org.springblade.core.tool.jackson.JsonUtil;
+import org.springblade.example.mq.rabbit.constant.RabbitConstant;
+import org.springblade.example.mq.rabbit.message.MessageStruct;
+import org.springframework.amqp.core.Message;
+import org.springframework.amqp.rabbit.annotation.RabbitHandler;
+import org.springframework.amqp.rabbit.annotation.RabbitListener;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+
+/**
+ * 队列2 处理器
+ *
+ * @author yangkai.shen
+ */
+@Slf4j
+@RabbitListener(queues = RabbitConstant.QUEUE_TWO)
+@Component
+public class QueueTwoHandler {
+
+ @RabbitHandler
+ public void directHandlerManualAck(MessageStruct messageStruct, Message message, Channel channel) {
+ // 如果手动ACK,消息会被监听消费,但是消息在队列中依旧存在,如果 未配置 acknowledge-mode 默认是会在消费完毕后自动ACK掉
+ final long deliveryTag = message.getMessageProperties().getDeliveryTag();
+ try {
+ log.info("队列2,手动ACK,接收消息:{}", JsonUtil.toJson(messageStruct));
+ // 通知 MQ 消息已被成功消费,可以ACK了
+ channel.basicAck(deliveryTag, false);
+ } catch (IOException e) {
+ try {
+ // 处理失败,重新压入MQ
+ channel.basicRecover();
+ } catch (IOException e1) {
+ e1.printStackTrace();
+ }
+ }
+ }
+}
diff --git a/blade-example/blade-mq-rabbit/src/main/java/org/springblade/example/mq/rabbit/message/MessageStruct.java b/blade-example/blade-mq-rabbit/src/main/java/org/springblade/example/mq/rabbit/message/MessageStruct.java
new file mode 100644
index 000000000..104cde12b
--- /dev/null
+++ b/blade-example/blade-mq-rabbit/src/main/java/org/springblade/example/mq/rabbit/message/MessageStruct.java
@@ -0,0 +1,23 @@
+package org.springblade.example.mq.rabbit.message;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.io.Serializable;
+
+/**
+ * 测试消息体
+ *
+ * @author yangkai.shen
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class MessageStruct implements Serializable {
+ private static final long serialVersionUID = 392365881428311040L;
+
+ private String message;
+}
diff --git a/blade-example/blade-mq-rabbit/src/main/resources/application.yml b/blade-example/blade-mq-rabbit/src/main/resources/application.yml
new file mode 100644
index 000000000..89ce1b67e
--- /dev/null
+++ b/blade-example/blade-mq-rabbit/src/main/resources/application.yml
@@ -0,0 +1,15 @@
+server:
+ port: 8602
+spring:
+ rabbitmq:
+ host: localhost
+ port: 5672
+ username: guest
+ password: guest
+ virtual-host: /
+ # 手动提交消息
+ listener:
+ simple:
+ acknowledge-mode: manual
+ direct:
+ acknowledge-mode: manual
diff --git a/blade-example/blade-mq-rabbit/src/test/java/org/springblade/example/mq/rabbit/RabbitTest.java b/blade-example/blade-mq-rabbit/src/test/java/org/springblade/example/mq/rabbit/RabbitTest.java
new file mode 100644
index 000000000..e3a4c5d7a
--- /dev/null
+++ b/blade-example/blade-mq-rabbit/src/test/java/org/springblade/example/mq/rabbit/RabbitTest.java
@@ -0,0 +1,84 @@
+package org.springblade.example.mq.rabbit;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springblade.core.test.BladeBootTest;
+import org.springblade.core.test.BladeSpringRunner;
+import org.springblade.core.tool.utils.DateUtil;
+import org.springblade.example.mq.rabbit.constant.RabbitConstant;
+import org.springblade.example.mq.rabbit.message.MessageStruct;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+
+@RunWith(BladeSpringRunner.class)
+@SpringBootTest(classes = RabbitApplication.class)
+@BladeBootTest(appName = "blade-rabbit", profile = "test", enableLoader = true)
+public class RabbitTest {
+ @Autowired
+ private RabbitTemplate rabbitTemplate;
+
+ /**
+ * 测试直接模式发送
+ */
+ @Test
+ public void sendDirect() {
+ rabbitTemplate.convertAndSend(RabbitConstant.DIRECT_MODE_QUEUE_ONE, new MessageStruct("direct message"));
+ }
+
+ /**
+ * 测试分列模式发送
+ */
+ @Test
+ public void sendFanout() {
+ rabbitTemplate.convertAndSend(RabbitConstant.FANOUT_MODE_QUEUE, "", new MessageStruct("fanout message"));
+ }
+
+ /**
+ * 测试主题模式发送1
+ */
+ @Test
+ public void sendTopic1() {
+ rabbitTemplate.convertAndSend(RabbitConstant.TOPIC_MODE_QUEUE, "queue.aaa.bbb", new MessageStruct("topic message"));
+ }
+
+ /**
+ * 测试主题模式发送2
+ */
+ @Test
+ public void sendTopic2() {
+ rabbitTemplate.convertAndSend(RabbitConstant.TOPIC_MODE_QUEUE, "ccc.queue", new MessageStruct("topic message"));
+ }
+
+ /**
+ * 测试主题模式发送3
+ */
+ @Test
+ public void sendTopic3() {
+ rabbitTemplate.convertAndSend(RabbitConstant.TOPIC_MODE_QUEUE, "3.queue", new MessageStruct("topic message"));
+ }
+
+ /**
+ * 测试延迟队列发送
+ */
+ @Test
+ public void sendDelay() {
+ rabbitTemplate.convertAndSend(RabbitConstant.DELAY_MODE_QUEUE, RabbitConstant.DELAY_QUEUE,
+ new MessageStruct("delay message, delay 5s, " + DateUtil.now()), message -> {
+ message.getMessageProperties().setHeader("x-delay", 5000);
+ return message;
+ });
+ rabbitTemplate.convertAndSend(RabbitConstant.DELAY_MODE_QUEUE, RabbitConstant.DELAY_QUEUE,
+ new MessageStruct("delay message, delay 2s, " + DateUtil.now()), message -> {
+ message.getMessageProperties().setHeader("x-delay", 2000);
+ return message;
+ });
+ rabbitTemplate.convertAndSend(RabbitConstant.DELAY_MODE_QUEUE, RabbitConstant.DELAY_QUEUE,
+ new MessageStruct("delay message, delay 8s, " + DateUtil.now()), message -> {
+ message.getMessageProperties().setHeader("x-delay", 8000);
+ return message;
+ });
+ }
+
+}
+
diff --git a/blade-example/pom.xml b/blade-example/pom.xml
index 2a6bcfcb0..0c9453626 100644
--- a/blade-example/pom.xml
+++ b/blade-example/pom.xml
@@ -18,9 +18,11 @@
blade-dubbo-consumer
blade-dubbo-provider
+ blade-easypoi
+ blade-mq-kafka
+ blade-mq-rabbit
blade-seata-order
blade-seata-storage
- blade-easypoi
blade-websocket
diff --git a/blade-service/pom.xml b/blade-service/pom.xml
index b5e4130ce..7c562fe24 100644
--- a/blade-service/pom.xml
+++ b/blade-service/pom.xml
@@ -29,6 +29,11 @@
org.springblade
blade-starter-tenant
+
+ org.springblade
+ blade-scope-api
+ ${bladex.project.version}
+