分布式消息队列 RocketMQ 源码分析 —— Message 拉取与消费(上)

摘要: 原创出处 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-first/ 「芋道源码」欢迎转载,保留摘要,谢谢!

友情提示:欢迎关注公众号【芋道源码】。😈关注后,拉你进【源码圈】微信群和【芋艿】搞基嗨皮。

友情提示:欢迎关注公众号【芋道源码】。😈关注后,拉你进【源码圈】微信群和【芋艿】搞基嗨皮。

友情提示:欢迎关注公众号【芋道源码】。😈关注后,拉你进【源码圈】微信群和【芋艿】搞基嗨皮。

本文主要基于 RocketMQ 4.0.x 正式版

  • 1、概述

  • 2、ConsumeQueue 结构

  • 3、ConsumeQueue 存储

    • DefaultMessageStore#doDispatch(...)

    • ConsumeQueue#putMessagePositionInfoWrapper(...)

    • ReputMessageService

    • FlushConsumeQueueService

  • 4、Broker 提供[拉取消息]接口

    • PullMessageRequestHeader

    • PullMessageProcessor#processRequest(...)

    • MessageStore#getMessage(...)

    • DefaultMessageFilter#isMessageMatched(...)

    • PullRequestHoldService

    • PullMessageProcessor#executeRequestWhenWakeup(...)

  • 5、Broker 提供[更新消费进度]接口

    • MixAll#string2File(...)

    • BrokerController#initialize(...)

    • ConfigManager

    • ConsumerOffsetManager

  • 6、Broker 提供[发回消息]接口

    • SendMessageProcessor#consumerSendMsgBack(...)

  • 7、结尾


1、概述

本章主要解析 消费 逻辑涉及到的源码。 因为篇幅较长,分成上下两篇:

  1. 上篇: Broker 相关源码。

  2. 下篇: Consumer 相关源码。

本文即是上篇。


ok,先看第一张关于消费逻辑的图:

分布式消息队列 RocketMQ 源码分析 —— Message 拉取与消费(上)

再看消费逻辑精简的顺序图(实际情况会略有差别):

分布式消息队列 RocketMQ 源码分析 —— Message 拉取与消费(上)

2、ConsumeQueue 结构

ConsumeQueueMappedFileQueueMappedFile 的关系如下:

分布式消息队列 RocketMQ 源码分析 —— Message 拉取与消费(上)ConsumeQueue : MappedFileQueue : MappedFile = 1 : 1 : N。

反应到系统文件如下:

  1. Yunai-MacdeMacBook-Pro-2:consumequeue yunai$ pwd

  2. /Users/yunai/store/consumequeue

  3. Yunai-MacdeMacBook-Pro-2:consumequeue yunai$ cd TopicRead3/

  4. Yunai-MacdeMacBook-Pro-2:TopicRead3 yunai$ ls -ls

  5. total 0

  6. 0 drwxr-xr-x  3 yunai  staff  102  4 27 21:52 0

  7. 0 drwxr-xr-x  3 yunai  staff  102  4 27 21:55 1

  8. 0 drwxr-xr-x  3 yunai  staff  102  4 27 21:55 2

  9. 0 drwxr-xr-x  3 yunai  staff  102  4 27 21:55 3

  10. Yunai-MacdeMacBook-Pro-2:TopicRead3 yunai$ cd 0/

  11. Yunai-MacdeMacBook-Pro-2:0 yunai$ ls -ls

  12. total 11720

  13. 11720 -rw-r--r--  1 yunai  staff  6000000  4 27 21:55 00000000000000000000


