消息队列中间件 RocketMQ 源码分析 —— Message 发送与接收

>>强大,10k+点赞的 SpringBoot 后台管理系统竟然出了详细教程!

原文地址:http://www.yunai.me/RocketMQ/message-send-and-receive/?mp

RocketMQ 带注释源码地址 :https://github.com/YunaiV/incubator-rocketmq
😈本系列每 1-2 周更新一篇,欢迎订阅、关注、收藏 公众号


 

1、概述

2、Producer 发送消息

  • DefaultMQProducerImpl#tryToFindTopicPublishInfo()
  • MQFaultStrategy
  • DefaultMQProducerImpl#sendKernelImpl()
  • MQFaultStrategy
  • LatencyFaultTolerance
  • LatencyFaultToleranceImpl
  • FaultItem
  • DefaultMQProducer#send(Message)
  • DefaultMQProducerImpl#sendDefaultImpl()

3、Broker 接收消息

  • AbstractSendMessageProcessor#msgCheck
  • SendMessageProcessor#sendMessage
  • DefaultMessageStore#putMessage

4、某种结尾


1、概述

  1. Producer 发送消息。主要是同步发送消息源码,涉及到 异步/Oneway发送消息,事务消息会跳过。
  2. Broker 接收消息。(存储消息在《RocketMQ 源码分析 —— Message 存储》解析)

消息队列中间件 RocketMQ 源码分析 —— Message 发送与接收

2、Producer 发送消息

消息队列中间件 RocketMQ 源码分析 —— Message 发送与接收

DefaultMQProducer#send(Message)

  // .... 省略代码

 

  • 说明:发送同步消息,DefaultMQProducer#send(Message) 对 DefaultMQProducerImpl#send(Message)进行封装。

DefaultMQProducerImpl#sendDefaultImpl()

 // .... 省略代码

 

  • 说明 :发送消息。步骤:获取消息路由信息,选择要发送到的消息队列,执行消息发送核心方法,并对发送结果进行封装返回。
  • 第 1 至 7 行:对sendsendDefaultImpl(...)进行封装。
  • 第 20 行 :invokeID仅仅用于打印日志,无实际的业务用途。
  • 第 25 行 :获取 Topic路由信息, 详细解析见:DefaultMQProducerImpl#tryToFindTopicPublishInfo()
  • 第 30 & 34 行 :计算调用发送消息到成功为止的最大次数,并进行循环。同步或异步发送消息会调用多次,默认配置为3次。
  • 第 36 行 :选择消息要发送到的队列,详细解析见:MQFaultStrategy
  • 第 43 行 :调用发送消息核心方法,详细解析见:DefaultMQProducerImpl#sendKernelImpl()
  • 第 46 行 :更新Broker可用性信息。在选择发送到的消息队列时,会参考Broker发送消息的延迟,详细解析见:MQFaultStrategy
  • 第 62 至 68 行:当抛出RemotingException时,如果进行消息发送失败重试,则可能导致消息发送重复。例如,发送消息超时(RemotingTimeoutException),实际Broker接收到该消息并处理成功。因此,Consumer在消费时,需要保证幂等性。

