Browse Source

🍱 添加kafka、rabbitmq集成example

test
smallchill 5 years ago
parent
commit
247595a9ac
  1. 15
      blade-common/src/main/java/org/springblade/common/constant/LauncherConstant.java
  2. 33
      blade-example/blade-mq-kafka/pom.xml
  3. 35
      blade-example/blade-mq-kafka/src/main/java/org/springblade/example/mq/kafka/KafkaApplication.java
  4. 60
      blade-example/blade-mq-kafka/src/main/java/org/springblade/example/mq/kafka/config/KafkaConfiguration.java
  5. 18
      blade-example/blade-mq-kafka/src/main/java/org/springblade/example/mq/kafka/constant/KafkaConstant.java
  6. 31
      blade-example/blade-mq-kafka/src/main/java/org/springblade/example/mq/kafka/handler/MessageHandler.java
  7. 25
      blade-example/blade-mq-kafka/src/main/resources/application.yml
  8. 28
      blade-example/blade-mq-kafka/src/test/java/org/springblade/example/mq/kafka/KafkaTest.java
  9. 38
      blade-example/blade-mq-rabbit/pom.xml
  10. 35
      blade-example/blade-mq-rabbit/src/main/java/org/springblade/example/mq/rabbit/RabbitApplication.java
  11. 162
      blade-example/blade-mq-rabbit/src/main/java/org/springblade/example/mq/rabbit/config/RabbitMqConfiguration.java
  12. 58
      blade-example/blade-mq-rabbit/src/main/java/org/springblade/example/mq/rabbit/constant/RabbitConstant.java
  13. 42
      blade-example/blade-mq-rabbit/src/main/java/org/springblade/example/mq/rabbit/handler/DelayQueueHandler.java
  14. 50
      blade-example/blade-mq-rabbit/src/main/java/org/springblade/example/mq/rabbit/handler/DirectQueueOneHandler.java
  15. 42
      blade-example/blade-mq-rabbit/src/main/java/org/springblade/example/mq/rabbit/handler/QueueThreeHandler.java
  16. 42
      blade-example/blade-mq-rabbit/src/main/java/org/springblade/example/mq/rabbit/handler/QueueTwoHandler.java
  17. 23
      blade-example/blade-mq-rabbit/src/main/java/org/springblade/example/mq/rabbit/message/MessageStruct.java
  18. 15
      blade-example/blade-mq-rabbit/src/main/resources/application.yml
  19. 84
      blade-example/blade-mq-rabbit/src/test/java/org/springblade/example/mq/rabbit/RabbitTest.java
  20. 4
      blade-example/pom.xml
  21. 5
      blade-service/pom.xml

15
blade-common/src/main/java/org/springblade/common/constant/LauncherConstant.java

@ -87,6 +87,21 @@ public interface LauncherConstant {
*/
String APPLICATION_EASYPOI_NAME = APPLICATION_NAME_PREFIX + "easypoi";
/**
* kafka
*/
String APPLICATION_KAFKA_NAME = APPLICATION_NAME_PREFIX + "kafka";
/**
* rabbit
*/
String APPLICATION_RABBIT_NAME = APPLICATION_NAME_PREFIX + "rabbit";
/**
* rocket
*/
String APPLICATION_ROCKET_NAME = APPLICATION_NAME_PREFIX + "rocket";
/**
* 动态获取nacos地址
*

33
blade-example/blade-mq-kafka/pom.xml

@ -0,0 +1,33 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>blade-example</artifactId>
<groupId>org.springblade</groupId>
<version>2.2.2.RELEASE</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>blade-mq-kafka</artifactId>
<name>${project.artifactId}</name>
<version>${bladex.project.version}</version>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.springblade</groupId>
<artifactId>blade-core-launch</artifactId>
</dependency>
<dependency>
<groupId>org.springblade</groupId>
<artifactId>blade-core-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
</dependencies>
</project>

35
blade-example/blade-mq-kafka/src/main/java/org/springblade/example/mq/kafka/KafkaApplication.java

@ -0,0 +1,35 @@
/*
* Copyright (c) 2018-2028, Chill Zhuang All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* Neither the name of the dreamlu.net developer nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
* Author: Chill 庄骞 (smallchill@163.com)
*/
package org.springblade.example.mq.kafka;
import org.springblade.common.constant.LauncherConstant;
import org.springblade.core.launch.BladeApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* KafkaApplication
*
* @author Chill
*/
@SpringBootApplication
public class KafkaApplication {
public static void main(String[] args) {
BladeApplication.run(LauncherConstant.APPLICATION_KAFKA_NAME, KafkaApplication.class, args);
}
}

60
blade-example/blade-mq-kafka/src/main/java/org/springblade/example/mq/kafka/config/KafkaConfiguration.java

@ -0,0 +1,60 @@
package org.springblade.example.mq.kafka.config;
import lombok.AllArgsConstructor;
import org.springblade.example.mq.kafka.constant.KafkaConstant;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ContainerProperties;
/**
* kafka配置类
*
* @author yangkai.shen
*/
@Configuration
@EnableConfigurationProperties({KafkaProperties.class})
@EnableKafka
@AllArgsConstructor
public class KafkaConfiguration {
private final KafkaProperties kafkaProperties;
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(KafkaConstant.DEFAULT_PARTITION_NUM);
factory.setBatchListener(true);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());
}
@Bean("ackContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, String> ackContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.setConcurrency(KafkaConstant.DEFAULT_PARTITION_NUM);
return factory;
}
}

