xa mysql xid

MySQL 从5.0.3开始支持XA分布式事务,且只有InnoDB存储引擎支持。MySQL Connector/J 从5.0.0版本之后开始直接提供对XA的支持
在DTP模型中,mysql属于资源管理器(RM)。而一个完整的分布式事务中,一般会存在多个RM,由事务管理器TM来统一进行协调。因此,这里所说的mysql对XA分布式事务的支持,一般指的是单台mysql实例如何执行自己的事务分支。
MySQL XA 事务SQL语法
https://dev.mysql.com/doc/refman/5.7/en/xa-statements.html



XA {START|BEGIN} xid [JOIN|RESUME] //开启XA事务,如果使用的是XA START而不是XA BEGIN,那么不支持[JOIN|RESUME],xid是一个唯一值,表示事务分支标识符
XA END xid [SUSPEND [FOR MIGRATE]] //结束一个XA事务,不支持[SUSPEND [FOR MIGRATE]]
XA PREPARE xid 准备提交
XA COMMIT xid [ONE PHASE] //提交,如果使用了ONE PHASE,则表示使用一阶段提交。两阶段提交协议中,如果只有一个RM参与,那么可以优化为一阶段提交
XA ROLLBACK xid //回滚
XA RECOVER [CONVERT XID] //列出所有处于PREPARE阶段的XA事务
下面是一个简单的msyql XA事务案例,演示了mysql作为全局事务中的一个事务分支,将一行记录插入到一个表中
mysql> XA START ‘xatest’; //其中’xatest’就是xid的值
Query OK, 0 rows affected (0.00 sec)



mysql> insert into user(name) values(“tianshozuhi”);
Query OK, 1 row affected (0.00 sec)



mysql> XA END ‘xatest’;
Query OK, 0 rows affected (0.00 sec)



mysql> XA PREPARE ‘xatest’;
Query OK, 0 rows affected (0.01 sec)



mysql> XA COMMIT ‘xatest’;
Query OK, 0 rows affected (0.01 sec)



Mysql XA事务状态
https://dev.mysql.com/doc/refman/5.7/en/xa-states.html



XA事务的状态,按照如下步骤进行展开





  1. 使用XA START来启动一个XA事务,并把它置于ACTIVE状态。




  2. 对于一个ACTIVE状态的 XA事务,我们可以执行构成事务的SQL语句,然后发布一个XA END语句。XA END把事务放入IDLE状态。




  3. 对于一个IDLE 状态XA事务,可以执行一个XA PREPARE语句或一个XA COMMIT…ONE PHASE语句:





XA PREPARE把事务放入PREPARED状态。在此点上的XA RECOVER语句将在其输出中包括事务的xid值,因为XA RECOVER会列出处于PREPARED状态的所有XA事务。



XA COMMIT…ONE PHASE用于预备和提交事务。xid值将不会被XA RECOVER列出,因为事务终止。




  1. 对于一个PREPARED状态的 XA事务,您可以发布一个XA COMMIT语句来提交和终止事务,或者发布XA ROLLBACK来回滚并终止事务。



针对一个给定的客户端连接而言,XA事务和非XA事务(即本地事务)是互斥的。例如,已经执行了”XA START”命令来开启一个XA事务,则本地事务不会被启动,直到XA事务已经被提交或被 回滚为止。相反的,如果已经使用START TRANSACTION启动一个本地事务,则XA语句不能被使用,直到该事务被提交或被 回滚为止。

最后,如果一个XA事务处于ACTIVE状态,是不能直接进行提交的,如果这样做,mysql会抛出异常:


ERROR 1399 (XAE07): XAER_RMFAIL: The command cannot be executed
when global transaction is in the ACTIVE state
3 关于XID的说明
mysql中使用xid来作为一个事务分支的标识符。事实上xid作为事务分支标识符是在XA规范中定义的,在« Distributed Transaction Processing: The XA Specification» 4.2 节中,规定了一个xid的结构,通过C语言进行描述,如下:



/∗
∗ Transaction branch identification: XID and NULLXID:
∗/
#define XIDDATASIZE 128 /∗ size in bytes ∗/
#define MAXGTRIDSIZE 64 /∗ maximum size in bytes of gtrid ∗/
#define MAXBQUALSIZE 64 /∗ maximum size in bytes of bqual ∗/
struct xid_t {
long formatID; /* format identifier */
long gtrid_length; /* value 1-64 */
long bqual_length; /* value 1-64 */
char data[XIDDATASIZE];
};
/∗
∗ A value of -1 in formatID means that the XID is null.
∗/
typedef struct xid_t XID;
/∗
∗ Declarations of routines by which RMs call TMs:
∗/
extern int ax_reg(int, XID ∗, long);
extern int ax_unreg(int, long); XA规范定义了一个xid有4个部分组成:


gtrid:



全局事务标识符(global transaction identifier),最大不能超过64字节


bqual:



分支限定符(branch qualifier),最大不能超过64字节


data:



xid的值,其是 gtrid和bqual拼接后的内容。因为gtrid和bqual最大都是64个字节,因此data的最大长度为128。不过,在xid的结构体中,并没有gtrid和bqual,只有gtrid_length、bqual_length。由于二者的内容都存储在data中,因此我们可以根据data反推出gtrid和bqual。举例来说,假设gtrid为”g12345”(5个字节),bqual为”b456”(4个字节)。那么在构造xid结构体时,gtrid_length=5,bqual_length=4,data=”g12345b456”,那么在反推的时候:



从data[0]到data[gtrid_length-1]之间的部分就是gtrid的值;从data[gtrid_length]到data[gtrid_length+bqual_length-1]部分就是bqual的值。



formatId:



而formatId的作用就是记录gtrid、bqual的格式,类似于memcached中flags字段的作用。XA规范中通过一个结构体约定了xid的组成部分,但是并没有规定data中存储的gtrid、bqual内容到底应该是什么格式。你可以选择使用数字,也可以选择使用字符串,到底选择什么由开发者自行决定,只要最终能保证data中的内容是全局唯一的即可。XA规范建议使用OSI CCR风格来组织xid的内容,此时formatId应该设置为0.


在mysql官方文档中,关于xid的组成也有类似的说明:



xid: gtrid [, bqual [, formatID ]]
其中,bqual、formatID是可选的。解释如下:



gtrid : 是一个全局事务标识符(global transaction identifier),



bqual:是一个分支限定符(branch qualifier),如果没有提供bqual,那么默认值为空字符串’‘。



formatID:是一个数字,用于标记gtrid和bqual值的格式,这是一个无符号整数(unsigned integer),也就是说,最小为0。如果没有提供formatID,那么其默认值为1。



特别需要注意的是,xid作为一个事务分支的标识符,理论上只要有分支限定符(bqual)就可以了,为什么要包含全局事务标识符(gtrid)?这主要是为了管理方便,通过包含进xid,我们可以很容易的判断出这个事务分支属于哪一个全局事务。 

例如,前面提到 XA RECOVER命令的作用是列出所有处于PREPARE阶段的XA事务,以下是一个案例:

mysql> XA RECOVER;
+----------+--------------+--------------+--------------+
| formatID | gtrid_length | bqual_length | data |
+----------+--------------+--------------+--------------+
| 1 | 6 | 6 | g12345b67890 |
+----------+--------------+--------------+--------------+ 这里列出的是一个分支事务xid的组成信息,根据前面的介绍,我们可以推断出:

gtrid是data[0]到data[gtrid_length-1]部分的内容,即data[0]到data[6-1=5]部分的内容,结果为g12345;

而bqual是data[gtrid_length]到data[gtrid_length+bqual_length-1]部分的内容,即data[6]到data[6+6-1=11]部分的内容,结果b67890。


因此,根据这个信息,我们就可以判断出这个xid表示的是:全局事务(g12345)中的事务分支(b67890)。



4、通过jdbc操作mysql xa事务
MySQL Connector/J 从5.0.0版本之后开始直接提供对XA的支持,也就是提供了java版本XA接口的实现。意味着我们可以直接通过java代码来执行mysql xa事务。



需要注意的是,业务开发人员在编写代码时,不应该直接操作这些XA事务操作的接口。因为在DTP模型中,RM上的事务分支的开启、结束、准备、提交、回滚等操作,都应该是由事务管理器TM来统一管理。

由于目前我们还没有接触到TM,那么我们不妨做一回"人肉事务管理器",用你智慧的大脑,来控制多个mysql实例上xa事务分支的执行,提交/回滚。通过直接操作这些接口,你将对xa事务有更深刻的认识。


