问题点
同样,在介绍消息发送流程之前,先抛出几个问题
- 消息发送有哪几种方式?
- producer是如何查找、维护topic对应的路由信息的?
消息发送的实现类为DefaultMQProducer
消息发送方式
先看下DefaultMQProducer中有哪些发送方法
大体可分为三种
- send(Message)为同步发送
- send(Message,SendCallback)为异步发送,SendCallback是发送成功后的异步回调函数
- sendOneway(Message,MessageQueue)为单向发送,
这三种发送方式又有什么区别呢?
- 同步发送需要发送端同步等待MQ服务器响应,保证了消息发送的可靠性,因为是同步等待,所以耗时较多
- 异步发送消息后,发送端不再阻塞等待响应,可以处理、发送其他消息,等待服务端消息确认后异步回调发送端设置的回调接口,耗时较小,消息可靠性也有保证
- 单向发送,不在乎消息发送的可靠性,发送端只管发消息,不在乎消息是否到达broker端,也不在乎响应结果,适合一些对可靠性没有要求的场景,比如日志收集,耗时最小
看到这里,开头的问题1也有了答案
消息发送基本流程
消息发送的实现方法在org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl中,看下大致代码
- 步骤一:消息参数校验,校验了哪些参数呢?

判断消息是否为空
判断topic是否合法
判断消息体是否为空
判断消息长度是否在4M以内
-
步骤二:消息路由信息获取
一个topic的消息队列有可能分布在不同broker上,所以producer端需要查询并维护topic的队列路由信息 -
步骤三:消息队列负载及消息发送
消息队列获取成功后,producer向哪个消息队列发送消息呢?所以需要对消息队列进行选择
选择完后进行消息发送
下面详细介绍,路由信息获取相关代码
路由信息获取
topic对应路由信息获取的方法为tryToFindTopicPublishInfo,其返回值是TopicPublishInfo,先看下其属性
在看下具体的路由获取方法tryToFindTopicPublishInfo,先尝试用当前topic去获取路由信息,获取不到时再用默认的topic(TBW102)去获取路由信息
org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer(java.lang.String, boolean, org.apache.rocketmq.client.producer.DefaultMQProducer)是路由信息获取的核心方法,具体实现如下
详细看下如果路由信息有改变时的更新路由方法,具体更新了什么内容
其中核心是发送端消息队列的维护,和消费端消息队列的维护,发送端的消息队列维护时,需要将Queuedata根据其中写队列数量writeQueueNums转换成消息发送队列MessageQueue,代码在topicRouteData2TopicPublishInfo中,大致思路就是找到Queuedata对应broker(master节点并且可写)信息,并根据writeQueueNums转换成Messagequeue
再看下消费队列Messagequeue的转换方法topicRouteData2TopicSubscribeInfo
到这里,topic路由获取流程基本完成,问题二也有了答案,再来梳理下路由获取的基本流程
- 使用当前topic去nameserver获取路由信息,获取不到时,使用默认的topic(“tbw102”)去获取
- 获取到路由信息后,根据QueueData队列元数据中的writeQueueNums、readQueueNums组织成生产者对应的消息发送队列信息,消费者对应的消息消费队列信息,并维护在本地缓存中
路由信息获取完成,大家可以想到,一个topic可能存在多个MessageQueue,分布在不同的broker上,那么消息发送时,具体选择哪个MessageQueue进行消息发送呢?下篇文章详细介绍消息队列选择及容错相关实现









还没有评论,来说两句吧...