18
blade-example/blade-mq-kafka/src/main/java/org/springblade/example/mq/kafka/constant/KafkaConstant.java

@ -0,0 +1,18 @@
package org.springblade.example.mq.kafka.constant;
/**
* kafka 常量池
*
* @author yangkai.shen
*/
public interface KafkaConstant {
/**
* 默认分区大小
*/
Integer DEFAULT_PARTITION_NUM = 3;
/**
* Topic 名称
*/
String TOPIC_TEST = "test";
}

31
blade-example/blade-mq-kafka/src/main/java/org/springblade/example/mq/kafka/handler/MessageHandler.java

@ -0,0 +1,31 @@
package org.springblade.example.mq.kafka.handler;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springblade.example.mq.kafka.constant.KafkaConstant;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
/**
* 消息处理器
*
* @author yangkai.shen
*/
@Component
@Slf4j
public class MessageHandler {
@KafkaListener(topics = KafkaConstant.TOPIC_TEST, containerFactory = "ackContainerFactory")
public void handleMessage(ConsumerRecord record, Acknowledgment acknowledgment) {
try {
String message = (String) record.value();
log.info("收到消息: {}", message);
} catch (Exception e) {
log.error(e.getMessage(), e);
} finally {
// 手动提交 offset
acknowledgment.acknowledge();
}
}
}

25
blade-example/blade-mq-kafka/src/main/resources/application.yml

@ -0,0 +1,25 @@
server:
port: 8601
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
retries: 0
batch-size: 16384
buffer-memory: 33554432
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: blade-kafka
# 手动提交
enable-auto-commit: false
auto-offset-reset: latest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
properties:
session.timeout.ms: 60000
listener:
log-container-config: false
concurrency: 5
# 手动提交
ack-mode: manual_immediate

28
blade-example/blade-mq-kafka/src/test/java/org/springblade/example/mq/kafka/KafkaTest.java

@ -0,0 +1,28 @@
package org.springblade.example.mq.kafka;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springblade.core.test.BladeBootTest;
import org.springblade.core.test.BladeSpringRunner;
import org.springblade.example.mq.kafka.constant.KafkaConstant;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
@RunWith(BladeSpringRunner.class)
@SpringBootTest(classes = KafkaApplication.class)
@BladeBootTest(appName = "blade-kafka", profile = "test", enableLoader = true)
public class KafkaTest {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
/**
* 测试发送消息
*/
@Test
public void testSend() {
kafkaTemplate.send(KafkaConstant.TOPIC_TEST, "hello,kafka...");
}
}

38
blade-example/blade-mq-rabbit/pom.xml