DefaultMQProducerImpl#tryToFindTopicPublishInfo()

 // .... 省略代码

 

  • 说明 :获得 Topic发布信息。优先从缓存topicPublishInfoTable,其次从Namesrv中获得。
  • 第 3 行 :从缓存topicPublishInfoTable中获得 Topic发布信息。
  • 第 5 至 9 行 :从 Namesrv 中获得 Topic发布信息。
  • 第 13 至 17 行 :当从 Namesrv 无法获取时,使用 {@link DefaultMQProducer#createTopicKey} 对应的 Topic发布信息。目的是当 Broker 开启自动创建 Topic开关时,Broker 接收到消息后自动创建Topic,详细解析见《RocketMQ 源码分析 —— Topic》。

MQFaultStrategy

消息队列中间件 RocketMQ 源码分析 —— Message 发送与接收

MQFaultStrategy

 // .... 省略代码

 

  • 说明 :Producer消息发送容错策略。默认情况下容错策略关闭,即sendLatencyFaultEnable=false
  • 第 30 至 62 行 :容错策略选择消息队列逻辑。优先获取可用队列,其次选择一个broker获取队列,最差返回任意broker的一个队列。
  • 第 64 行 :未开启容错策略选择消息队列逻辑。
  • 第 74 至 79 行 :更新延迟容错信息。当 Producer 发送消息时间过长,则逻辑认为N秒内不可用。按照latencyMaxnotAvailableDuration的配置,对应如下:
Producer发送消息消耗时长 Broker不可用时长
>= 15000 ms 600 * 1000 ms
>= 3000 ms 180 * 1000 ms
>= 2000 ms 120 * 1000 ms
>= 1000 ms 60 * 1000 ms
>= 550 ms 30 * 1000 ms
>= 100 ms 0 ms
>= 50 ms 0 ms

LatencyFaultTolerance

  // .... 省略代码

 

  • 说明 :延迟故障容错接口

LatencyFaultToleranceImpl

  // .... 省略代码

 

  • 说明 :延迟故障容错实现。维护每个对象的信息。

FaultItem

 // .... 省略代码

 

  • 说明 :对象故障信息。维护对象的名字、延迟、开始可用的时间。

DefaultMQProducerImpl#sendKernelImpl()

 // .... 省略代码

 

  • 说明 :发送消息核心方法。该方法真正发起网络请求,发送消息给 Broker
  • 第 21 行 :生产消息编号,详细解析见《RocketMQ 源码分析 —— Message 基础》。
  • 第 64 至 121 行 :构建发送消息请求SendMessageRequestHeader
  • 第 107 至 117 行 :执行 MQClientInstance#sendMessage(...) 发起网络请求。

3、Broker 接收消息

消息队列中间件 RocketMQ 源码分析 —— Message 发送与接收

SendMessageProcessor#sendMessage

 // .... 省略代码

 

  • #processRequest() 说明 :处理消息请求。
  • #sendMessage() 说明 :发送消息,并返回发送消息结果。
  • 第 51 至 55 行 :消息配置(Topic配置)校验,详细解析见:AbstractSendMessageProcessor#msgCheck()。
  • 第 60 至 64 行 :消息队列编号小于0时,Broker 可以设置随机选择一个消息队列。
  • 第 72 至 103 行 :对RETRY类型的消息处理。如果超过最大消费次数,则topic修改成"%DLQ%" + 分组名, 即加 死信队 (Dead Letter Queue),详细解析见:《RocketMQ 源码分析 —— Topic》。
  • 第 105 至 118 行 :创建MessageExtBrokerInner
  • 第 132 :存储消息,详细解析见:DefaultMessageStore#putMessage()。
  • 第 133 至 183 行 :处理消息发送结果,设置响应结果和提示。
  • 第 186 至 214 行 :发送成功,响应。这里doResponse(ctx, request, response)进行响应,最后return null,原因是:响应给 Producer 可能发生异常,#doResponse(ctx, request, response)捕捉了该异常并输出日志。这样做的话,我们进行排查 Broker 接收消息成功后响应是否存在异常会方便很多。

AbstractSendMessageProcessor#msgCheck

 // .... 省略代码

 

  • 说明:校验消息是否正确,主要是Topic配置方面,例如:Broker 是否有写入权限,topic配置是否存在,队列编号是否正确。
  • 第 11 至 18 行 :检查Topic是否可以被发送。目前是 {@link MixAll.DEFAULT_TOPIC} 不被允许发送。
  • 第 20 至 51 行 :当找不到Topic配置,则进行创建。当然,创建会存在不成功的情况,例如说:defaultTopic 的Topic配置不存在,又或者是 存在但是不允许继承,详细解析见《RocketMQ 源码分析 —— Topic》。

DefaultMessageStore#putMessage

  // .... 省略代码
  • 说明:存储消息封装,最终存储需要 CommitLog 实现。
  • 第 7 至 27 行 :校验 Broker 是否可以写入。
  • 第 29 至 39 行 :消息格式与大小校验。
  • 第 47 行 :调用 CommitLong 进行存储,详细逻辑见:《RocketMQ 源码分析 —— Message 存储》

4、某种结尾

感谢阅读、收藏、点赞本文的工程师同学。

阅读源码是件令自己很愉悦的事情,编写源码解析是让自己脑细胞死伤无数的过程,痛并快乐着。

如果有内容写的存在错误,或是不清晰的地方,见笑了,🙂。欢迎加 QQ:7685413 我们一起探讨,共进步。

再次感谢阅读、收藏、点赞本文的工程师同学。