Skip to content

Seata 实战示例

要理解Seata四种模式的代码落地,我们结合电商“下单→扣库存→支付”的经典场景,分别给出可运行的核心代码+配置,并解释关键逻辑。

准备工作

  1. 部署Seata Server:参考Seata官方文档部署TC集群(建议用Nacos做注册中心)。
  2. 数据库初始化
    • AT模式需创建undo_log表(所有参与AT模式的数据库都要);
    • 业务表:t_order(订单)、t_stock(库存)、t_account(账户)。

一、AT模式(无侵入,最常用)

AT模式是无侵入的分布式事务解决方案,核心是@GlobalTransactional注解+数据源代理。

1. 依赖配置(所有服务都需引入)

xml
<!-- Spring Cloud Alibaba Seata -->
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
    <version>2023.0.1.0</version> <!-- 适配Spring Boot 3.x,若用2.x则选2.2.9.RELEASE -->
</dependency>
<!-- 数据库驱动(以MySQL为例) -->
<dependency>
    <groupId>com.mysql</groupId>
    <artifactId>mysql-connector-j</artifactId>
</dependency>
<!-- 数据源(以Druid为例) -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid-spring-boot-starter</artifactId>
    <version>1.2.20</version>
</dependency>

2. 核心配置(application.yml)

yaml
spring:
  application:
    name: order-service  # 服务名
  datasource:
    type: com.alibaba.druid.pool.DruidDataSource
    url: jdbc:mysql://localhost:3306/seata_order?useSSL=false&serverTimezone=UTC
    username: root
    password: root
  cloud:
    alibaba:
      seata:
        tx-service-group: order_tx_group  # 事务组(需与TC配置一致)

seata:
  registry:
    type: nacos  # 注册中心类型
    nacos:
      server-addr: 127.0.0.1:8848  # Nacos地址
      namespace: seata_namespace  # 命名空间(需提前创建)
  config:
    type: nacos  # 配置中心类型
    nacos:
      server-addr: 127.0.0.1:8848

3. 数据源代理(关键!AT模式必须)

Seata通过DataSourceProxy拦截SQL,生成undo_log。需替换原数据源:

java
import com.alibaba.druid.pool.DruidDataSource;
import io.seata.rm.datasource.DataSourceProxy;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;

import javax.sql.DataSource;

@Configuration
public class SeataDataSourceConfig {

    // 1. 配置原始数据源(Druid)
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource")
    public DataSource rawDataSource() {
        return new DruidDataSource();
    }

    // 2. 用Seata代理数据源
    @Bean
    public DataSourceProxy dataSourceProxy(DataSource rawDataSource) {
        return new DataSourceProxy(rawDataSource);
    }

    // 3. 配置MyBatis的SqlSessionFactory(使用代理数据源)
    @Bean
    public SqlSessionFactory sqlSessionFactory(DataSourceProxy dataSourceProxy) throws Exception {
        SqlSessionFactoryBean factory = new SqlSessionFactoryBean();
        factory.setDataSource(dataSourceProxy);
        factory.setMapperLocations(new PathMatchingResourcePatternResolver()
                .getResources("classpath:mapper/*.xml")); // MyBatis映射文件路径
        return factory.getObject();
    }
}

4. 业务代码

(1)全局事务发起方:订单服务

@GlobalTransactional标记全局事务边界,调用库存服务和支付服务:

java
import io.seata.spring.annotation.GlobalTransactional;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class OrderService {

    @Autowired
    private OrderMapper orderMapper;  // 订单DAO
    @Autowired
    private StockFeignClient stockFeignClient;  // 库存服务Feign客户端
    @Autowired
    private AccountFeignClient accountFeignClient;  // 账户服务Feign客户端

    /**
     * 创建订单(全局事务入口)
     * @param order 订单信息(包含商品ID、数量、用户ID、金额)
     */
    @GlobalTransactional(name = "createOrderTx", rollbackFor = Exception.class)
    public void createOrder(Order order) {
        // 1. 插入订单(本地事务,AT模式自动生成undo_log)
        orderMapper.insert(order);
        System.out.println("订单创建成功,orderId: " + order.getId());

        // 2. 调用库存服务扣减库存(分布式调用,XID自动传递)
        stockFeignClient.reduceStock(order.getProductId(), order.getNum());
        System.out.println("库存扣减成功,productId: " + order.getProductId());

        // 3. 调用账户服务扣减余额(分布式调用)
        accountFeignClient.reduceBalance(order.getUserId(), order.getAmount());
        System.out.println("余额扣减成功,userId: " + order.getUserId());
    }
}