@ -0,0 +1,38 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>blade-example</artifactId>
<groupId>org.springblade</groupId>
<version>2.2.2.RELEASE</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>blade-mq-rabbit</artifactId>
<name>${project.artifactId}</name>
<version>${bladex.project.version}</version>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.springblade</groupId>
<artifactId>blade-core-launch</artifactId>
</dependency>
<dependency>
<groupId>org.springblade</groupId>
<artifactId>blade-core-tool</artifactId>
</dependency>
<dependency>
<groupId>org.springblade</groupId>
<artifactId>blade-core-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
</project>

35
blade-example/blade-mq-rabbit/src/main/java/org/springblade/example/mq/rabbit/RabbitApplication.java

@ -0,0 +1,35 @@
/*
* Copyright (c) 2018-2028, Chill Zhuang All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* Neither the name of the dreamlu.net developer nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
* Author: Chill 庄骞 (smallchill@163.com)
*/
package org.springblade.example.mq.rabbit;
import org.springblade.common.constant.LauncherConstant;
import org.springblade.core.launch.BladeApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* RabbitApplication
*
* @author Chill
*/
@SpringBootApplication
public class RabbitApplication {
public static void main(String[] args) {
BladeApplication.run(LauncherConstant.APPLICATION_RABBIT_NAME, RabbitApplication.class, args);
}
}

162
blade-example/blade-mq-rabbit/src/main/java/org/springblade/example/mq/rabbit/config/RabbitMqConfiguration.java

@ -0,0 +1,162 @@
package org.springblade.example.mq.rabbit.config;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.springblade.example.mq.rabbit.constant.RabbitConstant;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.Map;
/**
* RabbitMQ配置主要是配置队列如果提前存在该队列可以省略本配置类
*
* @author yangkai.shen
*/
@Slf4j
@Configuration
public class RabbitMqConfiguration {
@Bean
public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
connectionFactory.setPublisherConfirms(true);
connectionFactory.setPublisherReturns(true);
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> log.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause));
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message));
return rabbitTemplate;
}
/**
* 直接模式队列1
*/
@Bean
public Queue directOneQueue() {
return new Queue(RabbitConstant.DIRECT_MODE_QUEUE_ONE);
}
/**
* 队列2
*/
@Bean
public Queue queueTwo() {
return new Queue(RabbitConstant.QUEUE_TWO);
}
/**
* 队列3
*/
@Bean
public Queue queueThree() {
return new Queue(RabbitConstant.QUEUE_THREE);
}
/**
* 分列模式队列
*/
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange(RabbitConstant.FANOUT_MODE_QUEUE);
}
/**
* 分列模式绑定队列1
*
* @param directOneQueue 绑定队列1
* @param fanoutExchange 分列模式交换器
*/
@Bean
public Binding fanoutBinding1(Queue directOneQueue, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(directOneQueue).to(fanoutExchange);
}
/**
* 分列模式绑定队列2
*
* @param queueTwo 绑定队列2
* @param fanoutExchange 分列模式交换器
*/
@Bean
public Binding fanoutBinding2(Queue queueTwo, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queueTwo).to(fanoutExchange);
}
/**
* 主题模式队列
* <li>路由格式必须以 . 分隔比如 user.email 或者 user.aaa.email</li>
* <li>通配符 * 代表一个占位符或者说一个单词比如路由为 user.*那么 user.email 可以匹配但是 user.aaa.email 就匹配不了</li>
* <li>通配符 # 代表一个或多个占位符或者说一个或多个单词比如路由为 user.#那么 user.email 可以匹配user.aaa.email 也可以匹配</li>
*/
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(RabbitConstant.TOPIC_MODE_QUEUE);
}
/**
* 主题模式绑定分列模式
*
* @param fanoutExchange 分列模式交换器
* @param topicExchange 主题模式交换器
*/
@Bean
public Binding topicBinding1(FanoutExchange fanoutExchange, TopicExchange topicExchange) {
return BindingBuilder.bind(fanoutExchange).to(topicExchange).with(RabbitConstant.TOPIC_ROUTING_KEY_ONE);
}
/**
* 主题模式绑定队列2
*
* @param queueTwo 队列2
* @param topicExchange 主题模式交换器
*/
@Bean
public Binding topicBinding2(Queue queueTwo, TopicExchange topicExchange) {
return BindingBuilder.bind(queueTwo).to(topicExchange).with(RabbitConstant.TOPIC_ROUTING_KEY_TWO);
}
/**
* 主题模式绑定队列3
*
* @param queueThree 队列3
* @param topicExchange 主题模式交换器
*/
@Bean
public Binding topicBinding3(Queue queueThree, TopicExchange topicExchange) {
return BindingBuilder.bind(queueThree).to(topicExchange).with(RabbitConstant.TOPIC_ROUTING_KEY_THREE);
}
/**
* 延迟队列
*/
@Bean
public Queue delayQueue() {
return new Queue(RabbitConstant.DELAY_QUEUE, true);
}
/**
* 延迟队列交换器, x-delayed-type x-delayed-message 固定
*/
@Bean
public CustomExchange delayExchange() {
Map<String, Object> args = Maps.newHashMap();
args.put("x-delayed-type", "direct");
return new CustomExchange(RabbitConstant.DELAY_MODE_QUEUE, "x-delayed-message", true, false, args);
}
/**
* 延迟队列绑定自定义交换器
*
* @param delayQueue 队列
* @param delayExchange 延迟交换器
*/
@Bean
public Binding delayBinding(Queue delayQueue, CustomExchange delayExchange) {
return BindingBuilder.bind(delayQueue).to(delayExchange).with(RabbitConstant.DELAY_QUEUE).noargs();
}
}