ConsumeQueueMappedFileQueueMappedFile 的定义如下:

  • MappedFile :00000000000000000000等文件。

  • MappedFileQueue : MappedFile 所在的文件夹,对 MappedFile 进行封装成文件队列,对上层提供可无限使用的文件容量。

    • 每个 MappedFile 统一文件大小。

    • 文件命名方式:fileName[n] = fileName[n - 1] + mappedFileSize。在 ConsumeQueue 里默认为 6000000B。

  • ConsumeQueue :针对 MappedFileQueue 的封装使用。

    • Store:ConsumeQueue=ConcurrentHashMap<String/* topic */,ConcurrentHashMap<Integer/* queueId */,ConsumeQueue>>

ConsumeQueue 存储在 MappedFile 的内容必须大小是 20B( ConsumeQueue.CQ_STORE_UNIT_SIZE ),有两种内容类型:

  1. MESSAGE_POSITION_INFO :消息位置信息。

  2. BLANK : 文件前置空白占位。当历史 Message 被删除时,需要用 BLANK占位被删除的消息。

MESSAGE_POSITION_INFOConsumeQueue 存储结构:

第几位 字段 说明 数据类型 字节数
1 offset 消息 CommitLog 存储位置 Long 8
2 size 消息长度 Int 4
3 tagsCode 消息tagsCode Long 8

BLANKConsumeQueue 存储结构:

第几位 字段 说明 数据类型 字节数
1 0 Long 8
2 Integer.MAX_VALUE Int 4
3 0 Long 8

3、ConsumeQueue 存储

分布式消息队列 RocketMQ 源码分析 —— Message 拉取与消费(上)

主要有两个组件:

  • ReputMessageService :write ConsumeQueue。

  • FlushConsumeQueueService :flush ConsumeQueue。

ReputMessageService

