结合seata和2PC,简单聊聊seata源码

news/2024/10/16 18:17:38 标签: java, 分布式

当前代码分析基于seata1.6.1

整体描述

整体代码流程可以描述为

  1. TM开启全局事务,会调用TC来获取XID。
  2. TC在接收到通知后,会生成XID,然后会将当前全局事务保存到global_table表中,并且返回XID。
  3. 在获取到XID后,会执行业务逻辑。
  4. 执行业务逻辑的时候,如果发生了增删改,则会对增删改语句做增强。
  5. 获取前置镜像数据---执行sql,不提交事务--获取后置镜像---准备undoLog---作为RM向TC提交事务分支---生成undo_log日志---提交本地事务,注意,在这里,本地事务已经提交了。只是有undo_log可用于回滚。
  6. TC接收RM端提交的分支事务,存储到brand_table中。
  7. 当全局分支事务都执行完成,TM会向TC提起全局事务提交的请求。
  8. TC接收到请求后,删除全局事务和分支事务(global_table 和 brand_table)。
  9. TC 通知RM,删除 undo log 日志。

源码解析

系统启动初始化

主要完成两个事情:初始化 TM和RM客户端;创建方法拦截器

在客户端中,核心配置类是SeataAutoConfiguration,在这个类中初始化了一个核心的扫描器GlobalTransactionScanner。

GlobalTransactionScanner 全局事务扫描器,实现了InitializingBean接口,如果继承了该接口,spring会在完成DI之后,调用afterPropertiesSet方法,在该方法中完成了对TM客户端和RM客户端的创建,代码如下

@Override
public void afterPropertiesSet() {
    if (disableGlobalTransaction) {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Global transaction is disabled.");
        }
        ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                (ConfigurationChangeListener)this);
        return;
    }
    if (initialized.compareAndSet(false, true)) {
        //创建客户端的方法
        initClient();
    }
}


private void initClient() {
    ...其他校验
    //创建TM客户端并且初始化
    TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
    ...
    //创建RM客户端并且初始化
    RMClient.init(applicationId, txServiceGroup);
    ...

}

同时,GlobalTransactionScanner 继承了AbstractAutoProxyCreator 抽象类,在类完成初始化之后,会调用父类的 postProcessAfterInitialization方法,在父类的方法中,会调用该类重写的一个wrapIfNecessary方法。

wrapIfNecessary 方法会生成一个 GlobalTransactionalInterceptor 全局事务拦截器。

protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
        // do checkers
        if (!doCheckers(bean, beanName)) {
            return bean;
        }

        try {
            synchronized (PROXYED_SET) {
                if (PROXYED_SET.contains(beanName)) {
                    return bean;
                }
                interceptor = null;
                //check TCC proxy
                if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
                    ...
                } else {
                    ...
                    //生成一个全局事务处理的拦截器,
                    if (globalTransactionalInterceptor == null) {
                        globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
                        ConfigurationCache.addConfigListener(
                                ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                                (ConfigurationChangeListener)globalTransactionalInterceptor);
                    }
                    interceptor = globalTransactionalInterceptor;
                }

               ...
        } catch (Exception exx) {
            throw new RuntimeException(exx);
        }
    }

TM端开启全局事务

开启全局事务

GlobalTransactionalInterceptor中的逻辑

忽略中间调用过程,最终会走到io.seata.tm.api.TransactionalTemplate#execute

在该类中,核心代码如下

public Object execute(TransactionalExecutor business) throws Throwable {
    ...
    try {
        ...
        try {
            //开启全局事务
            beginTransaction(txInfo, tx);
            Object rs;
            try {
                //进入业务代码,执行业务逻辑
                rs = business.execute();
            } catch (Throwable ex) {
                //当出现业务逻辑异常进行回滚
                completeTransactionAfterThrowing(txInfo, tx, ex);
                throw ex;
            }
            //所有分支事务无异常,提交全局事务
            commitTransaction(tx, txInfo);

            return rs;
        } finally {
            ...
        }
    } finally {
        ...
    }
}

开启全局事务的方法就在beginTransaction中,继续往下会去TC中获取一个XID,就是全局事务id

