消息队列中间件 RocketMQ 源码分析 —— Message 存储

>>最全面的Java面试大纲及答案解析(建议收藏)  

原文地址:http://www.yunai.me/RocketMQ/message-store/

(建议使用原文地址阅读:1、阅读体验;2、代码排版混乱因而省略;)

RocketMQ 带注释源码地址 :https://github.com/YunaiV/incubator-rocketmq 
😈本系列每 1-2 周更新一篇,欢迎订阅、关注、收藏 公众号


  • 1、概述

  • 2、CommitLog 结构

  • 3、CommitLog 存储消息

    • MappedFile#落盘

    • FlushRealTimeService

    • CommitRealTimeService

    • GroupCommitService

    • CommitLog#putMessage(...)

    • MappedFileQueue#getLastMappedFile(...)

    • MappedFile#appendMessage(...)

    • DefaultAppendMessageCallback#doAppend(...)

    • FlushCommitLogService

  • 结尾

1、概述

本文接《RocketMQ 源码分析 —— Message 发送与接收》;
主要解析 CommitLog 存储消息部分。

2、CommitLog 结构

CommitLogMappedFileQueueMappedFile 的关系如下:

消息队列中间件 RocketMQ 源码分析 —— Message 存储

CommitLog : MappedFileQueue : MappedFile = 1 : 1 : N。

反应到系统文件如下:

Yunai-MacdeMacBook-Pro-2:commitlog yunai$ pwd/Users/yunai/store/commitlog Yunai-MacdeMacBook-Pro-2:commitlog yunai$ ls -ltotal 10485760 -rw-r--r--  1 yunai  staff  1073741824  4 21 16:27 00000000000000000000 -rw-r--r--  1 yunai  staff  1073741824  4 21 16:29 00000000001073741824 -rw-r--r--  1 yunai  staff  1073741824  4 21 16:32 00000000002147483648 -rw-r--r--  1 yunai  staff  1073741824  4 21 16:33 00000000003221225472 -rw-r--r--  1 yunai  staff  1073741824  4 21 16:32 00000000004294967296

CommitLogMappedFileQueueMappedFile 的定义如下:

  • MappedFile :00000000000000000000、00000000001073741824、00000000002147483648等文件。

  • MappedFileQueue :MappedFile 所在的文件夹,对 MappedFile 进行封装成文件队列,对上层提供可无限使用的文件容量。

    • 每个 MappedFile 统一文件大小。

    • 文件命名方式:fileName[n] = fileName[n - 1] + mappedFileSize。在 CommitLog 里默认为 1GB。

  • CommitLog :针对 MappedFileQueue 的封装使用。

CommitLog 目前存储在 MappedFile 有两种内容类型:

  1. MESSAGE :消息。

  2. BLANK :文件不足以存储消息时的空白占位。

CommitLog 存储在 MappedFile的结构:

MESSAGE[1] MESSAGE[2] ... MESSAGE[n - 1] MESSAGE[n] BLANK

MESSAGE 在 CommitLog 存储结构:

第几位 字段 说明 数据类型 字节数
1 MsgLen 消息总长度 Int 4
2 MagicCode MESSAGE_MAGIC_CODE Int 4
3 BodyCRC 消息内容CRC Int 4
4 QueueId 消息队列编号 Int 4
5 Flag flag Int 4
6 QueueOffset 消息队列位置 Long 8
7 PhysicalOffset 物理位置。在 CommitLog 的顺序存储位置。 Long 8
8 SysFlag MessageSysFlag Int 4
9 BornTimestamp 生成消息时间戳 Long 8
10 BornHost 生效消息的地址+端口 Long 8
11 StoreTimestamp 存储消息时间戳 Long 8
12 StoreHost 存储消息的地址+端口 Long 8
13 ReconsumeTimes 重新消费消息次数 Int 4
14 PreparedTransationOffset Long 8
15 BodyLength + Body 内容长度 + 内容 Int + Bytes 4 + bodyLength
16 TopicLength + Topic Topic长度 + Topic Byte + Bytes 1 + topicLength
17 PropertiesLength + Properties 拓展字段长度 + 拓展字段 Short + Bytes 2 + PropertiesLength

BLANK 在 CommitLog 存储结构:

第几位 字段 说明 数据类型 字节数
1 maxBlank 空白长度 Int 4
2 MagicCode BLANK_MAGIC_CODE Int 4

3、CommitLog 存储消息

消息队列中间件 RocketMQ 源码分析 —— Message 存储

CommitLog#putMessage(...)

// 省略代码
  • 说明 :存储消息,并返回存储结果。

  • 第 2 行 :设置存储时间等。

  • 第 16 至 36 行 :事务消息相关,暂未了解。

  • 第 45 & 97 行 :获取锁与释放锁。

  • 第 52 行 :再次设置存储时间。目前会有多处地方设置存储时间。

  • 第 55 至 62 行 :获取 MappedFile,若不存在或已满,则进行创建。详细解析见:MappedFileQueue#getLastMappedFile(...)。

  • 第 65 行 :插入消息到 MappedFile,解析解析见:MappedFile#appendMessage(...)。

  • 第 69 至 80 行 :MappedFile 已满,创建新的,再次插入消息

  • 第 116 至 140 行 :消息刷盘,即持久化到文件。上面插入消息实际未存储到硬盘。此处,根据不同的刷盘策略,执行会有不同。详细解析见:FlushCommitLogService。

  • 第 143 至 173 行 :Broker 主从同步。后面的文章会详细解析😈。

MappedFileQueue#getLastMappedFile(...)

// 省略代码
  • 说明 :获取最后一个 MappedFile,若不存在或文件已满,则进行创建。

  • 第 5 至 11 行 :计算当文件不存在或已满时,新创建文件的 createOffset

  • 第 14 行 :计算文件名。从此处我们可
    以得知,MappedFile的文件命名规则:

> fileName[n] = fileName[n - 1] + n * mappedFileSize > fileName[0] = startOffset - (startOffset % this.mappedFileSize)  目前 `CommitLog` 的 `startOffset` 为 0。 
此处有个**疑问**,为什么需要 `(startOffset % this.mappedFileSize)`。例如:  | startOffset  | mappedFileSize | createOffset | | --- | :-- | :-- | | 5 | 1 | 5 | | 5 | 2 | 4 | | 5 | 3 | 3  | | 5 | 4 | 4 | | 5 | > 5 | 0 |  
_如果有知道的同学,麻烦提示下。😈_*解答:fileName[0] = startOffset - (startOffset % this.mappedFileSize) 计算出来的是,以 `this.mappedFileSize` 为每个文件大小时,`startOffset` 所在文件的开始`offset`*
  • 第 30 至 35 行 :设置 MappedFile是否是第一个创建的文件。该标识用于 ConsumeQueue 对应的 MappedFile ,详见 ConsumeQueue#fillPreBlank

MappedFile#appendMessage(...)

 // 省略代码
  • 说明 :插入消息到 MappedFile,并返回插入结果。

  • 第 8 行 :获取需要写入的字节缓冲区。为什么会有 writeBuffer != null 的判断后,使用不同的字节缓冲区,见:FlushCommitLogService。

  • 第 9 至 11 行 :设置写入 position,执行写入,更新 wrotePosition(当前写入位置,下次开始写入开始位置)。

DefaultAppendMessageCallback#doAppend(...)

// 省略代码
  • 说明 :插入消息到字节缓冲区。

  • 第 45 行 :计算物理位置。在 CommitLog 的顺序存储位置。

  • 第 47 至 49 行 :计算 CommitLog 里的 offsetMsgId。这里一定要和 msgId 区分开。

计算方式 长度
offsetMsgId Broker存储时生成 Hex(storeHostBytes, wroteOffset) 32
msgId Client发送消息时生成 Hex(进程编号, IP, ClassLoader, startTime, currentTime, 自增序列) 32 《RocketMQ 源码分析 —— Message 基础》
  • 第 51 至 61 行 :获取队列位置(offset)。

  • 第 78 至 95 行 :计算消息总长度。

  • 第 98 至 112 行 :当文件剩余空间不足时,写入 BLANK 占位,返回结果。

  • 第 114 至 161 行 :写入 MESSAGE 。

  • 第 173 行 :更新队列位置(offset)。

FlushCommitLogService

