3万字聊聊什么是RocketMQ(二)

>>强大,10k+点赞的 SpringBoot 后台管理系统竟然出了详细教程!

大家好,我是Leo。

这是开端的第三次循环了。当前正在正处于RocketMQ基础原理。

3万字聊聊什么是RocketMQ(二)

3万字聊聊什么是RocketMQ(一)

4万字聊聊阿里二面,保证你看不完

聊聊Redis面试题

2万字聊聊什么是秒杀系统(中)

3万字聊聊什么是Redis(完结篇)

3万字聊聊什么是MySQL(初篇)

本章概括

3万字聊聊什么是RocketMQ(二)

分布式事务

由何而来

我们在使用MQ在解决实际业务场景中的问题时,往往伴随诸多问题!比如如下图

3万字聊聊什么是RocketMQ(二)

上述两种可能都会导致数据不一致,在业务系统中是 致命的问题

这个时候我们就要保证事务消息。要不全部成功,要不全部失败。来达到订单服务,购物车服务的数据一致性!

对于购物车服务收到订单创建成功消息清理购物车这个操作来说,失败的处理比较简单,只要成功执行购物车清理后再提交消费确认即可,如果失败,由于没有提交消费确认,消息队列会自动重试。

解决了购物车服务问题,剩下的就是订单服务这边的创建订单,生产消息这两步了。要么全部成功,要么全部失败,不允许一个成功,一个失败的情况。

一旦订单控制不住,购物车那边也是控制不住的! 这就是事务需要解决的问题了!

什么是分布式事务

事务就是为了保证这些数据的完整性和一致性,我们希望这些更新操作要么全部成功,要么全部失败。这就是我们通过对事务的理解。如果严格来说,MQ的事务和MySQL一样,都具有四种属性 ACID

  1. 原子性:一个事务操作不可分割,要么成功,要么失败,不能有一半成功一半失败的情 况
  2. 一致性:这些数据在事务执行完成这个时间点之前,读到的一定是更新前的数据,之后 读到的一定是更新后的数据,不应该存在一个时刻,让用户读到更新过程中的数据
  3. 隔离性:指一个事务的执行不能被其他事务干扰。即一个事务内部的操作及使用的数据对 正在进行的其他事务是隔离的,并发执行的各个事务之间不能互相干扰
  4. 持久性:指一个事务一旦完成提交,后续的其他操作和故障都不会对事务的结果产生任何 影响

对于单体服务来说,都实现了ACID,但是对于分布式系统来说,实现ACID这几乎是不可能的,或者说代价太大。所有目前大家所说的分布式事务,更多的情况下,是一种分布式事务的不完整实现。不同的应用场景中,有不同的实现,目的都是通过一些妥协来解决实际问题。

比较常见的分布式事务有

  • 2PC(Two-phase Commit,也叫二阶段提 交)
  • TCC(Try-Confirm-Cancel)
  • 和事务消息

事务消息适用的场景主要是那些需要异步更新数据,并且对数据实时性要求不太高的场景。比如我们在开始时提到的那个例子,在创建订单后,如果出现短暂的几秒,购物车里的商品没有被及时清空,也不是完全不可接受的,只要最终购物车的数据和订单数据保持一致就可以了。

剩下的就不做过多解释了。

MQ是如何实现的

MQ主要借助的是 半消息 实现的,如下图

3万字聊聊什么是RocketMQ(二)

  1. 订单服务首先会开启一个事务,就类似于MySQL那样。
  2. 对MQ生产一个半消息
  3. 以上都没有问题之后,就会执行事务,写入数据库
  4. 提交事务或回滚事务

这里的半消息,并不是只有一半的数据。而是有全部的数据,这里的半只是 在事务提交之前,对于消费者来说,这个消息是不可见的

到了这里,订单服务肯定是没有问题的,所以把数据写入到MQ的Broker之后

这里回顾一下生产端的交互流程,可以参考下列图片,理解

  1. 订单服务会向MQ的Broker发送一个ACK包
  2. 如果Broker确认收到了,会给订单服务回一个ACK+SYN包 (如果Broker没有收到,会开始重传)
  3. 如果Broker收到了,一定可以确保订单服务的数据执行完成,以及确保数据已经到Broker了。

到了这里,订单服务,Broker端是没有问题的,把数据写入Broker之后,购物车服务就会开始进行消费这条消息

这里回顾一下消费端的交互流程,可以参考下列图片,理解

  1. 购物车服务在监听收到消息后进行消费
  2. 当购物车服务执行了当前的逻辑之后,会给Broker发送一个 ACK+SYN包确认消费
  3. 如果购物车服务没有给Broker回复,那么Broker就会开始重发