分布式消息队列 RocketMQ 源码分析 —— Message 拉取与消费(上)

  1.  1: class ReputMessageService extends ServiceThread {

  2.  2:

  3.  3:     /**

  4.  4:      * 开始重放消息的CommitLog物理位置

  5.  5:      */

  6.  6:     private volatile long reputFromOffset = 0;

  7.  7:

  8.  8:     public long getReputFromOffset() {

  9.  9:         return reputFromOffset;

  10. 10:     }

  11. 11:

  12. 12:     public void setReputFromOffset(long reputFromOffset) {

  13. 13:         this.reputFromOffset = reputFromOffset;

  14. 14:     }

  15. 15:

  16. 16:     @Override

  17. 17:     public void shutdown() {

  18. 18:         for (int i = 0; i < 50 && this.isCommitLogAvailable(); i++) {

  19. 19:             try {

  20. 20:                 Thread.sleep(100);

  21. 21:             } catch (InterruptedException ignored) {

  22. 22:             }

  23. 23:         }

  24. 24:

  25. 25:         if (this.isCommitLogAvailable()) {

  26. 26:             log.warn("shutdown ReputMessageService, but commitlog have not finish to be dispatched, CL: {} reputFromOffset: {}",

  27. 27:                 DefaultMessageStore.this.commitLog.getMaxOffset(), this.reputFromOffset);

  28. 28:         }

  29. 29:

  30. 30:         super.shutdown();

  31. 31:     }

  32. 32:

  33. 33:     /**

  34. 34:      * 剩余需要重放消息字节数

  35. 35:      *

  36. 36:      * @return 字节数

  37. 37:      */

  38. 38:     public long behind() {

  39. 39:         return DefaultMessageStore.this.commitLog.getMaxOffset() - this.reputFromOffset;

  40. 40:     }

  41. 41:

  42. 42:     /**

  43. 43:      * 是否commitLog需要重放消息

  44. 44:      *

  45. 45:      * @return 是否

  46. 46:      */

  47. 47:     private boolean isCommitLogAvailable() {

  48. 48:         return this.reputFromOffset < DefaultMessageStore.this.commitLog.getMaxOffset();

  49. 49:     }

  50. 50:

  51. 51:     private void doReput() {

  52. 52:         for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {

  53. 53:

  54. 54:             // TODO 疑问:这个是啥

  55. 55:             if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() //

  56. 56:                 && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {

  57. 57:                 break;

  58. 58:             }

  59. 59:

  60. 60:             // 获取从reputFromOffset开始的commitLog对应的MappeFile对应的MappedByteBuffer

  61. 61:             SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);

  62. 62:             if (result != null) {

  63. 63:                 try {

  64. 64:                     this.reputFromOffset = result.getStartOffset();

  65. 65:

  66. 66:                     // 遍历MappedByteBuffer

  67. 67:                     for (int readSize = 0; readSize < result.getSize() && doNext; ) {

  68. 68:                         // 生成重放消息重放调度请求

  69. 69:                         DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);

  70. 70:                         int size = dispatchRequest.getMsgSize(); // 消息长度

  71. 71:                         // 根据请求的结果处理

  72. 72:                         if (dispatchRequest.isSuccess()) { // 读取成功

  73. 73:                             if (size > 0) { // 读取Message

  74. 74:                                 DefaultMessageStore.this.doDispatch(dispatchRequest);

  75. 75:                                 // 通知有新消息

  76. 76:                                 if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()

  77. 77:                                     && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {

  78. 78:                                     DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),

  79. 79:                                         dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,

  80. 80:                                         dispatchRequest.getTagsCode());

  81. 81:                                 }

  82. 82:                                 // FIXED BUG By shijia

  83. 83:                                 this.reputFromOffset += size;

  84. 84:                                 readSize += size;

  85. 85:                                 // 统计

  86. 86:                                 if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {

  87. 87:                                     DefaultMessageStore.this.storeStatsService

  88. 88:                                         .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();

  89. 89:                                     DefaultMessageStore.this.storeStatsService

  90. 90:                                         .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())

  91. 91:                                         .addAndGet(dispatchRequest.getMsgSize());

  92. 92:                                 }

  93. 93:                             } else if (size == 0) { // 读取到MappedFile文件尾

  94. 94:                                 this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);

  95. 95:                                 readSize = result.getSize();

  96. 96:                             }

  97. 97:                         } else if (!dispatchRequest.isSuccess()) { // 读取失败

  98. 98:                             if (size > 0) { // 读取到Message却不是Message

  99. 99:                                 log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);

  100. 100:                                 this.reputFromOffset += size;

  101. 101:                             } else { // 读取到Blank却不是Blank

  102. 102:                                 doNext = false;

  103. 103:                                 if (DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {

  104. 104:                                     log.error("[BUG]the master dispatch message to consume queue error, COMMITLOG OFFSET: {}",

  105. 105:                                         this.reputFromOffset);

  106. 106:

  107. 107:                                     this.reputFromOffset += result.getSize() - readSize;

  108. 108:                                 }

  109. 109:                             }

  110. 110:                         }

  111. 111:                     }

  112. 112:                 } finally {

  113. 113:                     result.release();

  114. 114:                 }

  115. 115:             } else {

  116. 116:                 doNext = false;

  117. 117:             }

  118. 118:         }

  119. 119:     }

  120. 120:

  121. 121:     @Override

  122. 122:     public void run() {

  123. 123:         DefaultMessageStore.log.info(this.getServiceName() + " service started");

  124. 124:

  125. 125:         while (!this.isStopped()) {

  126. 126:             try {

  127. 127:                 Thread.sleep(1);

  128. 128:                 this.doReput();

  129. 129:             } catch (Exception e) {

  130. 130:                 DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);

  131. 131:             }

  132. 132:         }

  133. 133:

  134. 134:         DefaultMessageStore.log.info(this.getServiceName() + " service end");

  135. 135:     }

  136. 136:

  137. 137:     @Override

  138. 138:     public String getServiceName() {

  139. 139:         return ReputMessageService.class.getSimpleName();

  140. 140:     }

  141. 141:

  142. 142: }

  • 说明:重放消息线程服务。

    • 该服务不断生成 消息位置信息 到 消费队列(ConsumeQueue)

    • 该服务不断生成 消息索引 到 索引文件(IndexFile)

  • 分布式消息队列 RocketMQ 源码分析 —— Message 拉取与消费(上)

    • 第 75 至 81 行 :当 Broker 是主节点 && Broker 开启的是长轮询,通知消费队列有新的消息。 NotifyMessageArrivingListener 会 调用 PullRequestHoldService#notifyMessageArriving(...)方法,详细解析见:PullRequestHoldService

    • 第 61 行 :获取 reputFromOffset 开始的 CommitLog 对应的 MappedFile 对应的 MappedByteBuffer

    • 第 67 行 :遍历 MappedByteBuffer

    • 第 69 行 :生成重放消息重放调度请求 ( DispatchRequest) 。请求里主要包含一条消息 ( Message) 或者 文件尾 ( BLANK) 的基本信息。

    • 第 72 至 96 行 :请求是有效请求,进行逻辑处理。

    • 第 73 至 92 行 :请求对应的是 Message,进行调度,生成 ConsumeQueue 和 IndexFile 对应的内容。详细解析见:

    • 第 93 至 96 行 :请求对应的是 Blank,即文件尾,跳转指向下一个 MappedFile

    • 第 97 至 110 行 :请求是无效请求。出现该情况,基本是一个BUG

  • 第 127 至 128 行 :每 1ms 循环执行重放逻辑。

  • 第 18 至 30 行 : shutdown时,多次 sleep(100) 直到 CommitLog 回放到最新位置。恩,如果未回放完,会输出警告日志。

