RabbitMQ官网提供了七种队列模型,分别是:简单队列、工作队列、发布订阅、路由模式、主题模式、RPC模式、发布者确认模式。
本文在SpringBoot+RabbitMQ环境实现"主题模式"。
一、主题模式
特点:
1、X代表交换机,生产者没有将消息直接发送到队列,而是先发送到了交换机。
2、主题模式的交换机类型为topic。
3、一个队列可以有多个消费者,但发送到队列的消息只能被其中一个消费。如下图,发送到队列的消息只会被C1或C2其中一个消费。
二、topic交换机
订阅模式、路由模式、主题模式,它们三者的队列结构是一样的,区别就在于"交换机类型的不同",交换机类型决定了工作模式的特点。订阅模式的交换机类型是fanout,路由模式的交换机类型是direct,主题模式的交换机类型是topic,所以学习RabbitMQ的各种工作模式,掌握各类型交换机的工作特点很重要,下面我们就来看一下topic交换机的特点,
topic交换机是通过模式匹配来路由消息到队列。topic 模式中Routing key必须具有固定的格式:以 . 间隔的一串单词,比如:quick.orange.rabbit,最多不能超过255byte。
交换机和队列之间的Binding key用通配符来表示,有两种语法:
* 可以替代一个单词
# 可以替代 0 或多个单词
上图中,Q1 与交换机的Binding kye 为:"*.orange.*",当消息的Routing key为三个单词,且中间的单词为 orange 时,消息将进入 Q1。Q2 与 exchange的Binding key 为 "rabbit.#",当消息的Routing key以 rabbit 开头时,消息将进入 Q2 。
三、在SpringBoot中的实现
下面我们就在SpringBoot中实现主题模式,新建2个项目,一个 rabbitmq-provider (生产者),一个rabbitmq-consumer(消费者)。
> 两个项目的pom依赖和yml配置
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.2.4.RELEASE</version>
</dependency> spring:
rabbitmq:
host: xxx.xxx.xx.xxx
port: 5672
virtual-host: felix-vHost
username: long.yuan
password: long.yuan > 创建两个队列、一个Topic类型的交换机,将两个队列绑定在Topic交换机上,绑定的时候指定BindingKey,指定的BindingKey分别为:"*.hello.#"、"*.hi"
@Configuration
public class RabbitConfig {
/**
* @Title 创建2个队列
* @Description 创建2个队列
* @Author long.yuan
* @Date 2020/2/29 14:43
* @Param []
* @return org.springframework.amqp.core.Queue
**/
@Bean
public Queue felixQueueA(){
return new Queue("felix-queue-A");
}
@Bean
public Queue felixQueueB(){
return new Queue("felix-queue-B");
}
/**
* @Title 声明一个Topic类型的交换机
* @Description 声明一个Topic类型的交换机
* @Author long.yuan
* @Date 2020/2/29 14:44
* @Param []
* @return org.springframework.amqp.core.DirectExchange
**/
@Bean
TopicExchange topicExchange(){
return new TopicExchange("felix-topic-exchange");
}
/**
* @Title 将上面的2个队列绑定到Topic交换机
* @Description 在绑定的时候指定BindingKey(通配符)
* @Author long.yuan
* @Date 2020/2/29 14:50
* @Param [felixQueueA, directExchange]
* @return org.springframework.amqp.core.Binding
**/
@Bean
Binding bindExchangeA(Queue felixQueueA,TopicExchange topicExchange){
return BindingBuilder.bind(felixQueueA).to(topicExchange).with("*.hello.#");
}
@Bean
Binding bindExchangeB(Queue felixQueueB,TopicExchange topicExchange){
return BindingBuilder.bind(felixQueueB).to(topicExchange).with("*.hi");
}
} > 生产者发送两条消息,RoutingKey分别为:"my.hello.rabbit.kafka"、"my.hi"
@Component
public class Publisher {
@Autowired
private AmqpTemplate rabbitTemplate;
/**
* @Title 生产者将消息发送到交换机
* @Description 生产者将消息发送到交换机
* @Author long.yuan
* @Date 2020/2/29 13:29
* @Param [i]
* @return void
**/
public void sendMessage() {
String message1 = "主题模式-message-routingKey-my.hello.rabbit.kafka";
String message2 = "主题模式-message-routingKey-my.hi";
System.out.println("发送message1 : " + message1);
rabbitTemplate.convertAndSend("felix-topic-exchange","my.hello.rabbit.kafka",message1);
System.out.println("发送message2 : " + message2);
rabbitTemplate.convertAndSend("felix-topic-exchange","rabbit.hi",message2);
}
} > 消费者监听两个队列
@Component
public class Consumer {
@RabbitListener(queues = "felix-queue-A")
@RabbitHandler
public void processA(String message) {
System.out.println("消息路由到了队列A: " + message);
}
@RabbitListener(queues = "felix-queue-B")
@RabbitHandler
public void processB1(String message) {
System.out.println("消息路由到了队列B: " + message);
}
} 四、测试
运行test方法发送消息,启动消费者,看结果
可以看到,按照通配符的模糊匹配规则,消息被路由到了不同的队列。
主题模式的交换机类型是Topic,Topic交换机的特点,就决定了主题模式的工作模式,即topic交通过模式匹配来路由消息到队列,* 代表一个单词,#代表0或多个单词。
感兴趣的小伙伴可以关注一下博主的公众号,1W+技术人的选择,致力于原创技术干货,包含Redis、RabbitMQ、Kafka、SpringBoot、SpringCloud、ELK等热门技术的学习&资料。













还没有评论,来说两句吧...