EF Core 事务实战:SaveChanges、ExecutionStrategy、Outbox 与一致性边界

“事务加上了,这可不就问了嘛”。真正到线上一跑,问题就冒出来了:

  • 数据库提交成功了,但消息没发出去
  • 外部调用成功了,但本地事务回滚了
  • SQL 重试机制打开后,一段代码被重复执行了两次
  • 同一个请求里明明都写在一起,最终状态还是不一致

这篇文章不讲抽象理论,直接回答一个工程问题:在 EF Core 里,事务边界到底该怎么画,哪些事情能靠数据库事务兜住,哪些事情必须换一种设计。

问题背景:为什么“写成功了”不等于业务真的成功

先看一个常见场景:下单成功后,需要同时完成三件事。

  1. 写入订单表
  2. 扣减库存
  3. 发出一条订单已创建消息,通知下游做履约

很多项目最开始都会写成这样:

await using var tx = await db.Database.BeginTransactionAsync(ct);

order.Status = OrderStatus.Created;
inventory.Reserve(quantity);

await db.SaveChangesAsync(ct);
await messageBus.PublishAsync(new OrderCreated(order.Id), ct);

await tx.CommitAsync(ct);

这段代码看起来很顺,实际有三个明显风险:

  • SaveChangesAsync 成功后,PublishAsync 失败,数据库已经提交,但下游不知道这笔订单存在
  • PublishAsync 成功后,CommitAsync 失败,下游收到消息,但数据库里没有最终结果
  • 如果启用了连接重试,整段逻辑可能被重放,导致消息重复发送

问题的根因不是 EF Core 不行,而是数据库事务的能力边界经常被高估了。

数据库事务能保证的是“同一个数据库连接里的状态一致”,它保证不了消息队列、HTTP 调用、Redis 写入这些外部副作用跟你一起原子提交。

原理解析:EF Core 里的事务到底管到哪一步

SaveChanges 默认就带事务,但范围很有限

对关系型数据库来说,EF Core 在一次 SaveChanges 中默认会开启事务,确保这一批 insert/update/delete 要么全部成功,要么全部失败。

也就是说,下面这种写法本身就已经带事务语义:

order.Status = OrderStatus.Created;
inventory.Reserve(quantity);

await db.SaveChangesAsync(ct);

这个默认事务适合的场景很明确:

  • 同一个 DbContext
  • 同一个数据库
  • 同一次提交
  • 没有外部副作用参与

只有当你要把多次 SaveChanges、多段仓储调用、或跨服务动作绑在一起,就要考虑显式事务。

显式事务解决的是“多步数据库写入”

如果业务上需要多次落库,而且这些写入必须成组成功,可以显式开启事务:

await using var tx = await db.Database.BeginTransactionAsync(ct);

order.Status = OrderStatus.Created;
await db.SaveChangesAsync(ct);

payment.Status = PaymentStatus.Pending;
await db.SaveChangesAsync(ct);

await tx.CommitAsync(ct);

这类事务能解决的问题是:多次数据库操作之间不能只成功一半。

但它依然解决不了:

  • 数据库之外的资源一致性
  • 网络抖动引发的重试幂等问题
  • 跨数据库、跨服务的最终一致性

ExecutionStrategy 失败重放机制

EF Core 在 SQL Server 等提供程序上支持执行重试。典型配置如下:

builder.Services.AddDbContext<AppDbContext>(options =>
    options.UseSqlServer(
        connectionString,
        sql => sql.EnableRetryOnFailure(
            maxRetryCount: 3,
            maxRetryDelay: TimeSpan.FromSeconds(10),
            errorNumbersToAdd: null)));

它的目标是处理瞬时故障,比如:

  • 短暂网络闪断
  • 数据库短时不可用
  • 连接建立失败
  • 死锁或部分可重试错误

关键点在于:它的实现方式通常不是“从失败点继续”,而是“把整个委托重新执行一遍”。