(2)分支事务:库存服务

无需额外注解,Seata自动拦截SQL并注册分支事务:

java
@Service
public class StockService {

    @Autowired
    private StockMapper stockMapper;

    /**
     * 扣减库存(分支事务)
     * @param productId 商品ID
     * @param num 扣减数量
     */
    public void reduceStock(Long productId, Integer num) {
        // 直接执行SQL,Seata自动生成undo_log
        stockMapper.reduceStock(productId, num);
    }
}

对应的MyBatis映射文件(StockMapper.xml):

xml
<update id="reduceStock">
    UPDATE t_stock SET num = num - #{num} WHERE product_id = #{productId} AND num >= #{num}
</update>

5. 运行流程

  1. 订单服务调用createOrder@GlobalTransactional触发TM向TC申请XID
  2. Feign调用库存服务时,XID通过ThreadLocal传递到库存服务;
  3. 库存服务执行reduceStock,Seata拦截SQL,生成undo_log,提交本地事务,并向TC注册分支事务
  4. 若所有分支执行成功,TM向TC发起全局提交,TC通知所有RM删除undo_log
  5. 若任一分支失败(如库存不足抛出异常),TM向TC发起全局回滚,TC通知所有RM执行undo_log恢复数据。

二、TCC模式(手动补偿,灵活)

TCC模式需要手动实现Try/Confirm/Cancel三个方法,适用于非关系型数据库或第三方服务(如支付接口)。

1. 依赖配置

同AT模式,但无需数据源代理(TCC不依赖undo_log)。

2. 核心代码

库存服务为例,实现“冻结库存→确认扣减→取消冻结”的TCC逻辑:

(1)TCC接口定义

需用@TccAction注解标记Try方法,并指定Confirm和Cancel方法:

java
import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.LocalTCC;
import io.seata.rm.tcc.api.TccAction;

// LocalTCC注解:标记这是一个TCC接口(Seata 1.5+推荐)
@LocalTCC
public interface StockTccService {

    /**
     * Try阶段:冻结库存(预留资源)
     * @param productId 商品ID
     * @param num 冻结数量
     * @param requestId 幂等唯一标识(如订单ID)
     * @return 是否成功
     */
    @TccAction(name = "freezeStock", commitMethod = "confirmFreeze", rollbackMethod = "cancelFreeze")
    boolean freezeStock(
            BusinessActionContext context,  // Seata自动传递的上下文(包含XID、BranchID)
            @BusinessActionContextParameter(paramName = "productId") Long productId,  // 传递到Confirm/Cancel的参数
            @BusinessActionContextParameter(paramName = "num") Integer num,
            @BusinessActionContextParameter(paramName = "requestId") String requestId
    );

    /**
     * Confirm阶段:确认扣减库存(Try成功后执行)
     * @param context 上下文
     * @return 是否成功
     */
    boolean confirmFreeze(BusinessActionContext context);

    /**
     * Cancel阶段:取消冻结库存(Try失败后执行)
     * @param context 上下文
     * @return 是否成功
     */
    boolean cancelFreeze(BusinessActionContext context);
}

(2)TCC实现类

需处理幂等性(避免重复执行)和空回滚(Try未执行时Cancel不处理):

java
import io.seata.rm.tcc.api.BusinessActionContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

import java.util.concurrent.TimeUnit;

@Service
public class StockTccServiceImpl implements StockTccService {

    @Autowired
    private StockMapper stockMapper;
    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    private static final String LOCK_KEY_PREFIX = "tcc:freeze:";  // Redis幂等键前缀

    @Override
    public boolean freezeStock(BusinessActionContext context, Long productId, Integer num, String requestId) {
        // 1. 幂等检查:避免重复冻结(如Try重试)
        String lockKey = LOCK_KEY_PREFIX + requestId;
        if (redisTemplate.opsForValue().setIfAbsent(lockKey, "1", 10, TimeUnit.MINUTES)) {
            // 2. 冻结库存(将可用库存转为冻结库存)
            int rows = stockMapper.freezeStock(productId, num);
            if (rows > 0) {
                return true;
            } else {
                // 冻结失败,删除幂等键
                redisTemplate.delete(lockKey);
                return false;
            }
        }
        // 已处理过,直接返回成功
        return true;
    }