TC端接收全局事务请求后

记录全局事务

在server层的代码中,全局事务的入口方法为 io.seata.server.coordinator.DefaultCoordinator#doGlobalBegin

在该方法中会去调用core.begin方法,进入xid获取流程

@Override
protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext)
        throws TransactionException {
    //开始获取xid
    response.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(),
            request.getTransactionName(), request.getTimeout()));
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Begin new global transaction applicationId: {},transactionServiceGroup: {}, transactionName: {},timeout:{},xid:{}",
                rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(), request.getTransactionName(), request.getTimeout(), response.getXid());
    }
}

忽略其他流程,关注核心,其调用往下的链路为

core.begin--session.begin--lifecycleListener.onBegin--找到子类方法--this.addGlobalSession--找子类方法

在这里,可以找到三个实现类,分别是代表了数据库、文件和redis实现,文件是默认实现,其他几种都需要进行配置,我们针对数据库实现进行描述。

方法路径为:io.seata.server.storage.db.session.DataBaseSessionManager#addGlobalSession

然后调用方法transactionStoreManager.writeSession,transactionStoreManager是一个接口,同样有三种实现

在writeSession方法中,会插入全局事务表数据,代码如下

/**
 * 插入全局事务表
 * 表为:global_table
 * @param globalTransactionDO the global transaction do
 * @return
 */
@Override
public boolean insertGlobalTransactionDO(GlobalTransactionDO globalTransactionDO) {
    String sql = LogStoreSqlsFactory.getLogStoreSqls(dbType).getInsertGlobalTransactionSQL(globalTable);
    Connection conn = null;
    PreparedStatement ps = null;
    try {
        int index = 1;
        conn = logStoreDataSource.getConnection();
        conn.setAutoCommit(true);
        ps = conn.prepareStatement(sql);
        //插入xid
        ps.setString(index++, globalTransactionDO.getXid());
        //插入事务id
        ps.setLong(index++, globalTransactionDO.getTransactionId());
        //插入事务状态,begin  = 1
        ps.setInt(index++, globalTransactionDO.getStatus());
        //插入应用id,一般是服务名
        ps.setString(index++, globalTransactionDO.getApplicationId());
        //插入事务组
        ps.setString(index++, globalTransactionDO.getTransactionServiceGroup());
        String transactionName = globalTransactionDO.getTransactionName();
        transactionName = transactionName.length() > transactionNameColumnSize ?
        transactionName.substring(0, transactionNameColumnSize) :
        transactionName;
        //插入事务名称
        ps.setString(index++, transactionName);
        //插入超时时间
        ps.setInt(index++, globalTransactionDO.getTimeout());
        //插入事务开始时间
        ps.setLong(index++, globalTransactionDO.getBeginTime());
        ps.setString(index++, globalTransactionDO.getApplicationData());
        return ps.executeUpdate() > 0;
    } catch (SQLException e) {
        throw new StoreException(e);
    } finally {
        IOUtil.close(ps, conn);
    }
}

RM执行业务代码,并且提交事务

代理数据源,生成undo log,并且通知TC

在seata中,需要配置数据源代理,这个代理会在执行增删改查的时候,对操作进行增强

这里核心需要关注的方法是io.seata.rm.datasource.exec.AbstractDMLBaseExecutor#doExecute

@Override
public T doExecute(Object... args) throws Throwable {
    AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
    //一般在初始状态下,这个autoCommit是true
    if (connectionProxy.getAutoCommit()) {
        return executeAutoCommitTrue(args);
    } else {

        return executeAutoCommitFalse(args);
    }
}

然后会调用 executeAutoCommitTrue 方法,该方法主要做了几个事情,分别是:获取前置镜像和后置镜像,并且制作undo_log;执行目标sql和插入undo_log;作为RM和TC进行交互,提交分支事务;以及提交事务。代码如下

