【RocketMQ系列(三)】基于RocketMQ的分布式事务

>>强大,10k+点赞的 SpringBoot 后台管理系统竟然出了详细教程!
【RocketMQ系列(三)】基于RocketMQ的分布式事务

Java技术分享,点击上方蓝字关注我吧。

【RocketMQ系列(三)】基于RocketMQ的分布式事务

RocketMQ系列第三篇。

前两篇介绍了消息队列及RocketMQ的基本使用,本次来聊一下基于RocketMQ的分布式事务解决方案。

  • Why分布式事务
  • 分布式事务解决方案
  • 基于RocketMQ的分布式事务
  • 代码实现

0x01 为什么有分布式事务

现在很多大公司的项目都拆分为为服务器架构的了,通常每个服务只处理一件事情,部署在一个服务器节点上,不同的服务部署在不同的机器上,这就存在服务之间的相互通信问题。

比如订单服务和支付服务,这里举一个简单的业务流程,创建一个订单之后,向MQ发送消息,支付服务消费消息,调起支付,然后订单服务进行修改订单状态,发货。

【RocketMQ系列(三)】基于RocketMQ的分布式事务

如果用户已经支付完成了,但是在处理订单状态环节出现了问题,该怎么办?这个时候消费者方(支付服务)已经把消息消费了,无法回滚了。

所以这两个服务,从创建订单到支付到更新订单状态等一系列的操作必须是原子性的。

这就是分布式系统中涉及到的分布式事务问题。

0x02 实现最终一致性的解决思路

2.1 两阶段提交(2PC)

Two-phase Commit,简称2PC两阶段提交。从字面意思就能想到,提交事务时分两个阶段来完成最终事务的提交。

【RocketMQ系列(三)】基于RocketMQ的分布式事务

该方案通过引入一个第三方协调者(Coordinator)来协调参与者的行为,并最终决定这些参与者是否要真正执行事务。

2.1.1 阶段一:准备阶段

协调者询问参与者事务是否执行成功,参与者发回事务执行结果。

【RocketMQ系列(三)】基于RocketMQ的分布式事务

2.1.2 阶段二:提交阶段

如果事务在每个参与者上都执行成功,事务协调者发送通知让参与者提交事务;否则,协调者发送通知让参与者回滚事务。

Tip:在准备阶段,参与者执行了事务,但是还未提交。只有在提交阶段接收到协调者发来的通知后,才进行提交或者回滚

【RocketMQ系列(三)】基于RocketMQ的分布式事务

2.1.3 两阶段提交存在的问题

  • 同步阻塞 所有事务参与者在等待其它参与者响应的时候都处于同步阻塞状态,无法进行其它操作。
  • 单点问题 协调者在 2PC 中起到非常大的作用,发生故障将会造成很大影响。特别是在阶段二发生故障,所有参与者会一直等待状态,无法完成其它操作。
  • 数据不一致 在阶段二,如果协调者只发送了部分 Commit 消息,此时网络发生异常,那么只有部分参与者接收到 Commit 消息,也就是说只有部分参与者提交了事务,使得系统数据不一致。
  • 太过保守 任意一个节点失败就会导致整个事务失败,没有完善的容错机制。

2.2 三阶段提交(3PC,TCC,补偿事务)

【RocketMQ系列(三)】基于RocketMQ的分布式事务

Try-Confirm-Cancel,TCC,采用的是补偿机制。其核心思想是:针对每个操作,都要注册一个与其对应的确认和补偿(撤销)操作

2.2.1 三个阶段

  • Try 主要是对业务系统做检测及资源预留
  • Confirm 主要是对业务系统做确认提交,Try阶段执行成功并开始执行Confirm阶段时,默认Confirm阶段是不会出错的。即:只要Try成功,Confirm一定成功
  • Cancel 主要是在业务执行错误,需要回滚的状态下执行的业务取消,预留资源释放。

2.2.2 优缺点

TCC是对2PC的一个改进,try阶段通过预留资源的方式避免了同步阻塞资源的情况

但是TCC编程需要业务自己实现try,confirm,cancel,对业务入侵太大,实现起来也比较复杂

0x03 基于RocketMQ的分布式事务

RocketMQ支持分布式事务功能,通过RocketMQ事务消息能达到分布式事务的最终一致。

3.1 实现方式

Half Message(半消息,预处理消息)

Broker收到此类消息后,会存储到RMQ_SYS_TRANS_HALF_TOPIC的消息消费队列中,它暂时不会被Consumer消费。

检查事务状态

Broker会开启一个定时任务,消费RMQ_SYS_TRANS_HALF_TOPIC队列中的消息,每次执行任务会向Producer确认事务执行状态(提交、回滚、未知),如果是未知,等待下一次回调。