3万字聊聊什么是RocketMQ(二)

到了这里,订单服务,Broker端,购物车服务基本实现了 要么成功,要么失败 的一致性要求。

天网恢恢疏而不漏,在第四步的时候提交事务,如果失败了怎么办?

Kafka 的解决方案比较简单粗暴,直接抛出异常,让用户自行处理。我们可以在业务代码中 反复重试提交,直到提交成功,或者删除之前创建的订单进行补偿

RocketMQ是如何实现的

这里RocketMQ也给出了相应的应对策略!在事务实现中,他加了 事务反查的机制 来解决事务的提交失败问题。

如果订单服务,在提交或者回滚事务消息时发生网络异常,RocketMQ 的 Broker 没有收到提交或者回滚的请求,Broker 会定期去订单服务上反查这个事务对应的本地事务的状态,然后根据反查结果决定提交或者回滚这个事务。

为了支撑这个机制,我们需要做一个反查本地事务状态的接口,告知RocketMQ本地事务是否成功。

例如 只需要根据消息中的订单ID,检查这个订单是否创建成功即可

这个反查本地事务的实现,并不依赖订单服务的某个实例节点上的任何数据。这种情况下,即使是发送事务消息的那个订单服务节点宕机了,RocketMQ 依然 可以通过其他订单服务的节点来执行反查,确保事务的完整性

3万字聊聊什么是RocketMQ(二)

确保消息不会丢失

聊到消息一致性,可靠性传输,我们可以从问题的根源入手。我先列举一些容易出问题的故障点

  • 生产阶段:在这个阶段,从消息在 Producer 创建出来,经过网络传输发送到 Broker 端。
  • 存储阶段:在这个阶段,消息在 Broker 端存储,如果是集群,消息会在这个阶段被复制到其他的副本上。
  • 消费阶段:在这个阶段,Consumer 从 Broker 上拉取消息,经过网络传输发送到 Consumer 上。

3万字聊聊什么是RocketMQ(二)

生产阶段

在生产阶段,消息队列通过最常用的请求确认机制,来保证消息的可靠传递:当你的代码调用发消息方法时,消息队列的客户端会把消息发送到 Broker,Broker 收到消息后,会给客户端返回一个确认响应,表明消息已经收到了。客户端收到响应后,完成了一次正常消息的发送。

只要 Producer 收到了 Broker 的确认响应,就可以保证消息在生产阶段不会丢失。有些消息队列在长时间没收到发送确认响应后,会自动重试,如果重试再失败,就会以返回值或者异常的方式告知用户。

你在编写发送消息代码时,需要注意,正确处理返回值或者捕获异常,就可以保证这个阶段的消息不会丢失

存储阶段

在存储阶段正常情况下,只要 Broker 在正常运行,就不会出现丢失消息的问题,但是如果 Broker 出现了故障,比如进程死掉了或者服务器宕机了,还是可能会丢失消息的。

如果对消息的可靠性要求非常高,可以通过配置 Broker 参数来避免因为宕机丢消息。

对于单个节点的 Broker,需要配置 Broker 参数,在收到消息后,将消息写入磁盘后再给 Producer 返回确认响应,这样即使发生宕机,由于消息已经被写入磁盘,就不会丢失消息,恢复后还可以继续消费。例如,在 RocketMQ 中,需要将刷盘方式 flushDiskType 配置为 SYNC_FLUSH 同步刷盘。

集群我不会,后续再更新。

消费阶段

消费阶段采用和生产阶段类似的确认机制来保证消息的可靠传递,客户端从 Broker 拉取消息后,执行用户的消费业务逻辑,成功后,才会给 Broker 发送消费确认响应。如果 Broker 没有收到消费确认响应,下次拉消息的时候还会返回同一条消息,确保消息不会在网络传输过程中丢失,也不会因为客户端在执行消费逻辑中出错导致丢失。

你在编写消费代码时需要注意的是,不要在收到消息后就立即发送消费确认,而是应该在执行完所有消费业务逻辑之后,再发送消费确认。

消息丢失检测

前期代码健壮性不友好的情况,可以在拦截器里编写日志输出,把消费的id号记录下来。

  • 生产者,生产一条就记录一条
  • 消费者,消费一条就记录一条

这样这样两边对照就可以把丢失的id号 定位出来。也可以通过分布式链路追踪系统 扯远了,以后再说吧

确保消息不被重复消费

为什么会有重复消息

在消息传递过程中,如果出现传递失败的情况,发送方会执行重试,重试的过程中就有可能 会产生重复的消息。对使用消息队列的业务系统来说,如果没有对重复消息进行处理,就有可能会导致系统的数据出现错误。

