之前在网上看到了一篇关于RocketMQ的事务消息的文章,感觉讲的并不好,甚至有错误的地方,所以就想自己来写一篇文章,讲一讲我对与RocketMQ的事务消息的理解,不一定会正确,各位在看的时候,可以结合自己的思考,看看是否有一定的参考性,这里会单刀直入的讲解重点,而不会讲一些铺垫的东西,例如什么是RocketMQ,什么是事务消息,什么是事务等等,如果你遇到了一些名词,自己不是很理解,需要自己去其它地方学习下。
应用的场景
电商支付场景中,向DB中写入了订单信息,并且发送了1个MQ消息,通知其它系统处理积分、优惠券等服务,DB中的事物还是比较容易的,只要不是分库的,就好操作,相关的内容网上也很多,就不过多说了,现在假设数据库的事物是T1,发送MQ的事务消息是T2,那么伪代码就是如下:
1 | 执行T1(写DB) |
如果你这么写,就会有很大的问题,因为T2有可能会失败,这样T1没有回滚,就出现了问题;也有可能是T2超时了,导致T2实际到底有没有成功,不知道,这个时候也会有问题。这里是1个基本的场景,所以理论上来说,是不能这么写代码的,那么要怎么写呢? RocketMQ给了一个事务消息的选项。
事务消息
Rocket MQ的事务消息,可以保证MQ如果发送成功,DB事务也一定成功,DB失败了,MQ也一定不会成功,这样就解决了上述的问题,怎么做到的呢?接下来,我主要的目的,就是给大家讲清楚,怎么实现DB成功,MQ成功,DB失败,MQ也一定失败的。
首先要实现1个TransactionListener的接口,这个接口有2个方法
1 | /** |
这里要重点理解2个方法的注释,不要觉的是英文的注释,就不想看,从这个注释上看,可以理解,Rocket MQ在假设一种未知的状态,什么状态呢? 就是收不到prepare(half) message的回应,这里引申出来了1个概念,叫做预处理消息,或者叫准备状态的消息,RocketMQ的事务消息是基于两阶段提交实现的,也就是说消息有两个状态,prepared和commited。当消息执行完send方法后,进入的prepared状态。这里我们要讲一下LocalTransactionState这个类,这个类有3种状态:
- COMMIT_MESSAGE:提交消息,这个消息由prepared状态进入到commited状态,消费者可以消费这个消息
- ROLLBACK_MESSAGE:回滚
- UNKNOW:未知状态
这里的一个场景是,未知状态是什么意思? 先放下,然后看看我们该怎么用这2个方法。这里我主要写伪代码。
这里我们降低难度,假设DB的事务就是1个insert,向订单表插入了1条记录
1 | public class TransactionListenerImpl implements TransactionListener { |
这么做,是怎么实现事务消息的呢?RocketMQ的内部应该是这么做的(我没看源码,不确定具体的代码)
1 | boolean succ = sendMq() //准备状态,消费者看不见该条消息 |
核心思想:
- 先发MQ,再执行DB的事务,再根据DB事务的状态,决定MQ消息是否要给消费者消费,如果DB成功了,MQ通过重试机制,保证prepare(half) message可以变更为commited状态。
- DB的事务,必须在 executeLocalTransaction() 这个方法里写。
- MQ自己内部保证 commitMQ() 或者 rollbackMQ() 一定能成功。
- 如果MQ挂了,再启动的时候,可以再去DB里查数据库事务是否成功,保证消息的最终一致。
这里可以看到,RocketMQ通过先发MQ消息,再执行DB事务,保证了在发送MQ这个环境一旦出现错误,可以通过再次回溯DB,查看DB的事务状态,来判断是提交MQ的事务消息,还是回滚MQ的事务消息,这里其实也有问题,就是一旦MQ执行commitMQ失败了,去回溯DB的时候,恰好DB也挂了,而且MQ重试了几次,DB都没有恢复,这个时候该怎么办呢?大家可以思考下。
看到这里,我们再回过头来看,未知状态是什么?其实就是各种异常情况导致的未知状态,就是上述代码中的A情况、B情况、C情况,可能还会有网络超时、MQ自己挂了、MQ消息超时等异常情况,这里RocketMQ是通过回溯DB查看DB的事务状态来决定MQ的事务状态的,本质上属于2PC,并不是真正的分布式事务。这里分布式事务的难点在于通信信道的不可靠,具体可以看看2军问题,2军问题和拜占庭将军问题是完全不同的2个问题,大家一定要分清楚。
全文完。