分布式事务

<谨供参考>

分布式事务顾名思义就是要在分布式系统中实现事务,它其实是由多个本地事务组合而成。

关于分布式事务目前也有许多种解决方案,常说的几种如2pc,3pc,TCC,本地消息表,消息事务,最大努力通知

关于几种方案的介绍可以看下敖丙的文章<分布式事务的六种解决方案>

无论是哪种解决方案,目的都是希望保证多个系统的事务方法统一提交,要么全成,全么全败(原子性),所以这篇文章指在建立这样的想法上,手写一个分布式事务解决方案的demo,以为之后的分布式事务框架以及知识学习做积累

在开发之前需要知道一点,spring的事务管理是基于(jdbc/java.sql.xxx)进行拓展,所以我们可以从这里着手

  • 修改获取Connection的逻辑
  • 设计图如下

图中画了四个角色

  1. 服务调用者(server1)
  2. 服务被调用者(server2)
  3. 全局事务管理者(tx-manager)
  4. 中间协调者(global-tx)-负责和tx-manager交互的

整个调用链路如下:

1
2
3
4
5
6
7
8
9
10
1. 访问server1接口,server1做update操作
2. 在server1做update之前先获取@Xwtransactional的value值,判断它是不是事务组的开始方,如果是,则先创建一个全局事务组,然后将自己加入此事务组中
3. 走XwDataSourceAspect获取到我们自己的XwConnection
4. 调用server2执行update操作
5. 然后再走一遍刚才server1的过程,只不过server2是事务组的结束方,所以它直接去取刚才的那条事务组并加入进去即可(ThreadLocal)
6. server2执行完本地方法后调用XwConnection.commit,要提交的时候先不提交,等TxManager的通知再提交(这里为了不影响后续的操作,使用守护线程wait的方式)
7. XwTransactionManager.addXwTransaction,组装数据发送到tx-manager
8. tx-manager判断是否已经接收到结束事务的标记,比较事务是否已经全部到达,如果已经全部到达则看是否需要回滚,然后发送回global-tx,由global-tx唤醒当前XwTransaction,执行commit/rollback操作
9. server2结束调用,回到server1
10. 执行剩余方法,随后再走一遍server2的过程(6-8)

server1 和server2 的代码很简单,就是两个update数据库的操作

  • tx-manager,负责和中间协调者通信交互 1. 接收并保存每个分布式事务 2.通知每个子事务进行提交或者回滚,这里使用netty进行数据的交互通信
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
/**
* order 4
* 作为事务管理者,它需要:
* 1. 创建并保存事务组
* 2. 保存各个子事务在对应的事务组内
* 3. 统计并判断事务组内的各个子事务状态,以算出当前事务组的状态(提交or回滚)
* 4. 通知各个子事务提交或回滚
*/
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

// 事务组中的事务状态列表
private static Map<String, List<String>> transactionTypeMap = new HashMap<>();
// 事务组是否已经接收到结束的标记
private static Map<String, Boolean> isEndMap = new HashMap<>();
// 事务组中应该有的事务个数
private static Map<String, Integer> transactionCountMap = new HashMap<>();

[@Override](https://my.oschina.net/u/1162528)
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
channelGroup.add(ctx.channel());
}

/**
* 创建事务组,并且添加保存事务
* 并且需要判断,如果所有事务都已经执行了(有结果了,要么回滚,要么提交),且其中有一个事务需要回滚,那么通知所有客户端进行回滚
* 否则,则通知所有客户端进行提交
* [@param](https://my.oschina.net/u/2303379) ctx
* [@param](https://my.oschina.net/u/2303379) msg
* [@throws](https://my.oschina.net/throws) Exception
*/
@Override
public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("接收数据:" + msg.toString());
JSONObject jsonObject = JSON.parseObject((String) msg);
String command = jsonObject.getString("command"); // create-创建事务组,add-添加事务
String groupId = jsonObject.getString("groupId"); // 事务组id
String transactionType = jsonObject.getString("transactionType"); // 子事务类型,commit-待提交,rollback-待回滚
Integer transactionCount = jsonObject.getInteger("transactionCount"); // 事务数量
Boolean isEnd = jsonObject.getBoolean("isEnd"); // 是否是结束事务
if ("create".equals(command)) {
// 创建事务组
transactionTypeMap.put(groupId, new ArrayList<>());
} else if ("add".equals(command)) {
// 加入事务组
transactionTypeMap.get(groupId).add(transactionType);
if (isEnd) {
isEndMap.put(groupId, true);
transactionCountMap.put(groupId, transactionCount);
}
JSONObject result = new JSONObject();
result.put("groupId", groupId);
// 如果已经接收到结束事务的标记,比较事务是否已经全部到达,如果已经全部到达则看是否需要回滚
if (isEndMap.get(groupId) && transactionCountMap.get(groupId).equals(transactionTypeMap.get(groupId).size())) {
if (transactionTypeMap.get(groupId).contains("rollback")){
result.put("command", "rollback");
sendResult(result);
} else {
result.put("command", "commit");
sendResult(result);
}
}
}
}
private void sendResult(JSONObject result) {
for (Channel channel : channelGroup) {
System.out.println("发送数据:" + result.toJSONString());
channel.writeAndFlush(result.toJSONString());
}
}
}
  • global-tx的一些核心代码
