简介
RocketMQ已经在公司项目中用了一年多了,基本上没出现过问题,抽空把之前的笔记整理下分享出来
RocketMQ是由阿里捐赠给Apache的一款分布式、队列模型的开源消息中间件,经历了淘宝双十一的洗礼。
github地址
RocketMQ优点如下:
- 削峰填谷(主要解决瞬时写压力大于应用服务能力导致消息丢失、系统奔溃等问题)
- 系统解耦(解决不同重要程度、不同能力级别系统之间依赖导致一死全死)
- 提升性能(当存在一对多调用时,可以发一条消息给消息系统,让消息系统通知相关系统)
- 蓄流压测(线上有些链路不好压测,可以通过堆积一定量消息再放开来压测)
其它特性请参考官方介绍: rocketmq特性
1.RocketMQ架构组成部分
1.1 NameServer
NameServer是一个非常简单的Topic路由注册中心,其角色类似Dubbo中的zookeeper,支持Broker的动态注册与发现。主要包括两个功能:Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;路由信息管理,每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后Producer和Conumser通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。NameServer通常也是集群的方式部署,各实例间相互不进行信息通讯。Broker是向每一台NameServer注册自己的路由信息,所以每一个NameServer实例上面都保存一份完整的路由信息。当某个NameServer因某种原因下线了,Broker仍然可以向其它NameServer同步其路由信息,Producer,Consumer仍然可以动态感知Broker的路由的信息。
1.2 BrokerServer
Broker主要负责消息的存储、投递和查询以及服务高可用保证,为了实现这些功能,Broker包含了以下几个重要子模块。
- Remoting Module:整个Broker的实体,负责处理来自clients端的请求。
- Client Manager:负责管理客户端(Producer/Consumer)和维护Consumer的Topic订阅信息
- Store Service:提供方便简单的API接口处理消息存储到物理硬盘和查询功能。
- HA Service:高可用服务,提供Master Broker 和 Slave Broker之间的数据同步功能。
- Index Service:根据特定的Message key对投递到Broker的消息进行索引服务,以提供消息的快速查询。
1.3 Producer
消息发布的角色,支持分布式集群方式部署。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。
1.4 Consumer
消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。
2.NameServer职责描述
namesrv主要功能在rocketmq源码中namesrv模块中RouteInfoManager,集群的状态就保存以下五个变量中
-
topicQueueTable
它存储了所有的topic属性信息,Value是个QueueData队列,队列的长度等于这个Topic数据存储的Master Broker个数,QueueData里存储着Broker的名称,读写queue的数量,同步标识等 -
brokerAddrTable
以BrokerName为索引,相同名称的Broker可能存在多台机器,一个Master和多个Slave,这个结构存储着一个BrokerName对应属性信息,包括所属的Cluster名称,一个Master
Broker和多个Slave Broker的地址 -
clusterAddrTable
存储的是集群中Cluster的信息,结构很简单,就是一个Cluster名称对应一个由BrokerName组成的集合 -
brokerLiveTable
这个结构和BrokerAddrTable有关系,但是内容完全不同,这个结构的key是BrokerAddr,也就对应着一台机器,BrokerAddrTable中的Key是BrokerName,多个机器的BrokerName可以相同,BrokerLiveTable存储的内容是这台Broker机器的实时状态,包括上次更新装的时间戳,NameServer会定期检查这个时间戳,超时没有更新就任务Broker无效,将其重Broker列表中清除 -
filterServerTable
Filter Server是过滤服务器,是RocketMQ的一种服务端过滤方式,一个Broker可用有一个或者多个Filter Server, 这个结构的Key是Broker的地址,Value是和Broker关联的多个Filter Server的地址
2.1 状态维护逻辑
具体逻辑在BrokerHousekeepingService类中,当NameServer和Broker的长链接断开以后会调用onChannelDestory函数,把这个Broker的信息清理出去。
NameServer还有定时检查时间戳的逻辑,Broker向NameServer发送心跳会更新时间戳,当NameServer检查到时间戳长时间没有更新,便会触发清理逻辑
2.2 为何不用Zookeeper
zooKeeper是Apache的一个开源软件,为分布式应用程序提供协调服务,那为什么RocketMQ要自己造轮子,开发集群管理程序呢?答案是ZooKeeper的功能很强大,包括自动Master选举,当RocketMQ的架构设计决定它不需要Master选举等复杂功能,只需要一个轻量级的元数据服务器就足以
2.3 底层通信
RocketMQ的通信相关代码在remoting模块里,是基于Netty实现通信的,先来看看主要类结构,
RemotingClient和RemotingServer继承RemotingService接口,并增加自己特有的方法,具体实现类NettyRemotingClinet和NettyRemotingServer分别实现了RemotingClinet和RemotingServer,而且都继承了NettyRemotingAbstract类,