事务消息的三种状态

  • 提交状态:提交事务,它允许消费者消费此消息。
  • 回滚状态:回滚事务,它代表该消息将被删除,不允许被消费。
  • 未知状态:中间状态,它代表需要检查消息队列来确定状态。

消息回查

有一种场景,如果发送预备消息成功,执行本地事务成功,但发送确认消息失败;那么问题就来了,因为Producer的业务都已经处理完毕了,就剩下Consumer消费了,但是你commit失败了,Consumer消费不到,这里就出现了数据不一致。

RocketMQ采用消息状态回查来解决这种问题,RocketMQ会定时遍历commitlog中的预备消息。

【RocketMQ系列(三)】基于RocketMQ的分布式事务

因为预备消息最终肯定会变为Commit消息Rollback消息,所以遍历预备消息去回查本地业务的执行状态,如果发现本地业务没有执行成功就Rollback,如果执行成功就发送Commit消息。

超时

如果超过回查次数,默认回滚消息。

3.2 Show you the code

3.2.1 Producer发送事务性消息

RocketMQ的分布式事务,需要生产者发送事务性消息,使用TransactionMQProducer类创建生产者,并指定唯一的ProducerGroup,就可以设置自定义线程池来处理这些检查请求。

执行本地事务后,需要根据执行结果对消息队列进行回复。

生成TransactionMQProducer实例

TransactionMQProducer producer = new TransactionMQProducer("laopo");
producer.setNamesrvAddr("192.168.2.110:9876");

//处理检查请求的线程池
ExecutorService executorService = new ThreadPoolExecutor(2,
        5,
        100,
        TimeUnit.SECONDS,
        new ArrayBlockingQueue<Runnable>(2000),
        new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });

producer.setExecutorService(executorService);

3.2.2 设置监听回查

设置监听事务的接口TransactionListener:当发送半消息成功时,使用executeLocalTransaction方法来执行本地事务,返回前文所述的三种状态之一:提交、回滚、未知。

checkLocalTransaction方法用于检查本地事务状态,并回应消息队列的检查请求,该方法也返回提交、回滚、未知三种状态之一。

//设置回查
producer.setTransactionListener(new TransactionListener() {

    private AtomicInteger transactionIndex = new AtomicInteger(0);
    //用来保存事务的状态
    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
    //半消息发送成功触发此方法来执行本地事务
    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
        int value = transactionIndex.getAndIncrement();
        int status = value % 3;
        localTrans.put(message.getTransactionId(), status);

        return LocalTransactionState.UNKNOW;
    }

    //broker将发送检查消息来检查事务状态,并将调用此方法来获取本地事务状态
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
        Integer status = localTrans.get(messageExt.getTransactionId());
        if (null != status) {
            switch (status) {
                case 0:
                    return LocalTransactionState.UNKNOW;
                case 1:
                    return LocalTransactionState.COMMIT_MESSAGE;
                case 2:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
});

3.2.3 发送消息

调用sendMessageInTransaction来发送消息:

//生产并发送消息
producer.start();
String[] tags = new String[] {"TagA""TagB""TagC""TagD""TagE"};
for (int i = 0; i < 10; i++) {
    Message msg =
            new Message("girl", tags[i % tags.length], "KEY" + i,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
    //发送事务消息
    SendResult sendResult = producer.sendMessageInTransaction(msg, null);
    System.out.printf("%s%n", sendResult);
    Thread.sleep(10);
}
for (int i = 0; i < 100000; i++) {
    Thread.sleep(1000);
}
//关闭生产者实例
producer.shutdown();
System.out.printf("%s""已关闭生产者实例");

运行结果:

【RocketMQ系列(三)】基于RocketMQ的分布式事务

3.2.4 消费事务消息

之前生产消息生产了TagA、B到TagE的消息,我们这里顺便再验证一下TAG过滤消费,就消费TagB的吧:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("laopo-consumer");
consumer.setNamesrvAddr("192.168.2.110:9876");

//订阅topic,消费TagB的消息
consumer.subscribe("girl""TagB");

consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), list);
        // 标记该消息已经被成功消费
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
// 启动Consumer实例
consumer.start();
System.out.println("consumer started.");
【RocketMQ系列(三)】基于RocketMQ的分布式事务

成功消费了TagB的消息。

完整代码位于 GitHub github.com/xblzer/JavaJourney

本次导航结束,以上。


首发公众号 行百里er ,欢迎老铁们关注阅读指正。代码仓库 GitHub github.com/xblzer/JavaJourney

【RocketMQ系列(三)】基于RocketMQ的分布式事务

RocketMQ入坑系列(一)角色介绍及基本使用


【RocketMQ系列(三)】基于RocketMQ的分布式事务

RocketMQ入坑系列(二)近距离感受RocketMQ如何收发消息



原文始发于微信公众号(行百里er):【RocketMQ系列(三)】基于RocketMQ的分布式事务