    @Override
    public boolean confirmFreeze(BusinessActionContext context) {
        // 1. 从上下文获取参数
        Long productId = Long.parseLong(context.getActionContext("productId").toString());
        Integer num = Integer.parseInt(context.getActionContext("num").toString());
        String requestId = context.getActionContext("requestId").toString();

        // 2. 确认扣减:将冻结库存转为实际扣减(可用库存=可用-冻结,冻结=0)
        stockMapper.confirmFreeze(productId, num);

        // 3. 删除幂等键
        redisTemplate.delete(LOCK_KEY_PREFIX + requestId);
        return true;
    }

    @Override
    public boolean cancelFreeze(BusinessActionContext context) {
        // 1. 从上下文获取参数
        Long productId = Long.parseLong(context.getActionContext("productId").toString());
        Integer num = Integer.parseInt(context.getActionContext("num").toString());
        String requestId = context.getActionContext("requestId").toString();

        // 2. 检查Try是否执行(幂等键是否存在)
        if (redisTemplate.hasKey(LOCK_KEY_PREFIX + requestId)) {
            // 3. 取消冻结:将冻结库存转回可用库存
            stockMapper.cancelFreeze(productId, num);
            // 4. 删除幂等键
            redisTemplate.delete(LOCK_KEY_PREFIX + requestId);
        }
        // Try未执行,直接返回成功(避免空回滚)
        return true;
    }
}

(3)全局事务发起方(订单服务)

调用TCC的Try方法,@GlobalTransactional会自动触发Confirm/Cancel:

java
@Service
public class OrderService {

    @Autowired
    private OrderMapper orderMapper;
    @Autowired
    private StockTccService stockTccService;  // 注入TCC服务
    @Autowired
    private AccountTccService accountTccService;

    @GlobalTransactional(name = "createOrderTccTx", rollbackFor = Exception.class)
    public void createOrder(Order order) {
        // 1. 插入订单(本地事务)
        orderMapper.insert(order);

        // 2. 调用库存TCC的Try方法(冻结库存)
        boolean stockResult = stockTccService.freezeStock(
                null,  // Seata自动填充上下文
                order.getProductId(),
                order.getNum(),
                order.getId().toString()  // requestId用订单ID(幂等)
        );
        if (!stockResult) {
            throw new RuntimeException("库存冻结失败");
        }

        // 3. 调用账户TCC的Try方法(冻结余额)
        boolean accountResult = accountTccService.freezeBalance(
                null,
                order.getUserId(),
                order.getAmount(),
                order.getId().toString()
        );
        if (!accountResult) {
            throw new RuntimeException("余额冻结失败");
        }
    }
}

3. 运行流程

  1. 订单服务调用freezeStock(Try阶段),冻结库存并记录幂等键;
  2. 若所有Try成功,TM向TC发起全局提交,TC通知所有RM执行confirmFreeze(确认扣减);
  3. 若任一Try失败(如库存不足),TM向TC发起全局回滚,TC通知所有RM执行cancelFreeze(取消冻结);
  4. 幂等键确保Try/Confirm/Cancel不会重复执行。

三、SAGA模式(长事务,最终一致)

SAGA模式适用于长链路、异步的事务(如电商“下单→扣库存→支付→发货→通知”),核心是正向流程+补偿流程

Seata的SAGA模式支持两种实现方式:

  • 注解式(简单场景);
  • 状态机式(复杂场景,推荐)。

1. 依赖配置

需额外引入SAGA依赖:

xml
<dependency>
    <groupId>io.seata</groupId>
    <artifactId>seata-saga-processors</artifactId>
    <version>1.8.0</version>
</dependency>

2. 状态机式实现(推荐)

SAGA状态机用JSON配置定义正向流程和补偿流程,Seata根据状态机自动执行。

(1)状态机配置文件(saga_create_order.json)

放在src/main/resources/saga目录下,定义“下单→扣库存→支付”的流程:

json
{
  "name": "createOrderSaga",
  "comment": "创建订单的SAGA流程",
  "startState": "createOrder",
  "states": {
    // 1. 正向步骤1:创建订单
    "createOrder": {
      "type": "ServiceTask",
      "serviceName": "orderService",
      "serviceMethod": "createOrder",
      "compensateState": "cancelOrder",  // 补偿步骤:取消订单
      "nextState": "reduceStock"
    },
    // 2. 正向步骤2:扣减库存
    "reduceStock": {
      "type": "ServiceTask",
      "serviceName": "stockService",
      "serviceMethod": "reduceStock",
      "compensateState": "restoreStock",  // 补偿步骤:恢复库存
      "nextState": "reduceBalance"
    },
    // 3. 正向步骤3:扣减余额
    "reduceBalance": {
      "type": "ServiceTask",
      "serviceName": "accountService",
      "serviceMethod": "reduceBalance",
      "compensateState": "restoreBalance",  // 补偿步骤:恢复余额
      "endState": true
    },
    // 补偿步骤1:取消订单
    "cancelOrder": {
      "type": "ServiceTask",
      "serviceName": "orderService",
      "serviceMethod": "cancelOrder"
    },
    // 补偿步骤2:恢复库存
    "restoreStock": {
      "type": "ServiceTask",
      "serviceName": "stockService",
      "serviceMethod": "restoreStock"
    },
    // 补偿步骤3:恢复余额
    "restoreBalance": {
      "type": "ServiceTask",
      "serviceName": "accountService",
      "serviceMethod": "restoreBalance"
    }
  }
}

(2)业务服务实现

每个正向步骤和补偿步骤对应一个业务方法:

java
// 订单服务
@Service("orderService")
public class OrderService {
    @Autowired
    private OrderMapper orderMapper;

    // 正向:创建订单
    public void createOrder(Map<String, Object> param) {
        Long productId = Long.parseLong(param.get("productId").toString());
        Integer num = Integer.parseInt(param.get("num").toString());
        Long userId = Long.parseLong(param.get("userId").toString());
        BigDecimal amount = new BigDecimal(param.get("amount").toString());

        Order order = new Order();
        order.setProductId(productId);
        order.setNum(num);
        order.setUserId(userId);
        order.setAmount(amount);
        order.setStatus(1); // 1: 未支付
        orderMapper.insert(order);

        // 将订单ID存入上下文,供补偿流程使用
        param.put("orderId", order.getId());
    }

    // 补偿:取消订单
    public void cancelOrder(Map<String, Object> param) {
        Long orderId = Long.parseLong(param.get("orderId").toString());
        orderMapper.updateStatus(orderId, 0); // 0: 已取消
    }
}

// 库存服务
@Service("stockService")
public class StockService {
    @Autowired
    private StockMapper stockMapper;

    // 正向:扣减库存
    public void reduceStock(Map<String, Object> param) {
        Long productId = Long.parseLong(param.get("productId").toString());
        Integer num = Integer.parseInt(param.get("num").toString());
        stockMapper.reduceStock(productId, num);
    }

    // 补偿:恢复库存
    public void restoreStock(Map<String, Object> param) {
        Long productId = Long.parseLong(param.get("productId").toString());
        Integer num = Integer.parseInt(param.get("num").toString());
        stockMapper.restoreStock(productId, num);
    }
}

(3)触发SAGA流程

通过SagaEngine触发状态机:

java
import io.seata.saga.engine.SagaEngine;
import io.seata.saga.engine.context.SagaContext;
import io.seata.saga.engine.context.impl.JsonSagaContext;
import io.seata.saga.processor.DefaultSagaProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.HashMap;
import java.util.Map;

@Service
public class SagaTriggerService {

    @Autowired
    private SagaEngine sagaEngine;
    @Autowired
    private DefaultSagaProcessor sagaProcessor;

    public void triggerCreateOrderSaga(Order order) {
        // 1. 构造流程参数
        Map<String, Object> param = new HashMap<>();
        param.put("productId", order.getProductId());
        param.put("num", order.getNum());
        param.put("userId", order.getUserId());
        param.put("amount", order.getAmount());

        // 2. 初始化SAGA上下文
        SagaContext context = new JsonSagaContext();
        context.setBizId(order.getId().toString()); // 业务唯一标识(如订单ID)

        // 3. 触发SAGA流程(状态机文件路径:saga/createOrderSaga.json)
        sagaEngine.start("saga/createOrderSaga.json", param, context);
    }
}