import com.mysql.jdbc.jdbc2.optional.MysqlXAConnection;
import com.mysql.jdbc.jdbc2.optional.MysqlXid;
import javax.sql.XAConnection;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
public class MysqlXAConnectionTest {
public static void main(String[] args) throws SQLException {
//true表示打印XA语句,,用于调试
boolean logXaCommands = true;
// 获得资源管理器操作接口实例 RM1
Connection conn1 = DriverManager.getConnection(“jdbc:mysql://localhost:3306/test”, “root”, “shxx12151022”);
XAConnection xaConn1 = new MysqlXAConnection((com.mysql.jdbc.Connection) conn1, logXaCommands);
XAResource rm1 = xaConn1.getXAResource();
// 获得资源管理器操作接口实例 RM2
Connection conn2 = DriverManager.getConnection(“jdbc:mysql://localhost:3306/test”, “root”,
“shxx12151022”);
XAConnection xaConn2 = new MysqlXAConnection((com.mysql.jdbc.Connection) conn2, logXaCommands);
XAResource rm2 = xaConn2.getXAResource();
// AP请求TM执行一个分布式事务,TM生成全局事务id
byte[] gtrid = “g12345”.getBytes();
int formatId = 1;
try {
// ==============分别执行RM1和RM2上的事务分支====================
// TM生成rm1上的事务分支id
byte[] bqual1 = “b00001”.getBytes();
Xid xid1 = new MysqlXid(gtrid, bqual1, formatId);
// 执行rm1上的事务分支
rm1.start(xid1, XAResource.TMNOFLAGS);//One of TMNOFLAGS, TMJOIN, or TMRESUME.
PreparedStatement ps1 = conn1.prepareStatement(“INSERT into user(name) VALUES (‘tianshouzhi’)”);
ps1.execute();
rm1.end(xid1, XAResource.TMSUCCESS);
// TM生成rm2上的事务分支id
byte[] bqual2 = “b00002”.getBytes();
Xid xid2 = new MysqlXid(gtrid, bqual2, formatId);
// 执行rm2上的事务分支
rm2.start(xid2, XAResource.TMNOFLAGS);
PreparedStatement ps2 = conn2.prepareStatement(“INSERT into user(name) VALUES (‘wangxiaoxiao’)”);
ps2.execute();
rm2.end(xid2, XAResource.TMSUCCESS);
// ===================两阶段提交================================
// phase1:询问所有的RM 准备提交事务分支
int rm1_prepare = rm1.prepare(xid1);
int rm2_prepare = rm2.prepare(xid2);
// phase2:提交所有事务分支
boolean onePhase = false; //TM判断有2个事务分支,所以不能优化为一阶段提交
if (rm1_prepare == XAResource.XA_OK
&& rm2_prepare == XAResource.XA_OK
) {//所有事务分支都prepare成功,提交所有事务分支
rm1.commit(xid1, onePhase);
rm2.commit(xid2, onePhase);
} else {//如果有事务分支没有成功,则回滚
rm1.rollback(xid1);
rm1.rollback(xid2);
}
} catch (XAException e) {
// 如果出现异常,也要进行回滚
e.printStackTrace();
}
}
}
在这个案例中,演示了2个RM的情况下分布式事务的工作流程。因为我们充当了”人肉事务管理器”TM,因此很多本应该由TM来处理的工作处理细节也直接体现在上述代码中,如:生成全局事务id和分支事务id、在RM上开启事务分支、两阶段提交等。虽然我们自己作为”人肉事务管理器”是很不可靠的,但是上述代码可以让我们了解一个TM内部的主要工作流程是怎样的。



在实际开发中,代码绝不会像上表面那样复杂,因为我们通常都会使用第三方或者容器提供的TM功能,因此在操作分布式事务时,代码可以得到极大的简化。

最后,由于我们设置了logXaCommands=true,程序在运行的时候回打印出执行的XA命令。如下所示:

Fri Feb 02 18:09:29 CST 2018 DEBUG: Executing XA statement: XA START 0x673132333435,0x623030303031,0x1
Fri Feb 02 18:09:29 CST 2018 DEBUG: Executing XA statement: XA END 0x673132333435,0x623030303031,0x1
Fri Feb 02 18:09:29 CST 2018 DEBUG: Executing XA statement: XA START 0x673132333435,0x623030303032,0x1
Fri Feb 02 18:09:29 CST 2018 DEBUG: Executing XA statement: XA END 0x673132333435,0x623030303032,0x1
Fri Feb 02 18:09:29 CST 2018 DEBUG: Executing XA statement: XA PREPARE 0x673132333435,0x623030303031,0x1
Fri Feb 02 18:09:29 CST 2018 DEBUG: Executing XA statement: XA PREPARE 0x673132333435,0x623030303032,0x1
Fri Feb 02 18:09:29 CST 2018 DEBUG: Executing XA statement: XA COMMIT 0x673132333435,0x623030303031,0x1
Fri Feb 02 18:09:29 CST 2018 DEBUG: Executing XA statement: XA COMMIT 0x673132333435,0x623030303032,0x1 5 MySQL Connector/J XA事务支持源码简单分析
最后,我们对上述源码进行一下简单的分析。在前面直接使用mysql命令操作的时候,我们通过"XA START xid”等XA命令来执行XA事务。而在上述java代码中,我们是获取了一个普通的链接Connection之后,封装成了MysqlXAConnection。如下:


com.mysql.jdbc.jdbc2.optional.MysqlXAConnection



public class MysqlXAConnection extends MysqlPooledConnection implements XAConnection, XAResource {
private com.mysql.jdbc.Connection underlyingConnection;
private Log log;
protected boolean logXaCommands;



//构造方法
public MysqlXAConnection(com.mysql.jdbc.Connection connection, boolean logXaCommands) throws SQLException {
super(connection);
this.underlyingConnection = connection;
this.log = connection.getLog();
this.logXaCommands = logXaCommands;
}

}
可以看到,MysqlXAConnection本身就实现了XAResource接口,因此当调用getXAResource()方法时,返回的就是其自己



com.mysql.jdbc.jdbc2.optional.MysqlXAConnection#getXAResource



public XAResource getXAResource() throws SQLException {
return this;
}
之后,我们调用XAResource的start方法来开启XA事务。start方法源码如下所示:



com.mysql.jdbc.jdbc2.optional.MysqlXAConnection#start



public void start(Xid xid, int flags) throws XAException {
//1、封装XA命令
StringBuilder commandBuf = new StringBuilder(MAX_COMMAND_LENGTH);
commandBuf.append(“XA START “);
appendXid(commandBuf, xid);



//2、添加flag标记
switch (flags) {
case TMJOIN:
commandBuf.append(" JOIN");
break;
case TMRESUME:
commandBuf.append(" RESUME");
break;
case TMNOFLAGS:
// no-op
break;
default:
throw new XAException(XAException.XAER_INVAL);
}

//执行命令
dispatchCommand(commandBuf.toString());
this.underlyingConnection.setInGlobalTx(true); } 可以看到,当我们调用MysqlXAConnection的start方法时,实际上就是执行了一个”XA START xid [JOIN|RESUME]”命令而已,和我们直接在命令行中的操作是一样一样的,只不过通过封装简化了我们的操作。
对于MysqlXAConnection的end、prepare、commit、rollback等方法,也都是是类似的,不再赘述。

最后提示, MySQL Connector/J 中提供的XA操作接口,如上面提到的XAConnection、XAResource、Xid等,实际上都遵循了JTA规范。

mysql8.0文档:https://dev.mysql.com/doc/refman/8.0/en/xa-statements.html。13.3.8.1 XA Transaction SQL Syntax章节讲述了Mysql对于XA事务的语法。










XA {START BEGIN} xid [JOIN RESUME] XA END xid [SUSPEND [FOR MIGRATE]] XA PREPARE xid XA COMMIT xid [ONE PHASE] XA ROLLBACK xid XA RECOVER [CONVERT XID]


首先,根据DTP(Distributed Transaction Processing: Reference Model)参考模型中,Mysql是作为资源管理器这一组件。所以Mysql也仅仅是作为XA规范中的一个组件而已,Mysql对于XA的支持,其实是提供了RMs与TM之间的接口交互支持。TM(Transaction manager)是一个事务的协调者,协调众多的事务参与者。明白了这一点以后,我们再来看Mysql中使用XA事务的语法,mysql官方文档中也有详细的描述,我们在下面列举一二,另外关于mysql支持XA是从什么版本开始,以及java驱动包什么版本支持XA,请见以下文档原文



Support for XA transactions is available for the



InnoDB



storage engine. The MySQL XA implementation is based on the X/Open CAE document Distributed Transaction Processing: The XA Specification. This document is published by The Open Group and available athttp://www.opengroup.org/public/pubs/catalog/c193.htm. Limitations of the current XA implementation are described in Section C.6, “Restrictions on XA Transactions”.



innodb存储引擎支持XA事务



Among the MySQL Connectors, MySQL Connector/J 5.0.0 and higher supports XA directly, by means of a class interface that handles the XA SQL statement interface for you.



5.0.0版本mysql连接驱动开始支持XA



XA事务命令都是XA开头的,xa start 和 xa begin 都可以开启一个xa事务,但是xa start 不支持join 、resume,这两个是什么,我暂时不了解,暂且不管,xa start 还需要跟一个xid,这个是事务的唯一标识,关于xid的构成,下面再详述,这里仅需要知道xid是一个事务的id标识即可。



xa end xid,即完成sql 操作后,让xa事务进入IDLE状态的命令,同样要指明xid,操作的是哪个XA事务,注意这里xa end并不是要结束xa事务,只是进入到IDLE状态,后续还有两阶段提交过程,prepare和commit;



xa prepare xid ,标识两阶段提交的第一个提交阶段,通知资源管理器RM做提交前的准备,防止数据丢失,之前讨论两阶段提交时已经讲了,这个阶段,mysql就会记录下这个事务的各种日志,防止丢失,即使宕机重启也能恢复。prepare结束就具备了这种恢复的能力,RM prepare回复TM,prepare成功后,RM会等TM的commit通知,而TM要等所有RM的成功消息,所有RM回复成功,TM就下发commit给所有RM;如果部分RM回复不成功,那么TM就下发rollback给所有RM回滚事务。



xa rollback xid就是回滚事务的指令,xa commit xid就是提交事务的指令,xa commit xid ONE PHASE 是明确知道RM只有一个的情况下,采用一阶段提交的方式,这种情况下就不需要prepare阶段了,xa end后即可xa commit xid ONE PHASE了。



xa recover ,是用来查看哪些xid已经完成prepare的,异常宕机情况下,xa recover也能列出宕机前哪些xa事务完成prepare,等待commit的。



xid: gtrid [, bqual [, formatID ]]



以上是xid的构成,gtrid全局事务id标识,然后bqual 事务分支标识,formatID是格式标识,具体什么用处暂时不明白。bqual和formatID都是可选,如果不给值时默认值分别为”和1.



gtrid 和 bqual 都必须是字符串类型,长度是64byte,formatID是无符号整型。



我们来看个实际例子



我建立了一个全局事务aaa,两个分支事务bbb和ccc。然后两个分支事务都进入了prepare,从分支事务ccc截图中xa recover可以看出。但是ccc回滚,bbb提交。最开始理解这块的时候,我认为既然一个全局事务,那么怎么能够一个回滚一个提交呢?后来仔细一想,这个过程应该是交给TM来统一的,mysql支持XA并不体现在控制全局事务下所有子事务一致提交,而是提供和TM交互的接口,由TM最终来控制,通知所有子事务提交,或都回滚,而不会通知部分提交、部分回滚。



https://mp.weixin.qq.com/s/KeZId8WScnS-rlc0kedEzw

比如我们现在有两个数据库,mysql3306和mysql3307。这里我们使用docker来创建这两个实例:



mysql3306创建命令



docker run


d
-
p
3306
:
3306



-
v
/
Users
/
yjf
/
Documents
/
workspace
/
mysql
-
docker
/
my3306
.
cnf
:
/etc/
mysql
/
mysql
.
conf
.
d
/
mysqld
.
cnf
-
v
/
Users
/
yjf
/
Documents
/
workspace
/
mysql
-
docker
/
data3306
:
/var/
lib
/
mysql
-
e MYSQL_ROOT_PASSWORD
=
123456




name mysql
-
3307
mysql
:
5.7



msyql3306的配置:



[
mysqld
]



pid


file

=



/var/
run
/
mysqld
/
mysqld
.
pid



socket



/var/
run
/
mysqld
/
mysqld
.
sock



datadir



/var/
lib
/
mysql



server


id
=



1



log_bin


mysql
-
bin



binlog_format


ROW



expire_logs_days



30



mysql3307创建命令



docker run


d
-
p
3307
:
3306



-
v
/
Users
/
yjf
/
Documents
/
workspace
/
mysql
-
docker
/
my3307
.
cnf
:
/etc/
mysql
/
mysql
.
conf
.
d
/
mysqld
.
cnf
-
v
/
Users
/
yjf
/
Documents
/
workspace
/
mysql
-
docker
/
data3307
:
/var/
lib
/
mysql
-
e MYSQL_ROOT_PASSWORD
=
123456




name mysql
-
3307
mysql
:
5.7



msyql3307的配置:



[
mysqld
]



pid


file

=



/var/
run
/
mysqld
/
mysqld
.
pid



socket



/var/
run
/
mysqld
/
mysqld
.
sock



datadir



/var/
lib
/
mysql



server


id
=



2



log_bin


mysql
-
bin



binlog_format


ROW



expire_logs_days



30



在mysql3306中
我们有一个user表



create table user
(



id  int ,

name varchar ( 10 ),

score int


);



insert
into
user values
(
1
,



“foo”
,



10
)



在mysql3307中,我们有一个wallet表。



create table wallet
(



id  int ,

money float


);



insert
into
wallet values
(
1
,



10.1
)



我们可以看到,id为1的用户初始分数(score)为10,而它的钱,在wallet中初始钱(money)为10.1。



现在假设我们有一个操作,需要对这个用户进行操作:每次操作增加分数2,并且增加钱数1.2。



这个操作需要很强的一致性。



思考
两阶段提交
这里是一个分布式事务的概念,我们可以使用2PC的方法进行保证事务



2PC的概念如图所示,引入一个资源协调者的概念,由这个资源协调者进行事务协调。



第一阶段,由这个资源协调者对每个mysql实例调用prepare命令,让所有的mysql实例准备好,如果其中由mysql实例没有准备好,协调者就让所有实例调用rollback命令进行回滚。如果所有mysql都prepare完成,那么就进入第二阶段。



第二阶段,资源协调者让每个mysql实例都调用commit方法,进行提交。



mysql里面也提供了分布式事务的语句XA。



用单个实例的事务行不行
等等,这个两阶段提交和我们的事务感觉也差不多,都是进行一次开始,然后执行,最后commit,mysql为什么还要专门定义一个xa的命令呢?于是我陷入了思考…



思考不如实操,于是我用golang写了一个使用mysql的事务实现的“两阶段提交”:



package
main



import



(



“database/sql”



“fmt”



_  "github.com/go-sql-driver/mysql"


“github.com/pkg/errors”



)



func main
()



{



var
err error



// db1的连接



db1 ,  err  :=  sql . Open ( "mysql" ,


“root:123456@tcp(127.0.0.1:3306)/hade1”
)



if
err
!=



nil



{



    panic ( err . Error ())


}



defer db1 . Close ()


// db2的连接



db2 ,  err  :=  sql . Open ( "mysql" ,


“root:123456@tcp(127.0.0.1:3307)/hade2”
)



if
err
!=



nil



{



    panic ( err . Error ())


}



defer db2 . Close ()


// 开始前显示



var
score
int



db1 . QueryRow ( "select score from user where id = 1" ). Scan (& score )

fmt . Println ( "user1 score:" , score )


var
money float64



db2 . QueryRow ( "select money from wallet where id = 1" ). Scan (& money )

fmt . Println ( "wallet1 money:" , money )



tx1 , err := db1 . Begin ()


if
err
!=



nil



{



    panic ( errors . WithStack ( err ))


}



tx2 ,  err  :=  db2 . Begin ()


if
err
!=



nil



{



    panic ( errors . WithStack ( err ))


}



defer func ()


{



if
err
:=
recover
();
err
!=



nil



{



        fmt . Printf ( "%+v\n" ,  err )

fmt . Println ( "=== call rollback ====" )

tx1 . Rollback ()

tx2 . Rollback ()


}



    db1 . QueryRow ( "select score from user where id = 1" ). Scan (& score )

fmt . Println ( "user1 score:" , score )

db2 . QueryRow ( "select money from wallet where id = 1" ). Scan (& money )

fmt . Println ( "wallet1 money:" , money )


}()



// DML操作



if
_
,
err
=
tx1
.
Exec
(
“update user set score=score+2 where id =1”
);
err
!=



nil



{



    panic ( errors . WithStack ( err ))


}



if
_
,
err
=
tx2
.
Exec
(
“update wallet set money=money+1.2 where id=1”
);
err
!=



nil



{



    panic ( errors . WithStack ( err ))


}



// panic(errors.New(“commit before error”))



// commit



fmt . Println ( "=== call commit ====" )

err = tx1 . Commit ()


if
err
!=



nil



{



    panic ( errors . WithStack ( err ))


}



// panic(errors.New(“commit db2 before error”))



err  =  tx2 . Commit ()


if
err
!=



nil



{



    panic ( errors . WithStack ( err ))


}



db1 . QueryRow ( "select score from user where id = 1" ). Scan (& score )

fmt . Println ( "user1 score:" , score )

db2 . QueryRow ( "select money from wallet where id = 1" ). Scan (& money )

fmt . Println ( "wallet1 money:" , money )


}



我这里已经非常小心地在defer中recover错误信息,并且执行了rollback命令。



如果我在commit命令之前的任意一个地方调用了 panic(errors.New(“commit before error”)) 那么命令就会进入到了rollback这里,就会把两个实例的事务都进行回滚。



通过结果我们可以看到,分数和钱数都没有改变。这个是ok的。



但是如果我在db2的commit之前触发了panic,那么这个命令进入到了rollback中,但是db1已经commit了,db2还没有commit,这个时候会出现什么情况?



非常可惜,我们看到了这里的score增长了,但是money没有增长,这个就说明无法做到事务一致性了。



回到mysql的xa
那么还要回归到2PC,mysql为2PC的实现增加了xa命令,那么使用这个命令我们能不能避免这个问题呢?



同样,我用golang写了一个使用xa命令的代码



package
main



import



(



“database/sql”



“fmt”



“strconv”



“time”



_  "github.com/go-sql-driver/mysql"


“github.com/pkg/errors”



)



func main
()



{



var
err error



// db1的连接



db1 ,  err  :=  sql . Open ( "mysql" ,


“root:123456@tcp(127.0.0.1:3306)/hade1”
)



if
err
!=



nil



{



    panic ( err . Error ())


}



defer db1 . Close ()


// db2的连接



db2 ,  err  :=  sql . Open ( "mysql" ,


“root:123456@tcp(127.0.0.1:3307)/hade2”
)



if
err
!=



nil



{



    panic ( err . Error ())


}



defer db2 . Close ()


// 开始前显示



var
score
int



db1 . QueryRow ( "select score from user where id = 1" ). Scan (& score )

fmt . Println ( "user1 score:" , score )


var
money float64



db2 . QueryRow ( "select money from wallet where id = 1" ). Scan (& money )

fmt . Println ( "wallet1 money:" , money )


// 生成xid



xid  :=  strconv . FormatInt ( time . Now (). Unix (),


10
)



fmt . Println ( "=== xid:"


+
xid
+



” ====”
)



defer func ()


{



if
err
:=
recover
();
err
!=



nil



{



        fmt . Printf ( "%+v\n" ,  err )

fmt . Println ( "=== call rollback ====" )

db1 . Exec ( fmt . Sprintf ( "XA ROLLBACK '%s'" , xid ))

db2 . Exec ( fmt . Sprintf ( "XA ROLLBACK '%s'" , xid ))


}



    db1 . QueryRow ( "select score from user where id = 1" ). Scan (& score )

fmt . Println ( "user1 score:" , score )

db2 . QueryRow ( "select money from wallet where id = 1" ). Scan (& money )

fmt . Println ( "wallet1 money:" , money )


}()



// XA 启动



fmt . Println ( "=== call start ====" )


if
_
,
err
=
db1
.
Exec
(
fmt
.
Sprintf
(
“XA START ‘%s’”
,
xid
));
err
!=



nil



{



    panic ( errors . WithStack ( err ))


}



if
_
,
err
=
db2
.
Exec
(
fmt
.
Sprintf
(
“XA START ‘%s’”
,
xid
));
err
!=



nil



{



    panic ( errors . WithStack ( err ))


}



// DML操作



if
_
,
err
=
db1
.
Exec
(
“update user set score=score+2 where id =1”
);
err
!=



nil



{



    panic ( errors . WithStack ( err ))


}



if
_
,
err
=
db2
.
Exec
(
“update wallet set money=money+1.2 where id=1”
);
err
!=



nil



{



    panic ( errors . WithStack ( err ))


}



// XA end



fmt . Println ( "=== call end ====" )


if
_
,
err
=
db1
.
Exec
(
fmt
.
Sprintf
(
“XA END ‘%s’”
,
xid
));
err
!=



nil



{



    panic ( errors . WithStack ( err ))


}



if
_
,
err
=
db2
.
Exec
(
fmt
.
Sprintf
(
“XA END ‘%s’”
,
xid
));
err
!=



nil



{



    panic ( errors . WithStack ( err ))


}



// prepare



fmt . Println ( "=== call prepare ====" )


if
_
,
err
=
db1
.
Exec
(
fmt
.
Sprintf
(
“XA PREPARE ‘%s’”
,
xid
));
err
!=



nil



{



    panic ( errors . WithStack ( err ))


}



// panic(errors.New(“db2 prepare error”))



if
_
,
err
=
db2
.
Exec
(
fmt
.
Sprintf
(
“XA PREPARE ‘%s’”
,
xid
));
err
!=



nil



{



    panic ( errors . WithStack ( err ))


}



// commit



fmt . Println ( "=== call commit ====" )


if
_
,
err
=
db1
.
Exec
(
fmt
.
Sprintf
(
“XA COMMIT ‘%s’”
,
xid
));
err
!=



nil



{



    panic ( errors . WithStack ( err ))


}



// panic(errors.New(“db2 commit error”))



if
_
,
err
=
db2
.
Exec
(
fmt
.
Sprintf
(
“XA COMMIT ‘%s’”
,
xid
));
err
!=



nil



{



    panic ( errors . WithStack ( err ))


}



db1 . QueryRow ( "select score from user where id = 1" ). Scan (& score )

fmt . Println ( "user1 score:" , score )

db2 . QueryRow ( "select money from wallet where id = 1" ). Scan (& money )

fmt . Println ( "wallet1 money:" , money )


}



首先看成功的情况:



一切完美。



如果我们在prepare阶段抛出panic,那么结果如下:



证明在第一阶段出现异常是可以回滚的。



但是如果我们在commit阶段抛出panic:



我们发现,这里的分数增加了,但是money却没有增加。



那么这个xa和单个事务有什么区别呢?我又陷入了深深的沉思…



xa的用法不对
经过在技术群(全栈神盾局)请教,讨论之后,发现这里对2pc的两个阶段理解还没到位,这里之所以分为两个阶段,是强调的是每个阶段都会持久化,就是第一个阶段完成了之后,每个mysql实例就把第一个阶段的请求实例化了,这个时候不管是mysql实例停止了还是其他问题,每次重启的时候都会重新回复这个commit。



我们把这个代码的rollback去掉,假设commit必须成功。



package
main



import



(



“database/sql”



“fmt”



“strconv”



“time”



_  "github.com/go-sql-driver/mysql"


“github.com/pkg/errors”



)



func main
()



{



var
err error



// db1的连接



db1 ,  err  :=  sql . Open ( "mysql" ,


“root:123456@tcp(127.0.0.1:3306)/hade1”
)



if
err
!=



nil



{



    panic ( err . Error ())


}



defer db1 . Close ()


// db2的连接



db2 ,  err  :=  sql . Open ( "mysql" ,


“root:123456@tcp(127.0.0.1:3307)/hade2”
)



if
err
!=



nil



{



    panic ( err . Error ())


}



defer db2 . Close ()


// 开始前显示



var
score
int



db1 . QueryRow ( "select score from user where id = 1" ). Scan (& score )

fmt . Println ( "user1 score:" , score )


var
money float64



db2 . QueryRow ( "select money from wallet where id = 1" ). Scan (& money )

fmt . Println ( "wallet1 money:" , money )


// 生成xid



xid  :=  strconv . FormatInt ( time . Now (). Unix (),


10
)



fmt . Println ( "=== xid:"


+
xid
+



” ====”
)



defer func ()


{



if
err
:=
recover
();
err
!=



nil



{



        fmt . Printf ( "%+v\n" ,  err )

fmt . Println ( "=== call rollback ====" )


// db1.Exec(fmt.Sprintf(“XA ROLLBACK ‘%s’”, xid))



// db2.Exec(fmt.Sprintf(“XA ROLLBACK ‘%s’”, xid))



}



    db1 . QueryRow ( "select score from user where id = 1" ). Scan (& score )

fmt . Println ( "user1 score:" , score )

db2 . QueryRow ( "select money from wallet where id = 1" ). Scan (& money )

fmt . Println ( "wallet1 money:" , money )


}()



// XA 启动



fmt . Println ( "=== call start ====" )


if
_
,
err
=
db1
.
Exec
(
fmt
.
Sprintf
(
“XA START ‘%s’”
,
xid
));
err
!=



nil



{



    panic ( errors . WithStack ( err ))


}



if
_
,
err
=
db2
.
Exec
(
fmt
.
Sprintf
(
“XA START ‘%s’”
,
xid
));
err
!=



nil



{



    panic ( errors . WithStack ( err ))


}



// DML操作



if
_
,
err
=
db1
.
Exec
(
“update user set score=score+2 where id =1”
);
err
!=



nil



{



    panic ( errors . WithStack ( err ))


}



if
_
,
err
=
db2
.
Exec
(
“update wallet set money=money+1.2 where id=1”
);
err
!=



nil



{



    panic ( errors . WithStack ( err ))


}



// XA end



fmt . Println ( "=== call end ====" )


if
_
,
err
=
db1
.
Exec
(
fmt
.
Sprintf
(
“XA END ‘%s’”
,
xid
));
err
!=



nil



{



    panic ( errors . WithStack ( err ))


}



if
_
,
err
=
db2
.
Exec
(
fmt
.
Sprintf
(
“XA END ‘%s’”
,
xid
));
err
!=



nil



{



    panic ( errors . WithStack ( err ))


}



// prepare



fmt . Println ( "=== call prepare ====" )


if
_
,
err
=
db1
.
Exec
(
fmt
.
Sprintf
(
“XA PREPARE ‘%s’”
,
xid
));
err
!=



nil



{



    panic ( errors . WithStack ( err ))


}



// panic(errors.New(“db2 prepare error”))



if
_
,
err
=
db2
.
Exec
(
fmt
.
Sprintf
(
“XA PREPARE ‘%s’”
,
xid
));
err
!=



nil



{



    panic ( errors . WithStack ( err ))


}



// commit



fmt . Println ( "=== call commit ====" )


if
_
,
err
=
db1
.
Exec
(
fmt
.
Sprintf
(
“XA COMMIT ‘%s’”
,
xid
));
err
!=



nil



{



    panic ( errors . WithStack ( err ))


}



panic ( errors . New ( "db2 commit error" ))


if
_
,
err
=
db2
.
Exec
(
fmt
.
Sprintf
(
“XA COMMIT ‘%s’”
,
xid
));
err
!=



nil



{



    panic ( errors . WithStack ( err ))


}



db1 . QueryRow ( "select score from user where id = 1" ). Scan (& score )

fmt . Println ( "user1 score:" , score )

db2 . QueryRow ( "select money from wallet where id = 1" ). Scan (& money )

fmt . Println ( "wallet1 money:" , money )


}



这个时候,我们停掉程序(停掉mysql的链接),使用 xa recover可以发现,db2的xa事务还留在db2中了。



我们在控制台直接调用 xa commit’1585644880’ 还能继续把这个xa事务进行提交。



这下money就进行了提交,又恢复了一致性。



所以呢,我琢磨了一下,我们写xa的代码应该如下:



package
main



import



(



“database/sql”



“fmt”



“log”



“strconv”



“time”



_  "github.com/go-sql-driver/mysql"


“github.com/pkg/errors”



)



func main
()



{



var
err error



// db1的连接



db1 ,  err  :=  sql . Open ( "mysql" ,


“root:123456@tcp(127.0.0.1:3306)/hade1”
)



if
err
!=



nil



{



    panic ( err . Error ())


}



defer db1 . Close ()


// db2的连接



db2 ,  err  :=  sql . Open ( "mysql" ,


“root:123456@tcp(127.0.0.1:3307)/hade2”
)



if
err
!=



nil



{



    panic ( err . Error ())


}



defer db2 . Close ()


// 开始前显示



var
score
int



db1 . QueryRow ( "select score from user where id = 1" ). Scan (& score )

fmt . Println ( "user1 score:" , score )


var
money float64



db2 . QueryRow ( "select money from wallet where id = 1" ). Scan (& money )

fmt . Println ( "wallet1 money:" , money )


// 生成xid



xid  :=  strconv . FormatInt ( time . Now (). Unix (),


10
)



fmt . Println ( "=== xid:"


+
xid
+



” ====”
)



defer func ()


{



if
err
:=
recover
();
err
!=



nil



{



        fmt . Printf ( "%+v\n" ,  err )

fmt . Println ( "=== call rollback ====" )

db1 . Exec ( fmt . Sprintf ( "XA ROLLBACK '%s'" , xid ))

db2 . Exec ( fmt . Sprintf ( "XA ROLLBACK '%s'" , xid ))


}



    db1 . QueryRow ( "select score from user where id = 1" ). Scan (& score )

fmt . Println ( "user1 score:" , score )

db2 . QueryRow ( "select money from wallet where id = 1" ). Scan (& money )

fmt . Println ( "wallet1 money:" , money )


}()



// XA 启动



fmt . Println ( "=== call start ====" )


if
_
,
err
=
db1
.
Exec
(
fmt
.
Sprintf
(
“XA START ‘%s’”
,
xid
));
err
!=



nil



{



    panic ( errors . WithStack ( err ))


}



if
_
,
err
=
db2
.
Exec
(
fmt
.
Sprintf
(
“XA START ‘%s’”
,
xid
));
err
!=



nil



{



    panic ( errors . WithStack ( err ))


}



// DML操作



if
_
,
err
=
db1
.
Exec
(
“update user set score=score+2 where id =1”
);
err
!=



nil



{



    panic ( errors . WithStack ( err ))


}



if
_
,
err
=
db2
.
Exec
(
“update wallet set money=money+1.2 where id=1”
);
err
!=



nil



{



    panic ( errors . WithStack ( err ))


}



// XA end



fmt . Println ( "=== call end ====" )


if
_
,
err
=
db1
.
Exec
(
fmt
.
Sprintf
(
“XA END ‘%s’”
,
xid
));
err
!=



nil



{



    panic ( errors . WithStack ( err ))


}



if
_
,
err
=
db2
.
Exec
(
fmt
.
Sprintf
(
“XA END ‘%s’”
,
xid
));
err
!=



nil



{



    panic ( errors . WithStack ( err ))


}



// prepare



fmt . Println ( "=== call prepare ====" )


if
_
,
err
=
db1
.
Exec
(
fmt
.
Sprintf
(
“XA PREPARE ‘%s’”
,
xid
));
err
!=



nil



{



    panic ( errors . WithStack ( err ))


}



// panic(errors.New(“db2 prepare error”))



if
_
,
err
=
db2
.
Exec
(
fmt
.
Sprintf
(
“XA PREPARE ‘%s’”
,
xid
));
err
!=



nil



{



    panic ( errors . WithStack ( err ))


}



// commit



fmt . Println ( "=== call commit ====" )


if
_
,
err
=
db1
.
Exec
(
fmt
.
Sprintf
(
“XA COMMIT ‘%s’”
,
xid
));
err
!=



nil



{



// TODO: 尝试重新提交COMMIT



// TODO: 如果还失败,记录xid,进入数据恢复逻辑,等待数据库恢复重新提交



    log . Println ( "xid:"


+
xid
)



}



// panic(errors.New(“db2 commit error”))



if
_
,
err
=
db2
.
Exec
(
fmt
.
Sprintf
(
“XA COMMIT ‘%s’”
,
xid
));
err
!=



nil



{



    log . Println ( "xid:"


+
xid
)



}



db1 . QueryRow ( "select score from user where id = 1" ). Scan (& score )

fmt . Println ( "user1 score:" , score )

db2 . QueryRow ( "select money from wallet where id = 1" ). Scan (& money )

fmt . Println ( "wallet1 money:" , money )


}



就是第二阶段的commit,我们必须设定它一定会“成功”,如果有不成功的情况,那么就需要记录下不成功的xid,有一个数据恢复逻辑,重新commit这个xid。来保证最终一致性。



binlog
其实我们使用binlog也能看出一些端倪



这里的mysql-bin.0003替换成为你当前的log



SHOW BINLOG EVENTS
in



‘mysql-bin.000003’
;



XA的binlog



|
mysql
-
bin
.
000003










1967










Anonymous_Gtid










1










2032



|
SET
@
@SESSION
.
GTID_NEXT
=



‘ANONYMOUS’










|
mysql
-
bin
.
000003










2032










Query










1










2138



|
XA START X
‘31353835363338363233’
,
X
‘’
,
1










|
mysql
-
bin
.
000003










2138










Table_map










1










2190



|
table_id
:



108



(
hade1
.
user
)










|
mysql
-
bin
.
000003










2190










Update_rows










1










2252



|
table_id
:



108
flags
:
STMT_END_F

|



|
mysql
-
bin
.
000003










2252










Query










1










2356



|
XA
END
X
‘31353835363338363233’
,
X
‘’
,
1










|
mysql
-
bin
.
000003










2356



|
XA_prepare

|



1










2402



|
XA PREPARE X
‘31353835363338363233’
,
X
‘’
,
1










|
mysql
-
bin
.
000003










2402










Anonymous_Gtid










1










2467



|
SET
@
@SESSION
.
GTID_NEXT
=



‘ANONYMOUS’










|
mysql
-
bin
.
000003










2467










Query










1










2574



|
XA COMMIT X
‘31353835363338363233’
,
X
‘’
,
1



非xa的事务



|
mysql
-
bin
.
000003










2574










Anonymous_Gtid










1










2639



|
SET
@
@SESSION
.
GTID_NEXT
=



‘ANONYMOUS’










|
mysql
-
bin
.
000003










2639










Query










1










2712










BEGIN










|
mysql
-
bin
.
000003










2712










Table_map










1










2764



|
table_id
:



108



(
hade1
.
user
)










|
mysql
-
bin
.
000003










2764










Update_rows










1










2826



|
table_id
:



108
flags
:
STMT_END_F

|



|
mysql
-
bin
.
000003










2826










Xid










1










2857



|
COMMIT
/* xid=67 */



我们很明显可以看到两阶段提交中是有两个GTID的,生成一个GTID就代表内部生成一个事务,所以第一个阶段prepare结束之后,第二个阶段commit的时候就持久化了第一个阶段的内容,并且生成了第二个事务。当commit失败的时候,最多就是第二个事务丢失,第一个事务实际上已经保存起来了了(只是还没commit)。



而非xa的事务,只有一个GTID,在commit之前任意一个阶段出现问题,整个事务就全部丢失,无法找回了。所以这就是mysql xa命令的机制。



总结
看了一些资料,原来mysql从5.7之后才真正实现了两阶段的xa。当然这个两阶段方式在真实的工程中的使用其实很少的,xa的第一定律是避免使用xa。工程中会有很多方式来避免这种分库的事务情况。



不过,不妨碍掌握了mysql的xa,在一些特定的场合,我们也能完美解决问题


Category storage