executeAutoCommitTrue 方法代码

 protected T executeAutoCommitTrue(Object[] args) throws Throwable {
    ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
    try {
        //设置提交方式为手动提交
        connectionProxy.changeAutoCommit();
        return new LockRetryPolicy(connectionProxy).execute(() -> {
            //执行sql,并且准备前置镜像和后置镜像
            T result = executeAutoCommitFalse(args);
            //提交本地事务(内部会和RM进行交互)
            connectionProxy.commit();
            return result;
        });
    } catch (Exception e) {
       ...异常处理
    } finally {
        connectionProxy.getContext().reset();
        connectionProxy.setAutoCommit(true);
    }
}

executeAutoCommitFalse方法代码

protected T executeAutoCommitFalse(Object[] args) throws Exception {
    //获取前置镜像
    TableRecords beforeImage = beforeImage();
    //执行目标sql,注意,这边执行完后,事务是未提交的
    T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
    //获取后置镜像
    TableRecords afterImage = afterImage(beforeImage);
    //准备undo_log
    prepareUndoLog(beforeImage, afterImage);
    return result;
}

processGlobalTransactionCommit 方法,该方法就是connectionProxy.commit()最终指向的方法

 private void processGlobalTransactionCommit() throws SQLException {
    try {
        //作为RM,向TC发起请求,注册分支事务,会插入数据到TC的mysql表中
        register();
    } catch (TransactionException e) {
        recognizeLockKeyConflictException(e, context.buildLockKeys());
    }
    try {
        //生成undo_log日志,用于事务回滚
        UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
        //提交undo_log 回滚日志和本地事务,事务在这里已经提交了
        targetConnection.commit();
    } catch (Throwable ex) {
        ...异常处理
    }
    ...其他处理
}

TM提交全局事务

进行提交就是向TC发起请求,相关代码如下

@Override
public void commit() throws TransactionException {
    //判断当前角色,只有TM才能执行
    if (role == GlobalTransactionRole.Participant) {
        ...
        return;
    }
    //XID不能为空
    assertXIDNotNull();
    int retry = COMMIT_RETRY_COUNT <= 0 ? DEFAULT_TM_COMMIT_RETRY_COUNT : COMMIT_RETRY_COUNT;
    try {
        //可重试的执行,最多可执行5次
        while (retry > 0) {
            try {
                retry--;
                //向tc发起调用
                status = transactionManager.commit(xid);
                break;
            } catch (Throwable ex) {
                ...
            }
        }
    } finally {
        ...
    }
    ...
}

TC处理全局事务

TC在接收到提交请求后,会由方法 io.seata.server.coordinator.DefaultCoordinator#doGlobalCommit 进行处理。

    protected void doGlobalCommit(GlobalCommitRequest request, GlobalCommitResponse response, RpcContext rpcContext)
            throws TransactionException {
        MDC.put(RootContext.MDC_KEY_XID, request.getXid());
        //设置状态为异步提交状态
        response.setGlobalStatus(core.commit(request.getXid()));
    }

在AT模式下,事务的提交为异步的方式

public GlobalStatus commit(String xid) throws TransactionException {
        //获取全局session,不同模式获取方式不同,如果是db,则会从数据库获取
        GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
        if (globalSession == null) {
            //如果获取不到session,返回已完成状态,一般在调用超时的时候会发生,这样也可以保证幂等
            return GlobalStatus.Finished;
        }
        ...
        boolean shouldCommit = SessionHolder.lockAndExecute(globalSession, () -> {
            if (globalSession.getStatus() == GlobalStatus.Begin) {
                // Highlight: Firstly, close the session, then no more branch can be registered.
                globalSession.closeAndClean();
                //判断是否可以异步提交,AT模式下可以异步提交
                if (globalSession.canBeCommittedAsync()) {
                    //AT模式下异步事务提交
                    globalSession.asyncCommit();
                    MetricsPublisher.postSessionDoneEvent(globalSession, GlobalStatus.Committed, false, false);
                    return false;
                } else {
                    ...
                }
            }
            return false;
        });
    ...
}

最终将事务状态设置为异步提交

    public void asyncCommit() throws TransactionException {
        this.addSessionLifecycleListener(SessionHolder.getAsyncCommittingSessionManager());
        //设置事务状态为异步提交,这里在设置为异步提交后就不管了
        this.setStatus(GlobalStatus.AsyncCommitting);
        SessionHolder.getAsyncCommittingSessionManager().addGlobalSession(this);
    }