DefaultMessageStore#doDispatch(...)

  1.  1: /**

  2.  2:  * 执行调度请求

  3.  3:  * 1. 非事务消息 或 事务提交消息 建立 消息位置信息 到 ConsumeQueue

  4.  4:  * 2. 建立 索引信息 到 IndexFile

  5.  5:  *

  6.  6:  * @param req 调度请求

  7.  7:  */

  8.  8: public void doDispatch(DispatchRequest req) {

  9.  9:     // 非事务消息 或 事务提交消息 建立 消息位置信息 到 ConsumeQueue

  10. 10:     final int tranType = MessageSysFlag.getTransactionValue(req.getSysFlag());

  11. 11:     switch (tranType) {

  12. 12:         case MessageSysFlag.TRANSACTION_NOT_TYPE:

  13. 13:         case MessageSysFlag.TRANSACTION_COMMIT_TYPE:

  14. 14:             DefaultMessageStore.this.putMessagePositionInfo(req.getTopic(), req.getQueueId(), req.getCommitLogOffset(), req.getMsgSize(),

  15. 15:                 req.getTagsCode(), req.getStoreTimestamp(), req.getConsumeQueueOffset());

  16. 16:             break;

  17. 17:         case MessageSysFlag.TRANSACTION_PREPARED_TYPE:

  18. 18:         case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:

  19. 19:             break;

  20. 20:     }

  21. 21:     // 建立 索引信息 到 IndexFile

  22. 22:     if (DefaultMessageStore.this.getMessageStoreConfig().isMessageIndexEnable()) {

  23. 23:         DefaultMessageStore.this.indexService.buildIndex(req);

  24. 24:     }

  25. 25: }

  26. 26:

  27. 27: /**

  28. 28:  * 建立 消息位置信息 到 ConsumeQueue

  29. 29:  *

  30. 30:  * @param topic 主题

  31. 31:  * @param queueId 队列编号

  32. 32:  * @param offset commitLog存储位置

  33. 33:  * @param size 消息长度

  34. 34:  * @param tagsCode 消息tagsCode

  35. 35:  * @param storeTimestamp 存储时间

  36. 36:  * @param logicOffset 队列位置

  37. 37:  */

  38. 38: public void putMessagePositionInfo(String topic, int queueId, long offset, int size, long tagsCode, long storeTimestamp,

  39. 39:     long logicOffset) {

  40. 40:     ConsumeQueue cq = this.findConsumeQueue(topic, queueId);

  41. 41:     cq.putMessagePositionInfoWrapper(offset, size, tagsCode, storeTimestamp, logicOffset);

  42. 42: }