消息队列中间件 RocketMQ 源码分析 —— Message 存储

线程服务 场景 插入消息性能
CommitRealTimeService 异步刷盘 && 开启内存字节缓冲区 第一
FlushRealTimeService 异步刷盘 && 关闭内存字节缓冲区 第二
GroupCommitService 同步刷盘 第三

MappedFile#落盘

方式
方式一 写入内存字节缓冲区(writeBuffer) 从内存字节缓冲区(write buffer)提交(commit)到文件通道(fileChannel) 文件通道(fileChannel)flush
方式二 写入映射文件字节缓冲区(mappedByteBuffer) 映射文件字节缓冲区(mappedByteBuffer)flush

消息队列中间件 RocketMQ 源码分析 —— Message 存储

flush相关代码

考虑到写入性能,满足 flushLeastPages * OS_PAGE_SIZE 才进行 flush

// 省略代码

commit相关代码:

考虑到写入性能,满足 commitLeastPages * OS_PAGE_SIZE 才进行 commit

// 省略代码

FlushRealTimeService

消息插入成功时,异步刷盘时使用。

// 省略代码 
  • 说明:实时 flush 线程服务,调用 MappedFile#flush 相关逻辑。

  • 第 23 至 29 行 :每 flushPhysicQueueThoroughInterval 周期,执行一次 flush 。因为不是每次循环到都能满足 flushCommitLogLeastPages 大小,因此,需要一定周期进行一次强制 flush 。当然,不能每次循环都去执行强制 flush,这样性能较差。

  • 第 33 行 至 37 行 :根据 flushCommitLogTimed 参数,可以选择每次循环是固定周期还是等待唤醒。默认配置是后者,所以,每次插入消息完成,会去调用 commitLogService.wakeup() 。

  • 第 45 行 :调用 MappedFile 进行 flush

  • 第 61 至 65 行 :Broker 关闭时,强制 flush,避免有未刷盘的数据。

CommitRealTimeService

消息插入成功时,异步刷盘时使用。
和 FlushRealTimeService 类似,性能更好。

// 省略代码

GroupCommitService

消息插入成功时,同步刷盘时使用。

// 省略代码
  • 说明:批量写入线程服务。

  • 第 16 至 25 行 :添加写入请求。方法设置了 sync 的原因:this.requestsWrite 会和 this.requestsRead 不断交换,无法保证稳定的同步。

  • 第 27 至 34 行 :读写队列交换。

  • 第 38 至 60 行 :循环写入队列,进行 flush

    • 第 43 行 :考虑到有可能每次循环的消息写入的消息,可能分布在两个 MappedFile(写第N个消息时,MappedFile 已满,创建了一个新的),所以需要有循环2次。

    • 第 51 行 :唤醒等待写入请求线程,通过 CountDownLatch 实现

  • 第 61 至 66 行 :直接刷盘。此处是由于发送的消息的 isWaitStoreMsgOK 未设置成 TRUE ,导致未走批量提交。

  • 第 73 至 80 行 :每 10ms 执行一次批量提交。当然,如果 wakeup() 时,则会立即进行一次批量提交。当 Broker 设置成同步落盘 && 消息 isWaitStoreMsgOK=true,消息需要略大于 10ms 才能发送成功。当然,性能相对异步落盘较差,可靠性更高,需要我们在实际使用时去取舍。

结尾

写的第二篇与RocketMQ源码相关的博文,看到有阅读、点赞、收藏甚至订阅,很受鼓舞。

《Message存储》比起《Message发送&接收》从难度上说是更大的,当然也是更有趣的,如果存在理解错误或者表达不清晰,还请大家多多包含。如果可以的话,还请麻烦添加 QQ:7685413 进行指出,避免自己的理解错误,给大家造成困扰。

推荐《Kafka设计解析(六)- Kafka高性能架构之道》,作者站在的高度比我高的多的多,嗯,按照李小璐的说法:高一个喜马拉雅山。😈认真啃读《Linux内核设计与实现(原书第3版)》,day day up。

再次感谢大家的阅读、点赞、收藏。

下一篇:《RocketMQ 源码分析 —— Message 拉取与消费》 起航!

消息队列中间件 RocketMQ 源码分析 —— Message 存储