58
blade-example/blade-mq-rabbit/src/main/java/org/springblade/example/mq/rabbit/constant/RabbitConstant.java

@ -0,0 +1,58 @@
package org.springblade.example.mq.rabbit.constant;
/**
* RabbitMQ常量池
*
* @author yangkai.shen
*/
public interface RabbitConstant {
/**
* 直接模式1
*/
String DIRECT_MODE_QUEUE_ONE = "queue.direct.1";
/**
* 队列2
*/
String QUEUE_TWO = "queue.2";
/**
* 队列3
*/
String QUEUE_THREE = "3.queue";
/**
* 分列模式
*/
String FANOUT_MODE_QUEUE = "fanout.mode";
/**
* 主题模式
*/
String TOPIC_MODE_QUEUE = "topic.mode";
/**
* 路由1
*/
String TOPIC_ROUTING_KEY_ONE = "queue.#";
/**
* 路由2
*/
String TOPIC_ROUTING_KEY_TWO = "*.queue";
/**
* 路由3
*/
String TOPIC_ROUTING_KEY_THREE = "3.queue";
/**
* 延迟队列
*/
String DELAY_QUEUE = "delay.queue";
/**
* 延迟队列交换器
*/
String DELAY_MODE_QUEUE = "delay.mode";
}

42
blade-example/blade-mq-rabbit/src/main/java/org/springblade/example/mq/rabbit/handler/DelayQueueHandler.java

@ -0,0 +1,42 @@
package org.springblade.example.mq.rabbit.handler;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springblade.core.tool.jackson.JsonUtil;
import org.springblade.example.mq.rabbit.constant.RabbitConstant;
import org.springblade.example.mq.rabbit.message.MessageStruct;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* 延迟队列处理器
*
* @author yangkai.shen
*/
@Slf4j
@Component
@RabbitListener(queues = RabbitConstant.DELAY_QUEUE)
public class DelayQueueHandler {
@RabbitHandler
public void directHandlerManualAck(MessageStruct messageStruct, Message message, Channel channel) {
// 如果手动ACK,消息会被监听消费,但是消息在队列中依旧存在,如果 未配置 acknowledge-mode 默认是会在消费完毕后自动ACK掉
final long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
log.info("延迟队列,手动ACK,接收消息:{}", JsonUtil.toJson(messageStruct));
// 通知 MQ 消息已被成功消费,可以ACK了
channel.basicAck(deliveryTag, false);
} catch (IOException e) {
try {
// 处理失败,重新压入MQ
channel.basicRecover();
} catch (IOException e1) {
e1.printStackTrace();
}
}
}
}

50
blade-example/blade-mq-rabbit/src/main/java/org/springblade/example/mq/rabbit/handler/DirectQueueOneHandler.java