3. 运行流程

  1. 调用triggerCreateOrderSaga,SagaEngine加载状态机配置;
  2. 按顺序执行正向流程createOrderreduceStockreduceBalance
  3. 若所有正向步骤成功,流程结束;
  4. 若某一步失败(如reduceBalance时余额不足),SagaEngine反向执行补偿流程restoreBalancerestoreStockcancelOrder
  5. 补偿流程确保所有资源恢复到事务前状态,最终一致。

四、XA模式(强一致,依赖数据库)

XA模式是传统2PC的实现,依赖数据库的XA协议(如MySQL的XA START/XA END),适用于强一致场景(如金融核心系统)。

1. 依赖配置

需引入XA数据源依赖(以Druid为例):

xml
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid</artifactId>
    <version>1.2.20</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid-spring-boot-starter</artifactId>
    <version>1.2.20</version>
</dependency>

2. XA数据源配置

需用DruidXADataSource替换原数据源(XA模式必须):

java
import com.alibaba.druid.pool.xa.DruidXADataSource;
import io.seata.rm.datasource.DataSourceProxy;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.sql.DataSource;
import javax.sql.XADataSource;

@Configuration
public class XADataSourceConfig {

    // 1. 配置XA数据源(DruidXADataSource)
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource")
    public XADataSource xaDataSource() {
        return new DruidXADataSource();
    }

    // 2. 用Seata代理XA数据源
    @Bean
    public DataSourceProxy dataSourceProxy(XADataSource xaDataSource) {
        return new DataSourceProxy(xaDataSource);
    }

    // 3. 配置MyBatis的SqlSessionFactory
    @Bean
    public SqlSessionFactory sqlSessionFactory(DataSourceProxy dataSourceProxy) throws Exception {
        SqlSessionFactoryBean factory = new SqlSessionFactoryBean();
        factory.setDataSource(dataSourceProxy);
        factory.setMapperLocations(new PathMatchingResourcePatternResolver()
                .getResources("classpath:mapper/*.xml"));
        return factory.getObject();
    }
}

3. 业务代码

与AT模式类似,用@GlobalTransactional标记全局事务,Seata自动处理XA两阶段提交:

java
@Service
public class OrderService {

    @Autowired
    private OrderMapper orderMapper;
    @Autowired
    private StockFeignClient stockFeignClient;
    @Autowired
    private AccountFeignClient accountFeignClient;

    @GlobalTransactional(name = "createOrderXaTx", rollbackFor = Exception.class)
    public void createOrder(Order order) {
        // 1. 插入订单(XA本地事务,未提交)
        orderMapper.insert(order);

        // 2. 调用库存服务扣减库存(XA分支事务,未提交)
        stockFeignClient.reduceStock(order.getProductId(), order.getNum());

        // 3. 调用账户服务扣减余额(XA分支事务,未提交)
        accountFeignClient.reduceBalance(order.getUserId(), order.getAmount());
    }
}

4. 运行流程

  1. 订单服务调用createOrder,TM向TC申请XID;
  2. 每个分支服务执行SQL时,Seata通过XA数据源发起XA START,执行SQL后XA END(但未提交);
  3. 所有分支执行完成后,TM向TC发起全局提交,TC通知所有RM执行XA COMMIT
  4. 若任一分支失败,TM向TC发起全局回滚,TC通知所有RM执行XA ROLLBACK
  5. XA模式的核心是数据库层面的两阶段提交,Seata仅负责协调。

总结:四种模式的代码对比

模式核心代码特征适用场景
AT@GlobalTransactional+数据源代理关系型数据库、低侵入
TCC手动实现Try/Confirm/Cancel+@LocalTCC非关系型数据库、第三方服务
SAGA状态机配置+正向/补偿方法长链路、异步、最终一致
XAXA数据源配置+@GlobalTransactional强一致、金融核心系统

注意:所有模式都需确保XID传递(Seata Starter自动处理Feign/RestTemplate),生产环境需配置高可用TC集群(Nacos注册中心)。