RabbitMQ官网提供了七种队列模型,分别是:简单队列、工作队列、发布订阅、路由模式、主题模式、RPC模式、发布者确认模式。
本文在SpringBoot+RabbitMQ环境实现"工作队列"模式。
一、工作队列
特点:一个生产者,多个消费者;
注意:一条消息只能被一个消费者消费,不能被多个消费者重复消费;
二、在SpringBoot中的实现
还是新建2个springboot项目,一个 rabbitmq-provider (生产者),一个rabbitmq-consumer(消费者)。
> pom依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency> > application.yml文件
spring:
rabbitmq:
host: IP
port: 5672
virtual-host: felix-vHost
username: long.yuan
password: long.yuan > 配置类
@Configuration
public class RabbitConfig {
/**
* @Title 初始化Queue
* @Description 创建一个名称为"felix-queue"的队列,其他参数使用默认
* 在创建队列的时候如果要指定其他参数,Queue有多个构造方法可选择
* @Author long.yuan
* @Date 2020/2/23 22:43
* @Param []
* @return org.springframework.amqp.core.Queue
**/
@Bean
public Queue felixQueue(){
return new Queue("felix-queue");
}
} > 生产者
@Component
public class Publisher {
@Autowired
private AmqpTemplate rabbitTemplate;
/**
* @Title 发送消息
* @Description 发送消息
* @Author long.yuan
* @Date 2020/2/23 22:49
* @Param []
* @return void
**/
public void sendMessage(int i) {
String message = "工作队列-message-"+i;
System.out.println("发送消息 : " + message);
rabbitTemplate.convertAndSend("felix-queue",message);
}
} > 新建2个消费者,监控的是同一个队列"felix-queue",其中一个消费者我们在处理逻辑里面加一段sleep的代码,模拟处理耗时的情景
@Component
@RabbitListener(queues = "felix-queue")
public class Consumer {
/**
* @RabbitListener 和 @RabbitHandler 搭配使用
* 可以标注在类上面,需配合 @RabbitHandler 注解一起使用
* 标注在类上面表示当有收到消息的时候,就交给 @RabbitHandler 的方法处理,
* 具体使用哪个方法处理,根据 MessageConverter 转换后的参数类型
**/
@RabbitHandler
public void process(String message) {
System.out.println("1号consumer消费成功 : " + message);
}
} @Component
@RabbitListener(queues = "felix-queue")
public class Consumer2 {
/**
* @RabbitListener 和 @RabbitHandler 搭配使用
* 可以标注在类上面,需配合 @RabbitHandler 注解一起使用
* 标注在类上面表示当有收到消息的时候,就交给 @RabbitHandler 的方法处理,
* 具体使用哪个方法处理,根据 MessageConverter 转换后的参数类型
**/
@RabbitHandler
public void process(String message) throws InterruptedException {
Thread.sleep(1000);
System.out.println("2号consumer消费成功 : " + message);
}
} > 测试类
@RunWith(SpringRunner.class)
@SpringBootTest
public class PublishTest {
@Autowired
Publisher publisher;
@Test
public void sendMessageTest() throws InterruptedException {
for (int i=0;i<20;i++){
publisher.sendMessage(i);
}
}
} 和简单队列一样,在工作队列中,发送消息的时候不指定交换机的名称,那么就会发送到"默认交换机"上。默认的Exchange不进行Binding操作,任何发送到该Exchange的消息都会被转发到"Queue名字和Routing key相同的队列"中,如果vhost中不存在和Routing key同名的队列,则该消息会被抛弃。
这里我们在发送消息的时候设置的Routing key为"felix-queue",那么就会发送到队列名为"felix-queue"的队列上去。
三、测试
首先启动消费者
再运行test方法发送消息,20条消息发送完成
我们再来看消费者,把执行结果贴出来,
我们可以看到,2号消费者每次处理消息的时间比较长(我们用sleep模拟的),1号消费者早早的干完自己的活了,2号消费者后面还在慢慢的干。但是我们看到,两个人处理的数量是一样多的,1号消费偶数,2号消费奇数。
这其实就是RabbitMQ默认的分发机制:轮询分发,默认情况下RabbitMQ会将接收到的消息逐个分发给消费者,并且是一次性分发完,它不等你,它就轮询发,你处理的慢就给你堆在那里自己慢慢去处理。
这显然不是我们想要的,那有没有更好的机制呢?我们给处理的慢的消费者少发点,给处理的快的消费者多发点,这样可以不让消息在消费端造成堆积。有!这就是RabbitMQ的"公平分发"机制!
※ RabbitMQ工作队列的“公平分发”机制
> 只需修改application.yml,将prefetch设置为1
spring:
rabbitmq:
host: IP
port: 5672
virtual-host: felix-vHost
username: long.yuan
password: long.yuan
listener:
simple:
# 公平分发
prefetch: 1 > 然后再启动消费者,运行test发放发送消息,我们再看结果,1号处理的快,所以处理的数量也多,2号处理的慢,只处理了1条消息,不会导致消息在消费端的阻塞,达到了我们想要的效果。
那设置prefetch=1是什么意思呢?它表示限制每个Consumer在同一个时间点最多只能处理一个消息,我手里的活还没干完的话你就不能再给我分了。
感兴趣的小伙伴可以关注一下博主的公众号,1W+技术人的选择,致力于原创技术干货,包含Redis、RabbitMQ、Kafka、SpringBoot、SpringCloud、ELK等热门技术的学习&资料。












还没有评论,来说两句吧...