@ -0,0 +1,50 @@
package org.springblade.example.mq.rabbit.handler;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springblade.core.tool.jackson.JsonUtil;
import org.springblade.example.mq.rabbit.constant.RabbitConstant;
import org.springblade.example.mq.rabbit.message.MessageStruct;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* 直接队列1 处理器
*
* @author yangkai.shen
*/
@Slf4j
@RabbitListener(queues = RabbitConstant.DIRECT_MODE_QUEUE_ONE)
@Component
public class DirectQueueOneHandler {
/**
* 如果 spring.rabbitmq.listener.direct.acknowledge-mode: auto则可以用这个方式会自动ack
*/
// @RabbitHandler
public void directHandlerAutoAck(MessageStruct message) {
log.info("直接队列处理器,接收消息:{}", JsonUtil.toJson(message));
}
@RabbitHandler
public void directHandlerManualAck(MessageStruct messageStruct, Message message, Channel channel) {
// 如果手动ACK,消息会被监听消费,但是消息在队列中依旧存在,如果 未配置 acknowledge-mode 默认是会在消费完毕后自动ACK掉
final long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
log.info("直接队列1,手动ACK,接收消息:{}", JsonUtil.toJson(messageStruct));
// 通知 MQ 消息已被成功消费,可以ACK了
channel.basicAck(deliveryTag, false);
} catch (IOException e) {
try {
// 处理失败,重新压入MQ
channel.basicRecover();
} catch (IOException e1) {
e1.printStackTrace();
}
}
}
}

42
blade-example/blade-mq-rabbit/src/main/java/org/springblade/example/mq/rabbit/handler/QueueThreeHandler.java

@ -0,0 +1,42 @@
package org.springblade.example.mq.rabbit.handler;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springblade.core.tool.jackson.JsonUtil;
import org.springblade.example.mq.rabbit.constant.RabbitConstant;
import org.springblade.example.mq.rabbit.message.MessageStruct;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* 队列2 处理器
*
* @author yangkai.shen
*/
@Slf4j
@RabbitListener(queues = RabbitConstant.QUEUE_THREE)
@Component
public class QueueThreeHandler {
@RabbitHandler
public void directHandlerManualAck(MessageStruct messageStruct, Message message, Channel channel) {
// 如果手动ACK,消息会被监听消费,但是消息在队列中依旧存在,如果 未配置 acknowledge-mode 默认是会在消费完毕后自动ACK掉
final long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
log.info("队列3,手动ACK,接收消息:{}", JsonUtil.toJson(messageStruct));
// 通知 MQ 消息已被成功消费,可以ACK了
channel.basicAck(deliveryTag, false);
} catch (IOException e) {
try {
// 处理失败,重新压入MQ
channel.basicRecover();
} catch (IOException e1) {
e1.printStackTrace();
}
}
}
}

42
blade-example/blade-mq-rabbit/src/main/java/org/springblade/example/mq/rabbit/handler/QueueTwoHandler.java

@ -0,0 +1,42 @@
package org.springblade.example.mq.rabbit.handler;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springblade.core.tool.jackson.JsonUtil;
import org.springblade.example.mq.rabbit.constant.RabbitConstant;
import org.springblade.example.mq.rabbit.message.MessageStruct;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* 队列2 处理器
*
* @author yangkai.shen
*/
@Slf4j
@RabbitListener(queues = RabbitConstant.QUEUE_TWO)
@Component
public class QueueTwoHandler {
@RabbitHandler
public void directHandlerManualAck(MessageStruct messageStruct, Message message, Channel channel) {
// 如果手动ACK,消息会被监听消费,但是消息在队列中依旧存在,如果 未配置 acknowledge-mode 默认是会在消费完毕后自动ACK掉
final long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
log.info("队列2,手动ACK,接收消息:{}", JsonUtil.toJson(messageStruct));
// 通知 MQ 消息已被成功消费,可以ACK了
channel.basicAck(deliveryTag, false);
} catch (IOException e) {
try {
// 处理失败,重新压入MQ
channel.basicRecover();
} catch (IOException e1) {
e1.printStackTrace();
}
}
}
}

23
blade-example/blade-mq-rabbit/src/main/java/org/springblade/example/mq/rabbit/message/MessageStruct.java