这意味着,如果你把外部副作用放进可重试委托里,就可能出现重复执行。

手动事务和重试策略必须一起设计

我遇到团队的实习生在这里踩了坑:启用了 ExecutionStrategy,然后手写 BeginTransactionAsync,结果运行时报错或者行为不符合预期。

原因很简单。对于可能被重试的一组操作,EF Core 需要知道“失败后应该重放哪一整段逻辑”。如果你在外面自己开事务、里面零散执行,它无法安全重放整个事务块。

正确写法应该是把整段事务逻辑包进执行策略委托:

var strategy = db.Database.CreateExecutionStrategy();

await strategy.ExecuteAsync(async () =>
{
    await using var tx = await db.Database.BeginTransactionAsync(ct);

    order.Status = OrderStatus.Created;
    inventory.Reserve(quantity);

    await db.SaveChangesAsync(ct);
    await tx.CommitAsync(ct);
});

注意啦!!!这种方式只适合“整段逻辑可安全重放”的场景。如果里面掺了发消息、调外部接口、生成不可重复消费副作用,就还是不可取。

一致性边界的核心判断

判断该不该上 Outbox,其实只要问自己两个问题:

  1. 这次业务提交里,是否包含数据库之外的副作用(发消息、调用外部接口等)?
  2. 这些副作用如果重复执行一次,系统能不能承受?

如果答案分别是“是”和“不能”,那就别把它们直接塞进事务提交路径里。

示例代码:本地事务 + Outbox 的最小落地模型

下面给一套最小但能落地的做法:业务数据和待发送消息一起写数据库,真正发消息放到事务外,由后台任务异步投递。

第一步:定义业务实体和 Outbox 实体

using System.Text.Json;
using Microsoft.EntityFrameworkCore;

public enum OrderStatus
{
    Pending = 0,
    Created = 1
}

public sealed class Order
{
    public long Id { get; set; }
    public string OrderNo { get; set; } = string.Empty;
    public long UserId { get; set; }
    public decimal TotalAmount { get; set; }
    public OrderStatus Status { get; set; }
    public DateTime CreatedAtUtc { get; set; }
}

public sealed class OutboxMessage
{
    public long Id { get; set; }
    public string Type { get; set; } = string.Empty;
    public string Payload { get; set; } = string.Empty;
    public DateTime OccurredAtUtc { get; set; }
    public DateTime? ProcessedAtUtc { get; set; }
    public int RetryCount { get; set; }
    public string? Error { get; set; }
}

public sealed record OrderCreatedEvent(long OrderId, string OrderNo, long UserId, decimal TotalAmount);

第二步:配置 DbContext

public sealed class AppDbContext : DbContext
{
    public DbSet<Order> Orders => Set<Order>();
    public DbSet<OutboxMessage> OutboxMessages => Set<OutboxMessage>();

    public AppDbContext(DbContextOptions<AppDbContext> options) : base(options)
    {
    }

    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        modelBuilder.Entity<Order>()
            .HasIndex(x => x.OrderNo)
            .IsUnique();

        modelBuilder.Entity<OutboxMessage>()
            .HasIndex(x => new { x.ProcessedAtUtc, x.OccurredAtUtc });

        modelBuilder.Entity<OutboxMessage>()
            .Property(x => x.Type)
            .HasMaxLength(200);
    }
}

第三步:把业务写入和 Outbox 写入放进同一个事务

public sealed class CreateOrderCommand
{
    public long UserId { get; init; }
    public decimal TotalAmount { get; init; }
}

public sealed class OrderAppService
{
    private readonly AppDbContext _db;

    public OrderAppService(AppDbContext db)
    {
        _db = db;
    }