TC异步执行全局事务commit

核心逻辑为 io.seata.server.coordinator.DefaultCoordinator#init

该方法会异步的去进行处理,每秒执行一次

    /**
     * Init.
     */
    public void init() {
        ...
        //异步处理的部分
        //会从global中,每次取100条进行处理,并且删除这100条数据,然后遍历brand_table,根据global_table取删除
        //操作完了之后,向RM进行通知,进行undo_log的删除
        asyncCommitting.scheduleAtFixedRate(
            () -> SessionHolder.distributedLockAndExecute(ASYNC_COMMITTING, this::handleAsyncCommitting), 0,
            ASYNC_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);
        ...
       
    }

未完待续...

以下是经过注释的源码地址:seata: Seata 是一款开源的分布式事务解决方案,提供高性能和简单易用的分布式事务服务 - Gitee.com


http://www.niftyadmin.cn/n/5708325.html

相关文章

搜维尔科技:SenseGlove Nova 2触觉反馈手套开箱测评

SenseGlove Nova 2触觉反馈手套开箱测评 搜维尔科技&#xff1a;SenseGlove Nova 2触觉反馈手套开箱测评

2013 lost connection to MySQL server during query

1.问题 使用navicat连接doris&#xff0c;会有这个错误。 2.解决 换低版本的navicat比如navicat11。

【如何获取股票数据03】Python、Java等多种主流语言实例演示获取股票行情api接口之沪深A股实时最新分时MACD数据获取实例演示及接口API说明文档

最近一两年内&#xff0c;股票量化分析逐渐成为热门话题。而从事这一领域工作的第一步&#xff0c;就是获取全面且准确的股票数据。因为无论是实时交易数据、历史交易记录、财务数据还是基本面信息&#xff0c;这些数据都是我们进行量化分析时不可或缺的宝贵资源。我们的主要任…

手撕数据结构 —— 栈(C语言讲解)

目录 1.认识栈 什么是栈 栈的示意图 2.如何实现栈 3.栈的实现 Stack.h中接口总览 具体实现 结构的定义 初始化栈 销毁栈 入栈 出栈 取栈顶元素 获取有效元素的个数 判断栈是否为空 4.完整代码附录 Stack.h Stack.c 1.认识栈 什么是栈 栈是一种特殊的线性表…

利用vmware在移动硬盘安装Ubuntu2go

安装 买个移动硬盘&#xff0c;usb插电脑&#xff0c;磁盘管理看磁盘序列号 vmware新建虚拟机 这一步选择磁盘管理里面看到的磁盘4 先不要开机&#xff0c;选择设置里面UEFI 和安装正常Ubuntu一致操作即可&#xff0c;这里可以不选高级&#xff0c;默认一个引导分区&…

Python 实现电话号码和Email地址提取程序

Python 实现电话号码和Email地址提取程序 背景 在日常工作或学习中&#xff0c;我们经常需要从网页或文档中提取信息&#xff0c;比如电话号码和E-mail地址。手动查找和提取这些信息可能会耗费大量时间&#xff0c;而自动化工具可以帮助我们快速完成这个任务。 本篇博客将带…

前端开发攻略---8种方法实现在浏览器中跨页面通信

目录 1、BroadCast Channel 2、Service Worker 3、LocalStorage 4、Shared Worker 5、IndexedDB 6、Cookie 7、Window 8、Websocket 总结&#xff1a; 1、BroadCast Channel 1、创建文件sender.html <!DOCTYPE html> <html lang"zh"><head&g…

idea如何拉取git仓库新的项目,有git仓库地址

在 IntelliJ IDEA 中拉取一个新的 Git 仓库项目&#xff0c;你可以按照以下步骤操作&#xff1a; 打开 IntelliJ IDEA。 如果你还没有打开任何项目&#xff08;即处于欢迎界面&#xff09;&#xff0c;请选择“Get from VCS”&#xff08;从版本控制系统获取&#xff09;。如果…