ConsumeQueue#putMessagePositionInfoWrapper(...)

  1.  1: /**

  2.  2:  * 添加位置信息封装

  3.  3:  *

  4.  4:  * @param offset commitLog存储位置

  5.  5:  * @param size 消息长度

  6.  6:  * @param tagsCode 消息tagsCode

  7.  7:  * @param storeTimestamp 消息存储时间

  8.  8:  * @param logicOffset 队列位置

  9.  9:  */

  10. 10: public void putMessagePositionInfoWrapper(long offset, int size, long tagsCode, long storeTimestamp,

  11. 11:     long logicOffset) {

  12. 12:     final int maxRetries = 30;

  13. 13:     boolean canWrite = this.defaultMessageStore.getRunningFlags().isWriteable();

  14. 14:     // 多次循环写,直到成功

  15. 15:     for (int i = 0; i < maxRetries && canWrite; i++) {

  16. 16:         // 调用添加位置信息

  17. 17:         boolean result = this.putMessagePositionInfo(offset, size, tagsCode, logicOffset);

  18. 18:         if (result) {

  19. 19:             // 添加成功,使用消息存储时间 作为 存储check point。

  20. 20:             this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(storeTimestamp);

  21. 21:             return;

  22. 22:         } else {

  23. 23:             // XXX: warn and notify me

  24. 24:             log.warn("[BUG]put commit log position info to " + topic + ":" + queueId + " " + offset

  25. 25:                 + " failed, retry " + i + " times");

  26. 26:

  27. 27:             try {

  28. 28:                 Thread.sleep(1000);

  29. 29:             } catch (InterruptedException e) {

  30. 30:                 log.warn("", e);

  31. 31:             }

  32. 32:         }

  33. 33:     }

  34. 34:

  35. 35:     // XXX: warn and notify me 设置异常不可写入

  36. 36:     log.error("[BUG]consume queue can not write, {} {}", this.topic, this.queueId);

  37. 37:     this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();

  38. 38: }

  39. 39:

  40. 40: /**

  41. 41:  * 添加位置信息,并返回添加是否成功

  42. 42:  *

  43. 43:  * @param offset commitLog存储位置

  44. 44:  * @param size 消息长度

  45. 45:  * @param tagsCode 消息tagsCode

  46. 46:  * @param cqOffset 队列位置

  47. 47:  * @return 是否成功

  48. 48:  */

  49. 49: private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,

  50. 50:     final long cqOffset) {

  51. 51:     // 如果已经重放过,直接返回成功

  52. 52:     if (offset <= this.maxPhysicOffset) {

  53. 53:         return true;

  54. 54:     }

  55. 55:     // 写入位置信息到byteBuffer

  56. 56:     this.byteBufferIndex.flip();

  57. 57:     this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);

  58. 58:     this.byteBufferIndex.putLong(offset);

  59. 59:     this.byteBufferIndex.putInt(size);

  60. 60:     this.byteBufferIndex.putLong(tagsCode);

  61. 61:     // 计算consumeQueue存储位置,并获得对应的MappedFile

  62. 62:     final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;

  63. 63:     MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);

  64. 64:     if (mappedFile != null) {

  65. 65:         // 当是ConsumeQueue第一个MappedFile && 队列位置非第一个 && MappedFile未写入内容,则填充前置空白占位

  66. 66:         if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) { // TODO 疑问:为啥这个操作。目前能够想象到的是,一些老的消息很久没发送,突然发送,这个时候刚好满足。

  67. 67:             this.minLogicOffset = expectLogicOffset;

  68. 68:             this.mappedFileQueue.setFlushedWhere(expectLogicOffset);

  69. 69:             this.mappedFileQueue.setCommittedWhere(expectLogicOffset);

  70. 70:             this.fillPreBlank(mappedFile, expectLogicOffset);

  71. 71:             log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "

  72. 72:                 + mappedFile.getWrotePosition());

  73. 73:         }

  74. 74:         // 校验consumeQueue存储位置是否合法。TODO 如果不合法,继续写入会不会有问题?

  75. 75:         if (cqOffset != 0) {

  76. 76:             long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();

  77. 77:             if (expectLogicOffset != currentLogicOffset) {

  78. 78:                 LOG_ERROR.warn(

  79. 79:                     "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",

  80. 80:                     expectLogicOffset,

  81. 81:                     currentLogicOffset,

  82. 82:                     this.topic,

  83. 83:                     this.queueId,

  84. 84:                     expectLogicOffset - currentLogicOffset

  85. 85:                 );

  86. 86:             }

  87. 87:         }

  88. 88:         // 设置commitLog重放消息到ConsumeQueue位置。

  89. 89:         this.maxPhysicOffset = offset;

  90. 90:         // 插入mappedFile

  91. 91:         return mappedFile.appendMessage(this.byteBufferIndex.array());

  92. 92:     }

  93. 93:     return false;

  94. 94: }

  95. 95:

  96. 96: /**

  97. 97:  * 填充前置空白占位

  98. 98:  *

  99. 99:  * @param mappedFile MappedFile

  100. 100:  * @param untilWhere consumeQueue存储位置

  101. 101:  */

  102. 102: private void fillPreBlank(final MappedFile mappedFile, final long untilWhere) {

  103. 103:     // 写入前置空白占位到byteBuffer

  104. 104:     ByteBuffer byteBuffer = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);

  105. 105:     byteBuffer.putLong(0L);

  106. 106:     byteBuffer.putInt(Integer.MAX_VALUE);

  107. 107:     byteBuffer.putLong(0L);

  108. 108:     // 循环填空

  109. 109:     int until = (int) (untilWhere % this.mappedFileQueue.getMappedFileSize());

  110. 110:     for (int i = 0; i < until; i += CQ_STORE_UNIT_SIZE) {

  111. 111:         mappedFile.appendMessage(byteBuffer.array());

  112. 112:     }

  113. 113: }

  • #putMessagePositionInfoWrapper(...) 说明 :添加位置信息到 ConsumeQueue 的封装,实际需要调用 #putMessagePositionInfo(...) 方法。

    • 第 13 行 :判断 ConsumeQueue 是否允许写入。当发生Bug时,不允许写入。

    • 第 17 行 :调用 #putMessagePositionInfo(...) 方法,添加位置信息。

    • 第 18 至 21 行 :添加成功,使用消息存储时间 作为 存储检查点。 StoreCheckpoint 的详细解析见:Store初始化与关闭。

    • 第 22 至 32 行 :添加失败,目前基本可以认为是BUG。

    • 第 35 至 37 行 :写入失败时,标记 ConsumeQueue 写入异常,不允许继续写入。

  • #putMessagePositionInfo(...) 说明 :添加位置信息到 ConsumeQueue,并返回添加是否成功。

    • 这块比较有疑问,如果计算出来的存储位置不合法,不返回添加失败,继续进行添加位置信息,会不会有问题???

    • 第 51 至 54 行 :如果 offset(存储位置) 小于等于 maxPhysicOffsetCommitLog 消息重放到 ConsumeQueue 最大的 CommitLog 存储位置),表示已经重放过,此时,不再重复写入,直接返回写入成功。

    • 第 55 至 60 行 :写 位置信息到byteBuffer。

    • 第 62 至 63 行 :计算 ConsumeQueue存储位置,并获得对应的MappedFile。

    • 第 65 至 73 行 :当 MappedFile 是 ConsumeQueue 当前第一个文件 && MappedFile 未写入内容 && 重放消息队列位置大于0,则需要进行 MappedFile 填充前置 BLANK

    • 这块比较有疑问,什么场景下会需要。猜测产生的原因:一个 Topic 长期无消息产生,突然N天后进行发送, Topic 对应的历史消息以及和消费队列数据已经被清理,新生成的 MappedFile需要前置占位。

    • 第 74 至 87 行 :校验 ConsumeQueue 存储位置是否合法,不合法则输出日志。

    • 第 89 行 :设置 CommitLog 重放消息到 ConsumeQueue 的最大位置。

    • 第 91 行 :插入消息位置到 MappedFile