    public async Task<long> CreateOrderAsync(CreateOrderCommand command, CancellationToken ct)
    {
        var strategy = _db.Database.CreateExecutionStrategy();

        return await strategy.ExecuteAsync(async () =>
        {
            await using var tx = await _db.Database.BeginTransactionAsync(ct);

            var order = new Order
            {
                OrderNo = $"ORD{DateTime.UtcNow:yyyyMMddHHmmssfff}",
                UserId = command.UserId,
                TotalAmount = command.TotalAmount,
                Status = OrderStatus.Created,
                CreatedAtUtc = DateTime.UtcNow
            };

            _db.Orders.Add(order);
            await _db.SaveChangesAsync(ct);

            var integrationEvent = new OrderCreatedEvent(
                order.Id,
                order.OrderNo,
                order.UserId,
                order.TotalAmount);

            _db.OutboxMessages.Add(new OutboxMessage
            {
                Type = nameof(OrderCreatedEvent),
                Payload = JsonSerializer.Serialize(integrationEvent),
                OccurredAtUtc = DateTime.UtcNow
            });

            await _db.SaveChangesAsync(ct);
            await tx.CommitAsync(ct);

            return order.Id;
        });
    }
}

这段写法的重点不是“消息马上发出去”,而是先保证:

  • 订单成功时,Outbox 记录一定存在
  • 订单失败时,Outbox 记录一定不会单独留下
  • 即使事务块被重试,数据库里仍然是可追踪、可恢复的状态

第四步:后台任务异步投递 Outbox

public interface IMessageBus
{
    Task PublishAsync(string type, string payload, CancellationToken ct);
}

public sealed class OutboxPublisher : BackgroundService
{
    private readonly IServiceScopeFactory _scopeFactory;
    private readonly ILogger<OutboxPublisher> _logger;

    public OutboxPublisher(IServiceScopeFactory scopeFactory, ILogger<OutboxPublisher> logger)
    {
        _scopeFactory = scopeFactory;
        _logger = logger;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                await PublishPendingMessagesAsync(stoppingToken);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Publish outbox messages failed.");
            }

            await Task.Delay(TimeSpan.FromSeconds(2), stoppingToken);
        }
    }

    private async Task PublishPendingMessagesAsync(CancellationToken ct)
    {
        await using var scope = _scopeFactory.CreateAsyncScope();
        var db = scope.ServiceProvider.GetRequiredService<AppDbContext>();
        var bus = scope.ServiceProvider.GetRequiredService<IMessageBus>();

        var messages = await db.OutboxMessages
            .Where(x => x.ProcessedAtUtc == null)
            .OrderBy(x => x.Id)
            .Take(100)
            .ToListAsync(ct);

        foreach (var message in messages)
        {
            try
            {
                await bus.PublishAsync(message.Type, message.Payload, ct);
                message.ProcessedAtUtc = DateTime.UtcNow;
                message.Error = null;
            }
            catch (Exception ex)
            {
                message.RetryCount += 1;
                message.Error = ex.Message;
            }
        }

        await db.SaveChangesAsync(ct);
    }
}

这时系统的一致性模型就很清楚了:

  • 本地数据库事务负责把“业务结果”和“待投递事件”绑定在一起
  • 后台投递负责把事件可靠送出去
  • 如果消息系统挂了,主交易路径不必跟着一起回滚

这不是“强一致”,但它是我所经历的大多数业务系统里更稳的工程解法。也许,你有更优的策略,非常期望你能发布在评论区,让在下膜拜学习。

总结

掌握 EF Core 事务,关键是要认清它的能力边界。

记住三个结论就够了:

  1. 单次 SaveChanges 的默认事务,适合解决单库单提交的一致性问题
  2. ExecutionStrategy 本质是失败重放机制,不是无脑开了就更靠谱
  3. 只要链路里有数据库之外的副作用,Outbox 往往比“事务里直接一起做”更靠谱

数据库事务先把本地数据这部分处理干净,Outbox 再把外部副作用延后、稳稳地送出去。把这两层职责拆开,系统才更接近真正可维护的生产方案。