所以重复消费的情况必然存在

在MQTT协议中,大概提供了三种标准

  1. At most once: 至多一次。消息在传递时,最多会被送达一次。换一个说法就是,没什 么消息可靠性保证,允许丢消息。一般都是一些对消息可靠性要求不太高的监控场景使 用,比如每分钟上报一次机房温度数据,可以接受数据少量丢失。
  2. At least once: 至少一次。消息在传递时,至少会被送达一次。也就是说,不允许丢消 息,但是允许有少量重复消息出现。
  3. Exactly once:恰好一次。消息在传递时,只会被送达一次,不允许丢失也不允许重 复,这个是最高的等级。

大多数的消息队列,都是采用的 At least once: 至少一次

根据上面介绍,我们可以得知 消息队列很难保证消息不重复

既然消息队列,无法保证重复消费的问题,那我们就要在程序里解决这个问题了。

如何解决重复消费(幂等性)

幂等性是一个数学上的概念,它是这样定义的:如果一个函数 f(x) 满足:f(f(x)) = f(x),则函数 f(x) 满足幂等性。

这里被扩展到计算机领域,被广泛的应用于多次执行产生的影响均与一次执行的影响相同

使用同样的参数,对它进行多次调用和一次调用,对系统产生的影响是一样的。所以,对于幂等的方法,不用担心重复执行会对系统造成任何改变。

(这里可以联想到 用户充值,多次消费充值的话,肯定是有问题的!)

如果说MQ解决不了数据重复消费的问题,那么现在可以转化成 At least once + 幂等性 = Exactly once 这样就可以保证重复消费了。主要有下列三种方法

  • 数据库的唯一约束实现幂等
  • 为更新的数据设置前置条件
  • 记录并检查操作

数据库的唯一约束实现幂等

我先举一个我自己系统的例子:用户在充值账号余额时,会产生一个账单ID。

我们在实现唯一约束的时候就可以重新创建一个表。伪代码如下

create table aaa(
id bigint(15) not null comment '约束id',
user_id bigint(15) not null comment '用户id',
bill_id bigint(15) not null comment '账单id',
money decimal(10,2) not null comment '充值金额',
PRIMARY KEY (`id`) USING BTREE,
KEY `adasdasdas` (`user_id`,`bill_id`), -- 唯一约束 用户di和账单id
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='账单约束表';

3万字聊聊什么是RocketMQ(二)

这样,我们消费消息的逻辑可以变为:“在转账流水表中增加一条转账记录,然后再根据转账记录,异步操作更新用户余额即可。”在转账流水表增加一条转账记录这个操作中,由于我们在这个表中预先定义了“账户 ID 转账单 ID”的唯一约束,对于同一个转账单同一个账户只能插入一条记录,后续重复的插入操作都会失败,这样就实现了一个幂等的操作。我们只要写一个 SQL,正确地实现它就可以了。

基于这个思路,不光是可以使用关系型数据库,只要是支持类似“INSERT IF NOT EXIST”语义的存储类系统都可以用于实现幂等,比如,你可以用 Redis 的 SETNX 命令来替代数据库中的唯一约束,来实现幂等消费。

参考李玥老师的 消息队列高手课 思想

为更新的数据设置前置条件

在更新数据时,我们可以设置一个更新前的值,如下图。

这里可以加一个充值前金额,这里因为我的体量,并发不大,暂时没加,后面我会根据老板的要求再加的。

3万字聊聊什么是RocketMQ(二)

如果有重复订单打过来,那我就可以计算充值前的金额,以及当前的付款金额。来付款来实现幂等性。

也可以通过版本号控制,每次更数据前,比较当前数据的版本号是否和消息中的版本号一致,如果不一致就拒绝更新数据,更新数据的同时将版本号 +1,一样可以实现幂等更新。

在修改数据记录并检查操作

可以采用Token,UUID的方式实现幂等性。这种方式是通用性比较强的。实现的思路特别简单:在执行数据更新操作之前,先检查一下是否执行过这个更新操作。

具体的实现方法是,在发送消息时,给每条消息指定一个全局唯一的 ID,消费时,先根据这个 ID 检查这条消息是否有被消费过,如果没有消费过,才更新数据,然后将消费状态置为已消费。

结尾

有些不懂的地方或者不对的地方,麻烦各位指出,一定修改优化!

非常欢迎大家加我个人微信有关后端方面的问题我们在群内一起讨论! 我们下期再见!

长按上方扫码二维码,加我微信,拉你进群


原文始发于微信公众号(欢少的成长之路):3万字聊聊什么是RocketMQ(二)