FlushConsumeQueueService

  1.  1: class FlushConsumeQueueService extends ServiceThread {

  2.  2:     private static final int RETRY_TIMES_OVER = 3;

  3.  3:     /**

  4.  4:      * 最后flush时间戳

  5.  5:      */

  6.  6:     private long lastFlushTimestamp = 0;

  7.  7:

  8.  8:     private void doFlush(int retryTimes) {

  9.  9:         int flushConsumeQueueLeastPages = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueLeastPages();

  10. 10:

  11. 11:         // retryTimes == RETRY_TIMES_OVER时,进行强制flush。主要用于shutdown时。

  12. 12:         if (retryTimes == RETRY_TIMES_OVER) {

  13. 13:             flushConsumeQueueLeastPages = 0;

  14. 14:         }

  15. 15:         // 当时间满足flushConsumeQueueThoroughInterval时,即使写入的数量不足flushConsumeQueueLeastPages,也进行flush

  16. 16:         long logicsMsgTimestamp = 0;

  17. 17:         int flushConsumeQueueThoroughInterval = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueThoroughInterval();

  18. 18:         long currentTimeMillis = System.currentTimeMillis();

  19. 19:         if (currentTimeMillis >= (this.lastFlushTimestamp + flushConsumeQueueThoroughInterval)) {

  20. 20:             this.lastFlushTimestamp = currentTimeMillis;

  21. 21:             flushConsumeQueueLeastPages = 0;

  22. 22:             logicsMsgTimestamp = DefaultMessageStore.this.getStoreCheckpoint().getLogicsMsgTimestamp();

  23. 23:         }

  24. 24:         // flush消费队列

  25. 25:         ConcurrentHashMap<String, ConcurrentHashMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;

  26. 26:         for (ConcurrentHashMap<Integer, ConsumeQueue> maps : tables.values()) {

  27. 27:             for (ConsumeQueue cq : maps.values()) {

  28. 28:                 boolean result = false;

  29. 29:                 for (int i = 0; i < retryTimes && !result; i++) {

  30. 30:                     result = cq.flush(flushConsumeQueueLeastPages);

  31. 31:                 }

  32. 32:             }

  33. 33:         }

  34. 34:         // flush 存储 check point

  35. 35:         if (0 == flushConsumeQueueLeastPages) {

  36. 36:             if (logicsMsgTimestamp > 0) {

  37. 37:                 DefaultMessageStore.this.getStoreCheckpoint().setLogicsMsgTimestamp(logicsMsgTimestamp);

  38. 38:             }

  39. 39:             DefaultMessageStore.this.getStoreCheckpoint().flush();

  40. 40:         }

  41. 41:     }

  42. 42:

  43. 43:     public void run() {

  44. 44:         DefaultMessageStore.log.info(this.getServiceName() + " service started");

  45. 45:

  46. 46:         while (!this.isStopped()) {

  47. 47:             try {

  48. 48:                 int interval = DefaultMessageStore.this.getMessageStoreConfig().getFlushIntervalConsumeQueue();

  49. 49:                 this.waitForRunning(interval);

  50. 50:                 this.doFlush(1);

  51. 51:             } catch (Exception e) {

  52. 52:                 DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);

  53. 53:             }

  54. 54:         }

  55. 55:

  56. 56:         this.doFlush(RETRY_TIMES_OVER);

  57. 57:

  58. 58:         DefaultMessageStore.log.info(this.getServiceName() + " service end");

  59. 59:     }

  60. 60:

  61. 61:     @Override

  62. 62:     public String getServiceName() {

  63. 63:         return FlushConsumeQueueService.class.getSimpleName();

  64. 64:     }

  65. 65:

  66. 66:     @Override

  67. 67:     public long getJointime() {

  68. 68:         return 1000 * 60;

  69. 69:     }

  70. 70: }

  • 说明 :flush ConsumeQueue(消费队列) 线程服务。

  • 第 11 至 14 行 :当 retryTimes==RETRY_TIMES_OVER 时,进行强制flush。用于 shutdown 时。

  • 第 15 至 23 行 :每 flushConsumeQueueThoroughInterval 周期,执行一次 flush 。因为不是每次循环到都能满足 flushConsumeQueueLeastPages 大小,因此,需要一定周期进行一次强制 flush 。当然,不能每次循环都去执行强制 flush,这样性能较差。

  • 第 24 至 33 行 :flush ConsumeQueue(消费队列)。

    • flush 逻辑:MappedFile#落盘。

  • 第 34 至 40 行 :flush StoreCheckpoint。 StoreCheckpoint 的详细解析见:Store初始化与关闭。

  • 第 43 至 59 行 :每 1000ms 执行一次 flush。如果 wakeup() 时,则会立即进行一次 flush。目前,暂时不存在 wakeup() 的调用。