XwTransactionAspect
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Around("@annotation(org.xiaowu.txmanager.annotation.Xwtransactional)")
public void invoke(ProceedingJoinPoint point) {
// 打印出这个注解所对应的方法
MethodSignature signature = (MethodSignature) point.getSignature();
Method method = signature.getMethod();
Xwtransactional lbAnnotation = method.getAnnotation(Xwtransactional.class);
String groupId = "";
if (lbAnnotation.isStart()) {
// 1. 创建一个全局事务
groupId = XwTransactionManager.createXwTransactionGroup();
} else {
// 2. 获取当前事务组
groupId = XwTransactionManager.getCurrentGroupId();
}
// 3. 加入当前事务组
XwTransaction xwTransaction = XwTransactionManager.createXwTransaction(groupId);

try {
// 通过判断point.proceed()是否有异常来判断是commit还是rollback
// spring会开启事务,和XwDataSourceAspect紧密联系,因为随后会走XwDataSourceAspect@Arround获取Connetction
point.proceed();
XwTransactionManager.addXwTransaction(xwTransaction, lbAnnotation.isEnd(), TransactionType.commit);
} catch (Exception e) {
XwTransactionManager.addXwTransaction(xwTransaction, lbAnnotation.isEnd(), TransactionType.rollback);
e.printStackTrace();
} catch (Throwable throwable) {
XwTransactionManager.addXwTransaction(xwTransaction, lbAnnotation.isEnd(), TransactionType.rollback);
throwable.printStackTrace();
}
}
XwTransactionManager
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
/**
* 创建事务组,并且返回groupId
* @return
*/
public static String createXwTransactionGroup() {
String groupId = UUID.randomUUID().toString();
JSONObject jsonObject = new JSONObject();
jsonObject.put("groupId", groupId);
jsonObject.put("command", "create");
nettyClient.send(jsonObject);
System.out.println("创建事务组");

currentGroupId.set(groupId);
return groupId;
}

/**
* 创建分布式事务
* @param groupId
* @return
*/
public static XwTransaction createXwTransaction(String groupId) {
String transactionId = UUID.randomUUID().toString();
XwTransaction xwTransaction = new XwTransaction(groupId, transactionId);
XW_TRANSACTION_MAP.put(groupId, xwTransaction);
currentXwTransaction.set(xwTransaction);
addTransactionCount();

System.out.println("创建事务");

return xwTransaction;
}

/**
* 组装数据,添加事务到事务组,发送到tx-manager
* @param xwTransaction
* @param isEnd
* @param transactionType
* @return
*/
public static XwTransaction addXwTransaction(XwTransaction xwTransaction, Boolean isEnd, TransactionType transactionType) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("groupId", xwTransaction.getGroupId());
jsonObject.put("transactionId", xwTransaction.getTransactionId());
jsonObject.put("transactionType", transactionType);
jsonObject.put("command", "add");
jsonObject.put("isEnd", isEnd);
jsonObject.put("transactionCount", XwTransactionManager.getTransactionCount());
nettyClient.send(jsonObject);
System.out.println("添加事务");
return xwTransaction;
}
XwConnection
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public void commit() throws SQLException {
// 要提交的时候先不提交,等TxManager的通知再提交
new Thread(() -> {
try {
System.out.println("commit wait...");
// 在这里进行wait
xwTransaction.getTask().waitTask();
// 被唤醒后判断TransactionType,rollback or commit
if (xwTransaction.getTransactionType().equals(TransactionType.rollback)) {
connection.rollback();
} else {
connection.commit();
}
connection.close();
} catch (SQLException e) {
e.printStackTrace();
}
}).start();
}

@Override
public void rollback() throws SQLException {
new Thread(() -> {
try {
System.out.println("rollback wait...");
xwTransaction.getTask().waitTask();
connection.rollback();
connection.close();
} catch (SQLException e) {
e.printStackTrace();
}
}).start();

}

@Override
public void close() throws SQLException {
// spring会在commit/rollback之后close掉,我们上面为了不让commit/rollback过程中的wait影响到主线程,使用子线程执行.所以我们可以手动执行close方法
// connection.close();
}

还有一些小注意的点,比如server1在调用server2时候,为了让server2知道groupid,可以把它放在header里.

源码仓库地址: 分布式事务学习