通过上面的封装,RocketMQ各个模块间的通信,可以通过发送统一格式的自定义消息(RemotingCommand)来完成,各个模块间的通信实现简洁明了,
在NameServerController里注册DefaultRequestProcessor实例调用processRequest方法处理请求
下图是源码中的启动加载流程
首先在NamesrvStartup的main函数中实例化NamesrvController对象,然后执行NamesrvController对象的initialize()方法,在initialize方法中调用registerDefaultProcessor注册DefaultRequestProcessor类。
3.消息队列的核心机制
3.1消息存储和发送
RocketMQ利用java7中的MappedByteBuffer特性,也就是零拷贝技术,提高消息存盘和网络发送时间
RocketMQ消息的存储是由CommitLog和CommitQueue配合完成的,消息真正的物理存储文件是CommitLog,ComsumeQueue是消息的逻辑队列,类似数据库的索引文件,存储的是指向物理存储的地址,每个topic下的每个Message Queue都有一个对应的ConsumeQueue文件,文件的地址在{user.home}\store\consumequeue{topicName}{queueId}{fileName}


CommitLog存储地址在{user.home}\store\commitlog{fileName],
RocketMQ采取一些机制,尽量向CommitLog中顺序写,但是随机读.
存储机制这样设计的好处
- CommitLog顺序写,可以大大提高写入效率
- 虽然是随机读,但是利用操作系统的pagecache机制,可以批量地从磁盘读取,作为cache存到内存中,加快后续的读取速度。
- ConsumeQueue存储的偏移量信息,此外为了保证CommitLog和ConsumeQueue的一致性,CommitLog里存储了Consume Queue,Message Key,Tag等所有信息,即使ComsumeQueue丢失,也可以通过commitLog完全恢复
3.2 刷盘方式
通过Broker配置文件里的fiushDiskType参数设置的,(异步刷盘)ASYNC_FLUSH和(同步刷盘)SYNC_FLUSH
-
同步刷盘: 发送消息时在返回写成功状态时,消息已经被写入磁盘,具体流程是,消息写入内存的PAGECACHE后,立即通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程执行完成后唤醒等待的显存,返回消息写成功.
-
异步刷盘: 发送消息时在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,写操作的返回快,吞吐量大;当内存的消息累计到达一定程度时,统一触发写磁盘动作,快速写入.
3.3 同步方式
通过Broker配置文件中的brokerRole参数设置同步复制还是异步 ASYNC_MASTER,SYNC_MASTER,SLAVE三个值中的一个 如果一个Broker组有Master和Slave,消息需要从Master复制到Slave上,有同步和异步两种复制方式。
- 同步复制: 发送消息时等Master和Slave均写入成功才反馈给客户端写入成功状态,如果Master出现故障,Slave上有全部的备份数据,容易恢复,但是同步复制会增大数据写入延迟,降低系统的吞吐量
- 异步复制: 发送消息时等Master写入成功即可反馈给客户端写入成功状态,系统拥有较低的延迟和较高的吞吐量,但是Master出了故障,有些数据因为没有被写入Slave,有可能会丢失.
4.消息可靠性
4.1 顺序消息
- 全局顺序消息: 要保证全局顺序消息,需要把topic的读写队列数设置为一,然后Product和Consumer的并发设置也要是一
- 部分顺序消息: 要保证部分消息有序,需要发送端和消费端配合处理,在发送端,要做到把同一业务ID的消息发送到同一个Message Queue;在消费过程中,要做到从同一个Message Queue读取的消息不能被并发处理,这样才能达到部分有序
4.2 消息重复
对于分布式消息队列来说,同时做到确保一定投递和不重复投递是很难的,也就是所谓的 “一次成功”,在鱼和熊掌不可兼得的情况下,RocketMQ选择了确保一定投递,保证消息不丢失,但是可能会造成消息重复
解决方法有以下两种
- 保证消费逻辑的幂等性(多次调用和一次调用效果相同)
- 维护一个已消费消息的记录,消费前查询这个消息是否被消费过
4.3 消息优先级
RocketMQ是一个先进先出的队列,不支持消息级别或者Topic级别的优先级,
业务中简单的优先级需求可以通过间接的方式实现
以下列出三种场景
- 1.当前topic里有太多相似的消息,例如A,B,C,A是优先级高需要处理的,可以把A单独放一个队列中,B和C放入一个,应用程序创建2个Consumer分别订阅不同的topic
- 2.订单处理系统需要接受100家快递门店过来的请求,把这些请求通过Product写入RocketMQ,订单处理程序每天只能处理1万单,如果其中1个门店一天发送了2万单,这样其他的99家门店消息可能被迫等待2万单处理完后才能处理,显然很不公平
解决方法: 创建一个Topic,设置Topic的MessageQueue数量超过100个,Product根据订单的门店号来决定消息写入哪个MessageQueue,DefaultMQPushConsumer默认是采用循环的方式读取一个Topic的所有MessageQueue,这样如果某家订单量大增也不会影响其他门店使用
DefaultMQPushConsumer默认的pullBatchSize是32,也就是每次重某个MessageQueue读取消息的时候,最多读32个,在上面场景中可以把pullBatchSize设置为1 - 3 强优先级场景,上面两种情况对优先级要求不高,更像一个保证公平的机制,如果消息T1>T2>T3这种应用场景,需要用户自己使用pullConsumer控制消息的消费
5.Consumer的负载均衡
5.1 DefaultMQPushConsumer的负载均衡
负载均衡算法有五种,
- 平均分配策略(默认)(AllocateMessageQueueAveragely)
- 环形分配策略(AllocateMessageQueueAveragelyByCircle)
- 手动配置分配策略(AllocateMessageQueueByConfig)
- 机房分配策略(AllocateMessageQueueByMachineRoom)
- 一致性哈希分配策略(AllocateMessageQueueConsistentHash)
默认用第一种A**llocateMessageQueueAveragely,**负载均衡的结果和Topic的Message Queue数量,以及ConsumerGroup里的Consumer的数量有关系,当Message Queue书设置为3,当Consumer数量为2的时候,有一个Consumer需要处理2/3的请求,如果当Consumer数量为4的时候,有一个Consumer不会接受到请求

5.2 DefaultMQPullConsumer的负载均衡
PullConsumer可以看到所有的Message Queue,而且重哪个Message Queue读取消息,读消息时的Offset都由使用者控制,使用者可以实现任何方式的负载均衡
6.运维工具
6.1 RocketMQ-Console
是一个基于spring boot开发的运维页面工具,我看可以参考它的源码进行自定义运维工具的开发
github地址
6.2 基于Tools模块自定义开发
RocketMQ源码中有一个tools模块,MQAdmin相关命令的实现就在这里,Tools模块源码中有一个command包,里面列出了各个组件相关的命令
参考文章:
书籍: RokcetMQ实战与原理解析
RocketMQ官网文档: https://github.com/apache/rocketmq/tree/master/docs/cn











![[并发编程] - Executor框架#ThreadPoolExecutor源码解读02 [并发编程] - Executor框架#ThreadPoolExecutor源码解读02](https://blog.quwenai.cn/zb_users/upload/2022/03/20220327124158164835611866353.png)

还没有评论,来说两句吧...