From 247595a9ac6ed8369d1b98312aac5365c361157c Mon Sep 17 00:00:00 2001 From: smallchill Date: Thu, 2 Jan 2020 23:20:30 +0800 Subject: [PATCH] =?UTF-8?q?:bento:=20=E6=B7=BB=E5=8A=A0kafka=E3=80=81rabbi?= =?UTF-8?q?tmq=E9=9B=86=E6=88=90example?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/constant/LauncherConstant.java | 15 ++ blade-example/blade-mq-kafka/pom.xml | 33 ++++ .../example/mq/kafka/KafkaApplication.java | 35 ++++ .../mq/kafka/config/KafkaConfiguration.java | 60 +++++++ .../mq/kafka/constant/KafkaConstant.java | 18 ++ .../mq/kafka/handler/MessageHandler.java | 31 ++++ .../src/main/resources/application.yml | 25 +++ .../example/mq/kafka/KafkaTest.java | 28 +++ blade-example/blade-mq-rabbit/pom.xml | 38 ++++ .../example/mq/rabbit/RabbitApplication.java | 35 ++++ .../rabbit/config/RabbitMqConfiguration.java | 162 ++++++++++++++++++ .../mq/rabbit/constant/RabbitConstant.java | 58 +++++++ .../mq/rabbit/handler/DelayQueueHandler.java | 42 +++++ .../rabbit/handler/DirectQueueOneHandler.java | 50 ++++++ .../mq/rabbit/handler/QueueThreeHandler.java | 42 +++++ .../mq/rabbit/handler/QueueTwoHandler.java | 42 +++++ .../mq/rabbit/message/MessageStruct.java | 23 +++ .../src/main/resources/application.yml | 15 ++ .../example/mq/rabbit/RabbitTest.java | 84 +++++++++ blade-example/pom.xml | 4 +- blade-service/pom.xml | 5 + 21 files changed, 844 insertions(+), 1 deletion(-) create mode 100644 blade-example/blade-mq-kafka/pom.xml create mode 100644 blade-example/blade-mq-kafka/src/main/java/org/springblade/example/mq/kafka/KafkaApplication.java create mode 100644 blade-example/blade-mq-kafka/src/main/java/org/springblade/example/mq/kafka/config/KafkaConfiguration.java create mode 100644 blade-example/blade-mq-kafka/src/main/java/org/springblade/example/mq/kafka/constant/KafkaConstant.java create mode 100644 blade-example/blade-mq-kafka/src/main/java/org/springblade/example/mq/kafka/handler/MessageHandler.java create mode 100644 blade-example/blade-mq-kafka/src/main/resources/application.yml create mode 100644 blade-example/blade-mq-kafka/src/test/java/org/springblade/example/mq/kafka/KafkaTest.java create mode 100644 blade-example/blade-mq-rabbit/pom.xml create mode 100644 blade-example/blade-mq-rabbit/src/main/java/org/springblade/example/mq/rabbit/RabbitApplication.java create mode 100644 blade-example/blade-mq-rabbit/src/main/java/org/springblade/example/mq/rabbit/config/RabbitMqConfiguration.java create mode 100644 blade-example/blade-mq-rabbit/src/main/java/org/springblade/example/mq/rabbit/constant/RabbitConstant.java create mode 100644 blade-example/blade-mq-rabbit/src/main/java/org/springblade/example/mq/rabbit/handler/DelayQueueHandler.java create mode 100644 blade-example/blade-mq-rabbit/src/main/java/org/springblade/example/mq/rabbit/handler/DirectQueueOneHandler.java create mode 100644 blade-example/blade-mq-rabbit/src/main/java/org/springblade/example/mq/rabbit/handler/QueueThreeHandler.java create mode 100644 blade-example/blade-mq-rabbit/src/main/java/org/springblade/example/mq/rabbit/handler/QueueTwoHandler.java create mode 100644 blade-example/blade-mq-rabbit/src/main/java/org/springblade/example/mq/rabbit/message/MessageStruct.java create mode 100644 blade-example/blade-mq-rabbit/src/main/resources/application.yml create mode 100644 blade-example/blade-mq-rabbit/src/test/java/org/springblade/example/mq/rabbit/RabbitTest.java diff --git a/blade-common/src/main/java/org/springblade/common/constant/LauncherConstant.java b/blade-common/src/main/java/org/springblade/common/constant/LauncherConstant.java index 0806b1736..8af3af585 100644 --- a/blade-common/src/main/java/org/springblade/common/constant/LauncherConstant.java +++ b/blade-common/src/main/java/org/springblade/common/constant/LauncherConstant.java @@ -87,6 +87,21 @@ public interface LauncherConstant { */ String APPLICATION_EASYPOI_NAME = APPLICATION_NAME_PREFIX + "easypoi"; + /** + * kafka + */ + String APPLICATION_KAFKA_NAME = APPLICATION_NAME_PREFIX + "kafka"; + + /** + * rabbit + */ + String APPLICATION_RABBIT_NAME = APPLICATION_NAME_PREFIX + "rabbit"; + + /** + * rocket + */ + String APPLICATION_ROCKET_NAME = APPLICATION_NAME_PREFIX + "rocket"; + /** * 动态获取nacos地址 * diff --git a/blade-example/blade-mq-kafka/pom.xml b/blade-example/blade-mq-kafka/pom.xml new file mode 100644 index 000000000..6c0e7525c --- /dev/null +++ b/blade-example/blade-mq-kafka/pom.xml @@ -0,0 +1,33 @@ + + + + blade-example + org.springblade + 2.2.2.RELEASE + + 4.0.0 + + blade-mq-kafka + ${project.artifactId} + ${bladex.project.version} + jar + + + + org.springblade + blade-core-launch + + + org.springblade + blade-core-test + test + + + org.springframework.kafka + spring-kafka + + + + diff --git a/blade-example/blade-mq-kafka/src/main/java/org/springblade/example/mq/kafka/KafkaApplication.java b/blade-example/blade-mq-kafka/src/main/java/org/springblade/example/mq/kafka/KafkaApplication.java new file mode 100644 index 000000000..96e8ad6e7 --- /dev/null +++ b/blade-example/blade-mq-kafka/src/main/java/org/springblade/example/mq/kafka/KafkaApplication.java @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2018-2028, Chill Zhuang All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * Neither the name of the dreamlu.net developer nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * Author: Chill 庄骞 (smallchill@163.com) + */ +package org.springblade.example.mq.kafka; + +import org.springblade.common.constant.LauncherConstant; +import org.springblade.core.launch.BladeApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +/** + * KafkaApplication + * + * @author Chill + */ +@SpringBootApplication +public class KafkaApplication { + + public static void main(String[] args) { + BladeApplication.run(LauncherConstant.APPLICATION_KAFKA_NAME, KafkaApplication.class, args); + } + +} diff --git a/blade-example/blade-mq-kafka/src/main/java/org/springblade/example/mq/kafka/config/KafkaConfiguration.java b/blade-example/blade-mq-kafka/src/main/java/org/springblade/example/mq/kafka/config/KafkaConfiguration.java new file mode 100644 index 000000000..d98416993 --- /dev/null +++ b/blade-example/blade-mq-kafka/src/main/java/org/springblade/example/mq/kafka/config/KafkaConfiguration.java @@ -0,0 +1,60 @@ +package org.springblade.example.mq.kafka.config; + +import lombok.AllArgsConstructor; +import org.springblade.example.mq.kafka.constant.KafkaConstant; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.*; +import org.springframework.kafka.listener.ContainerProperties; + +/** + * kafka配置类 + * + * @author yangkai.shen + */ +@Configuration +@EnableConfigurationProperties({KafkaProperties.class}) +@EnableKafka +@AllArgsConstructor +public class KafkaConfiguration { + private final KafkaProperties kafkaProperties; + + @Bean + public KafkaTemplate kafkaTemplate() { + return new KafkaTemplate<>(producerFactory()); + } + + @Bean + public ProducerFactory producerFactory() { + return new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties()); + } + + @Bean + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory()); + factory.setConcurrency(KafkaConstant.DEFAULT_PARTITION_NUM); + factory.setBatchListener(true); + factory.getContainerProperties().setPollTimeout(3000); + return factory; + } + + @Bean + public ConsumerFactory consumerFactory() { + return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties()); + } + + @Bean("ackContainerFactory") + public ConcurrentKafkaListenerContainerFactory ackContainerFactory() { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory()); + factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); + factory.setConcurrency(KafkaConstant.DEFAULT_PARTITION_NUM); + return factory; + } + +} diff --git a/blade-example/blade-mq-kafka/src/main/java/org/springblade/example/mq/kafka/constant/KafkaConstant.java b/blade-example/blade-mq-kafka/src/main/java/org/springblade/example/mq/kafka/constant/KafkaConstant.java new file mode 100644 index 000000000..d14e184ba --- /dev/null +++ b/blade-example/blade-mq-kafka/src/main/java/org/springblade/example/mq/kafka/constant/KafkaConstant.java @@ -0,0 +1,18 @@ +package org.springblade.example.mq.kafka.constant; + +/** + * kafka 常量池 + * + * @author yangkai.shen + */ +public interface KafkaConstant { + /** + * 默认分区大小 + */ + Integer DEFAULT_PARTITION_NUM = 3; + + /** + * Topic 名称 + */ + String TOPIC_TEST = "test"; +} diff --git a/blade-example/blade-mq-kafka/src/main/java/org/springblade/example/mq/kafka/handler/MessageHandler.java b/blade-example/blade-mq-kafka/src/main/java/org/springblade/example/mq/kafka/handler/MessageHandler.java new file mode 100644 index 000000000..c3a0b7ef1 --- /dev/null +++ b/blade-example/blade-mq-kafka/src/main/java/org/springblade/example/mq/kafka/handler/MessageHandler.java @@ -0,0 +1,31 @@ +package org.springblade.example.mq.kafka.handler; + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springblade.example.mq.kafka.constant.KafkaConstant; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.support.Acknowledgment; +import org.springframework.stereotype.Component; + +/** + * 消息处理器 + * + * @author yangkai.shen + */ +@Component +@Slf4j +public class MessageHandler { + + @KafkaListener(topics = KafkaConstant.TOPIC_TEST, containerFactory = "ackContainerFactory") + public void handleMessage(ConsumerRecord record, Acknowledgment acknowledgment) { + try { + String message = (String) record.value(); + log.info("收到消息: {}", message); + } catch (Exception e) { + log.error(e.getMessage(), e); + } finally { + // 手动提交 offset + acknowledgment.acknowledge(); + } + } +} diff --git a/blade-example/blade-mq-kafka/src/main/resources/application.yml b/blade-example/blade-mq-kafka/src/main/resources/application.yml new file mode 100644 index 000000000..ec03d9895 --- /dev/null +++ b/blade-example/blade-mq-kafka/src/main/resources/application.yml @@ -0,0 +1,25 @@ +server: + port: 8601 +spring: + kafka: + bootstrap-servers: localhost:9092 + producer: + retries: 0 + batch-size: 16384 + buffer-memory: 33554432 + key-serializer: org.apache.kafka.common.serialization.StringSerializer + value-serializer: org.apache.kafka.common.serialization.StringSerializer + consumer: + group-id: blade-kafka + # 手动提交 + enable-auto-commit: false + auto-offset-reset: latest + key-deserializer: org.apache.kafka.common.serialization.StringDeserializer + value-deserializer: org.apache.kafka.common.serialization.StringDeserializer + properties: + session.timeout.ms: 60000 + listener: + log-container-config: false + concurrency: 5 + # 手动提交 + ack-mode: manual_immediate diff --git a/blade-example/blade-mq-kafka/src/test/java/org/springblade/example/mq/kafka/KafkaTest.java b/blade-example/blade-mq-kafka/src/test/java/org/springblade/example/mq/kafka/KafkaTest.java new file mode 100644 index 000000000..a5f5da521 --- /dev/null +++ b/blade-example/blade-mq-kafka/src/test/java/org/springblade/example/mq/kafka/KafkaTest.java @@ -0,0 +1,28 @@ +package org.springblade.example.mq.kafka; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springblade.core.test.BladeBootTest; +import org.springblade.core.test.BladeSpringRunner; +import org.springblade.example.mq.kafka.constant.KafkaConstant; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.kafka.core.KafkaTemplate; + +@RunWith(BladeSpringRunner.class) +@SpringBootTest(classes = KafkaApplication.class) +@BladeBootTest(appName = "blade-kafka", profile = "test", enableLoader = true) +public class KafkaTest { + @Autowired + private KafkaTemplate kafkaTemplate; + + /** + * 测试发送消息 + */ + @Test + public void testSend() { + kafkaTemplate.send(KafkaConstant.TOPIC_TEST, "hello,kafka..."); + } + +} + diff --git a/blade-example/blade-mq-rabbit/pom.xml b/blade-example/blade-mq-rabbit/pom.xml new file mode 100644 index 000000000..1e3ff2139 --- /dev/null +++ b/blade-example/blade-mq-rabbit/pom.xml @@ -0,0 +1,38 @@ + + + + blade-example + org.springblade + 2.2.2.RELEASE + + 4.0.0 + + blade-mq-rabbit + ${project.artifactId} + ${bladex.project.version} + jar + + + + org.springblade + blade-core-launch + + + org.springblade + blade-core-tool + + + org.springblade + blade-core-test + test + + + org.springframework.boot + spring-boot-starter-amqp + + + + + diff --git a/blade-example/blade-mq-rabbit/src/main/java/org/springblade/example/mq/rabbit/RabbitApplication.java b/blade-example/blade-mq-rabbit/src/main/java/org/springblade/example/mq/rabbit/RabbitApplication.java new file mode 100644 index 000000000..dedd974fe --- /dev/null +++ b/blade-example/blade-mq-rabbit/src/main/java/org/springblade/example/mq/rabbit/RabbitApplication.java @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2018-2028, Chill Zhuang All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * Neither the name of the dreamlu.net developer nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * Author: Chill 庄骞 (smallchill@163.com) + */ +package org.springblade.example.mq.rabbit; + +import org.springblade.common.constant.LauncherConstant; +import org.springblade.core.launch.BladeApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +/** + * RabbitApplication + * + * @author Chill + */ +@SpringBootApplication +public class RabbitApplication { + + public static void main(String[] args) { + BladeApplication.run(LauncherConstant.APPLICATION_RABBIT_NAME, RabbitApplication.class, args); + } + +} diff --git a/blade-example/blade-mq-rabbit/src/main/java/org/springblade/example/mq/rabbit/config/RabbitMqConfiguration.java b/blade-example/blade-mq-rabbit/src/main/java/org/springblade/example/mq/rabbit/config/RabbitMqConfiguration.java new file mode 100644 index 000000000..874dc01ce --- /dev/null +++ b/blade-example/blade-mq-rabbit/src/main/java/org/springblade/example/mq/rabbit/config/RabbitMqConfiguration.java @@ -0,0 +1,162 @@ +package org.springblade.example.mq.rabbit.config; + +import com.google.common.collect.Maps; +import lombok.extern.slf4j.Slf4j; +import org.springblade.example.mq.rabbit.constant.RabbitConstant; +import org.springframework.amqp.core.*; +import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.Map; + +/** + * RabbitMQ配置,主要是配置队列,如果提前存在该队列,可以省略本配置类 + * + * @author yangkai.shen + */ +@Slf4j +@Configuration +public class RabbitMqConfiguration { + + @Bean + public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) { + connectionFactory.setPublisherConfirms(true); + connectionFactory.setPublisherReturns(true); + RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); + rabbitTemplate.setMandatory(true); + rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> log.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause)); + rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message)); + return rabbitTemplate; + } + + /** + * 直接模式队列1 + */ + @Bean + public Queue directOneQueue() { + return new Queue(RabbitConstant.DIRECT_MODE_QUEUE_ONE); + } + + /** + * 队列2 + */ + @Bean + public Queue queueTwo() { + return new Queue(RabbitConstant.QUEUE_TWO); + } + + /** + * 队列3 + */ + @Bean + public Queue queueThree() { + return new Queue(RabbitConstant.QUEUE_THREE); + } + + /** + * 分列模式队列 + */ + @Bean + public FanoutExchange fanoutExchange() { + return new FanoutExchange(RabbitConstant.FANOUT_MODE_QUEUE); + } + + /** + * 分列模式绑定队列1 + * + * @param directOneQueue 绑定队列1 + * @param fanoutExchange 分列模式交换器 + */ + @Bean + public Binding fanoutBinding1(Queue directOneQueue, FanoutExchange fanoutExchange) { + return BindingBuilder.bind(directOneQueue).to(fanoutExchange); + } + + /** + * 分列模式绑定队列2 + * + * @param queueTwo 绑定队列2 + * @param fanoutExchange 分列模式交换器 + */ + @Bean + public Binding fanoutBinding2(Queue queueTwo, FanoutExchange fanoutExchange) { + return BindingBuilder.bind(queueTwo).to(fanoutExchange); + } + + /** + * 主题模式队列 + *
  • 路由格式必须以 . 分隔,比如 user.email 或者 user.aaa.email
  • + *
  • 通配符 * ,代表一个占位符,或者说一个单词,比如路由为 user.*,那么 user.email 可以匹配,但是 user.aaa.email 就匹配不了
  • + *
  • 通配符 # ,代表一个或多个占位符,或者说一个或多个单词,比如路由为 user.#,那么 user.email 可以匹配,user.aaa.email 也可以匹配
  • + */ + @Bean + public TopicExchange topicExchange() { + return new TopicExchange(RabbitConstant.TOPIC_MODE_QUEUE); + } + + + /** + * 主题模式绑定分列模式 + * + * @param fanoutExchange 分列模式交换器 + * @param topicExchange 主题模式交换器 + */ + @Bean + public Binding topicBinding1(FanoutExchange fanoutExchange, TopicExchange topicExchange) { + return BindingBuilder.bind(fanoutExchange).to(topicExchange).with(RabbitConstant.TOPIC_ROUTING_KEY_ONE); + } + + /** + * 主题模式绑定队列2 + * + * @param queueTwo 队列2 + * @param topicExchange 主题模式交换器 + */ + @Bean + public Binding topicBinding2(Queue queueTwo, TopicExchange topicExchange) { + return BindingBuilder.bind(queueTwo).to(topicExchange).with(RabbitConstant.TOPIC_ROUTING_KEY_TWO); + } + + /** + * 主题模式绑定队列3 + * + * @param queueThree 队列3 + * @param topicExchange 主题模式交换器 + */ + @Bean + public Binding topicBinding3(Queue queueThree, TopicExchange topicExchange) { + return BindingBuilder.bind(queueThree).to(topicExchange).with(RabbitConstant.TOPIC_ROUTING_KEY_THREE); + } + + /** + * 延迟队列 + */ + @Bean + public Queue delayQueue() { + return new Queue(RabbitConstant.DELAY_QUEUE, true); + } + + /** + * 延迟队列交换器, x-delayed-type 和 x-delayed-message 固定 + */ + @Bean + public CustomExchange delayExchange() { + Map args = Maps.newHashMap(); + args.put("x-delayed-type", "direct"); + return new CustomExchange(RabbitConstant.DELAY_MODE_QUEUE, "x-delayed-message", true, false, args); + } + + /** + * 延迟队列绑定自定义交换器 + * + * @param delayQueue 队列 + * @param delayExchange 延迟交换器 + */ + @Bean + public Binding delayBinding(Queue delayQueue, CustomExchange delayExchange) { + return BindingBuilder.bind(delayQueue).to(delayExchange).with(RabbitConstant.DELAY_QUEUE).noargs(); + } + +} diff --git a/blade-example/blade-mq-rabbit/src/main/java/org/springblade/example/mq/rabbit/constant/RabbitConstant.java b/blade-example/blade-mq-rabbit/src/main/java/org/springblade/example/mq/rabbit/constant/RabbitConstant.java new file mode 100644 index 000000000..34b560f54 --- /dev/null +++ b/blade-example/blade-mq-rabbit/src/main/java/org/springblade/example/mq/rabbit/constant/RabbitConstant.java @@ -0,0 +1,58 @@ +package org.springblade.example.mq.rabbit.constant; + +/** + * RabbitMQ常量池 + * + * @author yangkai.shen + */ +public interface RabbitConstant { + /** + * 直接模式1 + */ + String DIRECT_MODE_QUEUE_ONE = "queue.direct.1"; + + /** + * 队列2 + */ + String QUEUE_TWO = "queue.2"; + + /** + * 队列3 + */ + String QUEUE_THREE = "3.queue"; + + /** + * 分列模式 + */ + String FANOUT_MODE_QUEUE = "fanout.mode"; + + /** + * 主题模式 + */ + String TOPIC_MODE_QUEUE = "topic.mode"; + + /** + * 路由1 + */ + String TOPIC_ROUTING_KEY_ONE = "queue.#"; + + /** + * 路由2 + */ + String TOPIC_ROUTING_KEY_TWO = "*.queue"; + + /** + * 路由3 + */ + String TOPIC_ROUTING_KEY_THREE = "3.queue"; + + /** + * 延迟队列 + */ + String DELAY_QUEUE = "delay.queue"; + + /** + * 延迟队列交换器 + */ + String DELAY_MODE_QUEUE = "delay.mode"; +} diff --git a/blade-example/blade-mq-rabbit/src/main/java/org/springblade/example/mq/rabbit/handler/DelayQueueHandler.java b/blade-example/blade-mq-rabbit/src/main/java/org/springblade/example/mq/rabbit/handler/DelayQueueHandler.java new file mode 100644 index 000000000..53e5db5d0 --- /dev/null +++ b/blade-example/blade-mq-rabbit/src/main/java/org/springblade/example/mq/rabbit/handler/DelayQueueHandler.java @@ -0,0 +1,42 @@ +package org.springblade.example.mq.rabbit.handler; + +import com.rabbitmq.client.Channel; +import lombok.extern.slf4j.Slf4j; +import org.springblade.core.tool.jackson.JsonUtil; +import org.springblade.example.mq.rabbit.constant.RabbitConstant; +import org.springblade.example.mq.rabbit.message.MessageStruct; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.RabbitHandler; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.stereotype.Component; + +import java.io.IOException; + +/** + * 延迟队列处理器 + * + * @author yangkai.shen + */ +@Slf4j +@Component +@RabbitListener(queues = RabbitConstant.DELAY_QUEUE) +public class DelayQueueHandler { + + @RabbitHandler + public void directHandlerManualAck(MessageStruct messageStruct, Message message, Channel channel) { + // 如果手动ACK,消息会被监听消费,但是消息在队列中依旧存在,如果 未配置 acknowledge-mode 默认是会在消费完毕后自动ACK掉 + final long deliveryTag = message.getMessageProperties().getDeliveryTag(); + try { + log.info("延迟队列,手动ACK,接收消息:{}", JsonUtil.toJson(messageStruct)); + // 通知 MQ 消息已被成功消费,可以ACK了 + channel.basicAck(deliveryTag, false); + } catch (IOException e) { + try { + // 处理失败,重新压入MQ + channel.basicRecover(); + } catch (IOException e1) { + e1.printStackTrace(); + } + } + } +} diff --git a/blade-example/blade-mq-rabbit/src/main/java/org/springblade/example/mq/rabbit/handler/DirectQueueOneHandler.java b/blade-example/blade-mq-rabbit/src/main/java/org/springblade/example/mq/rabbit/handler/DirectQueueOneHandler.java new file mode 100644 index 000000000..2cdac098f --- /dev/null +++ b/blade-example/blade-mq-rabbit/src/main/java/org/springblade/example/mq/rabbit/handler/DirectQueueOneHandler.java @@ -0,0 +1,50 @@ +package org.springblade.example.mq.rabbit.handler; + +import com.rabbitmq.client.Channel; +import lombok.extern.slf4j.Slf4j; +import org.springblade.core.tool.jackson.JsonUtil; +import org.springblade.example.mq.rabbit.constant.RabbitConstant; +import org.springblade.example.mq.rabbit.message.MessageStruct; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.RabbitHandler; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.stereotype.Component; + +import java.io.IOException; + +/** + * 直接队列1 处理器 + * + * @author yangkai.shen + */ +@Slf4j +@RabbitListener(queues = RabbitConstant.DIRECT_MODE_QUEUE_ONE) +@Component +public class DirectQueueOneHandler { + + /** + * 如果 spring.rabbitmq.listener.direct.acknowledge-mode: auto,则可以用这个方式,会自动ack + */ + // @RabbitHandler + public void directHandlerAutoAck(MessageStruct message) { + log.info("直接队列处理器,接收消息:{}", JsonUtil.toJson(message)); + } + + @RabbitHandler + public void directHandlerManualAck(MessageStruct messageStruct, Message message, Channel channel) { + // 如果手动ACK,消息会被监听消费,但是消息在队列中依旧存在,如果 未配置 acknowledge-mode 默认是会在消费完毕后自动ACK掉 + final long deliveryTag = message.getMessageProperties().getDeliveryTag(); + try { + log.info("直接队列1,手动ACK,接收消息:{}", JsonUtil.toJson(messageStruct)); + // 通知 MQ 消息已被成功消费,可以ACK了 + channel.basicAck(deliveryTag, false); + } catch (IOException e) { + try { + // 处理失败,重新压入MQ + channel.basicRecover(); + } catch (IOException e1) { + e1.printStackTrace(); + } + } + } +} diff --git a/blade-example/blade-mq-rabbit/src/main/java/org/springblade/example/mq/rabbit/handler/QueueThreeHandler.java b/blade-example/blade-mq-rabbit/src/main/java/org/springblade/example/mq/rabbit/handler/QueueThreeHandler.java new file mode 100644 index 000000000..f9ee789a9 --- /dev/null +++ b/blade-example/blade-mq-rabbit/src/main/java/org/springblade/example/mq/rabbit/handler/QueueThreeHandler.java @@ -0,0 +1,42 @@ +package org.springblade.example.mq.rabbit.handler; + +import com.rabbitmq.client.Channel; +import lombok.extern.slf4j.Slf4j; +import org.springblade.core.tool.jackson.JsonUtil; +import org.springblade.example.mq.rabbit.constant.RabbitConstant; +import org.springblade.example.mq.rabbit.message.MessageStruct; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.RabbitHandler; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.stereotype.Component; + +import java.io.IOException; + +/** + * 队列2 处理器 + * + * @author yangkai.shen + */ +@Slf4j +@RabbitListener(queues = RabbitConstant.QUEUE_THREE) +@Component +public class QueueThreeHandler { + + @RabbitHandler + public void directHandlerManualAck(MessageStruct messageStruct, Message message, Channel channel) { + // 如果手动ACK,消息会被监听消费,但是消息在队列中依旧存在,如果 未配置 acknowledge-mode 默认是会在消费完毕后自动ACK掉 + final long deliveryTag = message.getMessageProperties().getDeliveryTag(); + try { + log.info("队列3,手动ACK,接收消息:{}", JsonUtil.toJson(messageStruct)); + // 通知 MQ 消息已被成功消费,可以ACK了 + channel.basicAck(deliveryTag, false); + } catch (IOException e) { + try { + // 处理失败,重新压入MQ + channel.basicRecover(); + } catch (IOException e1) { + e1.printStackTrace(); + } + } + } +} diff --git a/blade-example/blade-mq-rabbit/src/main/java/org/springblade/example/mq/rabbit/handler/QueueTwoHandler.java b/blade-example/blade-mq-rabbit/src/main/java/org/springblade/example/mq/rabbit/handler/QueueTwoHandler.java new file mode 100644 index 000000000..22f61480c --- /dev/null +++ b/blade-example/blade-mq-rabbit/src/main/java/org/springblade/example/mq/rabbit/handler/QueueTwoHandler.java @@ -0,0 +1,42 @@ +package org.springblade.example.mq.rabbit.handler; + +import com.rabbitmq.client.Channel; +import lombok.extern.slf4j.Slf4j; +import org.springblade.core.tool.jackson.JsonUtil; +import org.springblade.example.mq.rabbit.constant.RabbitConstant; +import org.springblade.example.mq.rabbit.message.MessageStruct; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.RabbitHandler; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.stereotype.Component; + +import java.io.IOException; + +/** + * 队列2 处理器 + * + * @author yangkai.shen + */ +@Slf4j +@RabbitListener(queues = RabbitConstant.QUEUE_TWO) +@Component +public class QueueTwoHandler { + + @RabbitHandler + public void directHandlerManualAck(MessageStruct messageStruct, Message message, Channel channel) { + // 如果手动ACK,消息会被监听消费,但是消息在队列中依旧存在,如果 未配置 acknowledge-mode 默认是会在消费完毕后自动ACK掉 + final long deliveryTag = message.getMessageProperties().getDeliveryTag(); + try { + log.info("队列2,手动ACK,接收消息:{}", JsonUtil.toJson(messageStruct)); + // 通知 MQ 消息已被成功消费,可以ACK了 + channel.basicAck(deliveryTag, false); + } catch (IOException e) { + try { + // 处理失败,重新压入MQ + channel.basicRecover(); + } catch (IOException e1) { + e1.printStackTrace(); + } + } + } +} diff --git a/blade-example/blade-mq-rabbit/src/main/java/org/springblade/example/mq/rabbit/message/MessageStruct.java b/blade-example/blade-mq-rabbit/src/main/java/org/springblade/example/mq/rabbit/message/MessageStruct.java new file mode 100644 index 000000000..104cde12b --- /dev/null +++ b/blade-example/blade-mq-rabbit/src/main/java/org/springblade/example/mq/rabbit/message/MessageStruct.java @@ -0,0 +1,23 @@ +package org.springblade.example.mq.rabbit.message; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.io.Serializable; + +/** + * 测试消息体 + * + * @author yangkai.shen + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class MessageStruct implements Serializable { + private static final long serialVersionUID = 392365881428311040L; + + private String message; +} diff --git a/blade-example/blade-mq-rabbit/src/main/resources/application.yml b/blade-example/blade-mq-rabbit/src/main/resources/application.yml new file mode 100644 index 000000000..89ce1b67e --- /dev/null +++ b/blade-example/blade-mq-rabbit/src/main/resources/application.yml @@ -0,0 +1,15 @@ +server: + port: 8602 +spring: + rabbitmq: + host: localhost + port: 5672 + username: guest + password: guest + virtual-host: / + # 手动提交消息 + listener: + simple: + acknowledge-mode: manual + direct: + acknowledge-mode: manual diff --git a/blade-example/blade-mq-rabbit/src/test/java/org/springblade/example/mq/rabbit/RabbitTest.java b/blade-example/blade-mq-rabbit/src/test/java/org/springblade/example/mq/rabbit/RabbitTest.java new file mode 100644 index 000000000..e3a4c5d7a --- /dev/null +++ b/blade-example/blade-mq-rabbit/src/test/java/org/springblade/example/mq/rabbit/RabbitTest.java @@ -0,0 +1,84 @@ +package org.springblade.example.mq.rabbit; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springblade.core.test.BladeBootTest; +import org.springblade.core.test.BladeSpringRunner; +import org.springblade.core.tool.utils.DateUtil; +import org.springblade.example.mq.rabbit.constant.RabbitConstant; +import org.springblade.example.mq.rabbit.message.MessageStruct; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; + +@RunWith(BladeSpringRunner.class) +@SpringBootTest(classes = RabbitApplication.class) +@BladeBootTest(appName = "blade-rabbit", profile = "test", enableLoader = true) +public class RabbitTest { + @Autowired + private RabbitTemplate rabbitTemplate; + + /** + * 测试直接模式发送 + */ + @Test + public void sendDirect() { + rabbitTemplate.convertAndSend(RabbitConstant.DIRECT_MODE_QUEUE_ONE, new MessageStruct("direct message")); + } + + /** + * 测试分列模式发送 + */ + @Test + public void sendFanout() { + rabbitTemplate.convertAndSend(RabbitConstant.FANOUT_MODE_QUEUE, "", new MessageStruct("fanout message")); + } + + /** + * 测试主题模式发送1 + */ + @Test + public void sendTopic1() { + rabbitTemplate.convertAndSend(RabbitConstant.TOPIC_MODE_QUEUE, "queue.aaa.bbb", new MessageStruct("topic message")); + } + + /** + * 测试主题模式发送2 + */ + @Test + public void sendTopic2() { + rabbitTemplate.convertAndSend(RabbitConstant.TOPIC_MODE_QUEUE, "ccc.queue", new MessageStruct("topic message")); + } + + /** + * 测试主题模式发送3 + */ + @Test + public void sendTopic3() { + rabbitTemplate.convertAndSend(RabbitConstant.TOPIC_MODE_QUEUE, "3.queue", new MessageStruct("topic message")); + } + + /** + * 测试延迟队列发送 + */ + @Test + public void sendDelay() { + rabbitTemplate.convertAndSend(RabbitConstant.DELAY_MODE_QUEUE, RabbitConstant.DELAY_QUEUE, + new MessageStruct("delay message, delay 5s, " + DateUtil.now()), message -> { + message.getMessageProperties().setHeader("x-delay", 5000); + return message; + }); + rabbitTemplate.convertAndSend(RabbitConstant.DELAY_MODE_QUEUE, RabbitConstant.DELAY_QUEUE, + new MessageStruct("delay message, delay 2s, " + DateUtil.now()), message -> { + message.getMessageProperties().setHeader("x-delay", 2000); + return message; + }); + rabbitTemplate.convertAndSend(RabbitConstant.DELAY_MODE_QUEUE, RabbitConstant.DELAY_QUEUE, + new MessageStruct("delay message, delay 8s, " + DateUtil.now()), message -> { + message.getMessageProperties().setHeader("x-delay", 8000); + return message; + }); + } + +} + diff --git a/blade-example/pom.xml b/blade-example/pom.xml index 2a6bcfcb0..0c9453626 100644 --- a/blade-example/pom.xml +++ b/blade-example/pom.xml @@ -18,9 +18,11 @@ blade-dubbo-consumer blade-dubbo-provider + blade-easypoi + blade-mq-kafka + blade-mq-rabbit blade-seata-order blade-seata-storage - blade-easypoi blade-websocket diff --git a/blade-service/pom.xml b/blade-service/pom.xml index b5e4130ce..7c562fe24 100644 --- a/blade-service/pom.xml +++ b/blade-service/pom.xml @@ -29,6 +29,11 @@ org.springblade blade-starter-tenant + + org.springblade + blade-scope-api + ${bladex.project.version} +