4、Broker 提供[拉取消息]接口

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-first/

5、Broker 提供[更新消费进度]接口

  1. Yunai-MacdeMacBook-Pro-2:config yunai$ pwd

  2. /Users/yunai/store/config

  3. Yunai-MacdeMacBook-Pro-2:config yunai$ ls -ls

  4. total 40

  5. 8 -rw-r--r--  1 yunai  staff    21  4 28 16:58 consumerOffset.json

  6. 8 -rw-r--r--  1 yunai  staff    21  4 28 16:58 consumerOffset.json.bak

  7. 8 -rw-r--r--  1 yunai  staff    21  4 28 16:58 delayOffset.json

  8. 8 -rw-r--r--  1 yunai  staff    21  4 28 16:58 delayOffset.json.bak

  9. 8 -rw-r--r--  1 yunai  staff  1401  4 27 21:51 topics.json

  10. Yunai-MacdeMacBook-Pro-2:config yunai$ cat consumerOffset.json

  11. {

  12.    "offsetTable":{

  13.        "%RETRY%please_rename_unique_group_name_4@please_rename_unique_group_name_4":{0:0

  14.        },

  15.        "TopicRead3@please_rename_unique_group_name_4":{1:5

  16.        }

  17.    }

  18. }

  • consumerOffset.json :消费进度存储文件。

  • consumerOffset.json.bak :消费进度存储文件备份。

  • 每次写入 consumerOffset.json,将原内容备份到 consumerOffset.json.bak。实现见:MixAll#string2File(...)。

BrokerController#initialize(...)

  1.  1:this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

  2.  2:    @Override

  3.  3:    public void run() {

  4.  4:        try {

  5.  5:            BrokerController.this.consumerOffsetManager.persist();

  6.  6:        } catch (Throwable e) {

  7.  7:            log.error("schedule persist consumerOffset error.", e);

  8.  8:        }

  9.  9:    }

  10. 10:}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

  • 说明 :每 5s 执行一次持久化逻辑。

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-first/

6、Broker 提供[发回消息]接口

大部分逻辑和 Broker 提供[接收消息]接口 类似,可以先看下相关内容。

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-first/

7、结尾

感谢同学们对本文的阅读、收藏、点赞。

😈如果解析存在问题或者表达误解的,表示抱歉。如果方便的话,可以加下 QQ:7685413。让我们来一场 1 :1 交流(搞基)。

再次表示十分感谢!

发表评论