@ -0,0 +1,23 @@
package org.springblade.example.mq.rabbit.message;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
/**
* 测试消息体
*
* @author yangkai.shen
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class MessageStruct implements Serializable {
private static final long serialVersionUID = 392365881428311040L;
private String message;
}

15
blade-example/blade-mq-rabbit/src/main/resources/application.yml

@ -0,0 +1,15 @@
server:
port: 8602
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
# 手动提交消息
listener:
simple:
acknowledge-mode: manual
direct:
acknowledge-mode: manual

84
blade-example/blade-mq-rabbit/src/test/java/org/springblade/example/mq/rabbit/RabbitTest.java

@ -0,0 +1,84 @@
package org.springblade.example.mq.rabbit;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springblade.core.test.BladeBootTest;
import org.springblade.core.test.BladeSpringRunner;
import org.springblade.core.tool.utils.DateUtil;
import org.springblade.example.mq.rabbit.constant.RabbitConstant;
import org.springblade.example.mq.rabbit.message.MessageStruct;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@RunWith(BladeSpringRunner.class)
@SpringBootTest(classes = RabbitApplication.class)
@BladeBootTest(appName = "blade-rabbit", profile = "test", enableLoader = true)
public class RabbitTest {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 测试直接模式发送
*/
@Test
public void sendDirect() {
rabbitTemplate.convertAndSend(RabbitConstant.DIRECT_MODE_QUEUE_ONE, new MessageStruct("direct message"));
}
/**
* 测试分列模式发送
*/
@Test
public void sendFanout() {
rabbitTemplate.convertAndSend(RabbitConstant.FANOUT_MODE_QUEUE, "", new MessageStruct("fanout message"));
}
/**
* 测试主题模式发送1
*/
@Test
public void sendTopic1() {
rabbitTemplate.convertAndSend(RabbitConstant.TOPIC_MODE_QUEUE, "queue.aaa.bbb", new MessageStruct("topic message"));
}
/**
* 测试主题模式发送2
*/
@Test
public void sendTopic2() {
rabbitTemplate.convertAndSend(RabbitConstant.TOPIC_MODE_QUEUE, "ccc.queue", new MessageStruct("topic message"));
}
/**
* 测试主题模式发送3
*/
@Test
public void sendTopic3() {
rabbitTemplate.convertAndSend(RabbitConstant.TOPIC_MODE_QUEUE, "3.queue", new MessageStruct("topic message"));
}
/**
* 测试延迟队列发送
*/
@Test
public void sendDelay() {
rabbitTemplate.convertAndSend(RabbitConstant.DELAY_MODE_QUEUE, RabbitConstant.DELAY_QUEUE,
new MessageStruct("delay message, delay 5s, " + DateUtil.now()), message -> {
message.getMessageProperties().setHeader("x-delay", 5000);
return message;
});
rabbitTemplate.convertAndSend(RabbitConstant.DELAY_MODE_QUEUE, RabbitConstant.DELAY_QUEUE,
new MessageStruct("delay message, delay 2s, " + DateUtil.now()), message -> {
message.getMessageProperties().setHeader("x-delay", 2000);
return message;
});
rabbitTemplate.convertAndSend(RabbitConstant.DELAY_MODE_QUEUE, RabbitConstant.DELAY_QUEUE,
new MessageStruct("delay message, delay 8s, " + DateUtil.now()), message -> {
message.getMessageProperties().setHeader("x-delay", 8000);
return message;
});
}
}

4
blade-example/pom.xml

@ -18,9 +18,11 @@
<modules>
<module>blade-dubbo-consumer</module>
<module>blade-dubbo-provider</module>
<module>blade-easypoi</module>
<module>blade-mq-kafka</module>
<module>blade-mq-rabbit</module>
<module>blade-seata-order</module>
<module>blade-seata-storage</module>
<module>blade-easypoi</module>
<module>blade-websocket</module>
</modules>

5
blade-service/pom.xml

@ -29,6 +29,11 @@
<groupId>org.springblade</groupId>
<artifactId>blade-starter-tenant</artifactId>
</dependency>
<dependency>
<groupId>org.springblade</groupId>
<artifactId>blade-scope-api</artifactId>
<version>${bladex.project.version}</version>
</dependency>
</dependencies>
<build>

Loading…
Cancel
Save