本地事务消息(Outbox + Inbox Barrier)完整指南

在业务系统里,“写数据库 + 发 MQ”是最常见的组合之一,也是最容易在故障场景里出问题的组合之一。

如果把这两步看成一个整体目标,那么它至少要满足三件事:

  • 业务数据和消息意图必须一致,不允许“库里有事实,MQ 没有通知”。
  • 消息必须最终可达,不因为一次网络抖动就永久丢失。
  • 消费端必须可幂等,不因为重复投递导致重复执行业务。

Maomi.MQ.Transaction 解决的正是这三件事。它并不是追求“跨数据库和 MQ 的强一致分布式事务”,而是通过 Outbox + Inbox Barrier 在工程上实现可验证、可恢复、可扩展的最终一致方案。

方案总览

默认会使用两张事务消息表:

  • mq_publisher:发布侧 Outbox 表,负责保存“待发送/重试/完成”的消息状态。
  • mq_consumer:消费侧 Inbox Barrier 表,负责保存“该消费者是否已经处理过此消息”。

状态值约定:

  • 0 Pending
  • 1 Processing
  • 2 Succeeded
  • 3 Failed

建表脚本可直接参考:

  • asserts/transaction-mysql-default-create-table.sql
  • asserts/transaction-postgres-default-create-table.sql
  • asserts/transaction-sqlserver-default-create-table.sql

接入方式(ADO.NET)

包依赖

dotnet add package Maomi.MQ.Transaction
dotnet add package Maomi.MQ.Transaction.Mysql

如使用 PostgreSQL/SQL Server,请替换为对应 provider 包。

服务注册

using Maomi.MQ.Transaction.Mysql;
using MySqlConnector;

builder.Services.AddMaomiMQTransactionMySql();

builder.Services.AddMaomiMQTransaction(options =>
{
    options.ProviderName = "mysql";
    options.Connection = _ => new MySqlConnection(connectionString);
    options.AutoCreateTable = true;
});

builder.Services.AddMaomiMQ(
    (MqOptionsBuilder options) =>
    {
        // RabbitMQ 配置
    },
    [typeof(Program).Assembly],
    Maomi.MQ.Extensions.CreateTransactionFilters());

这里有一个实践建议:AutoCreateTable=true 适合开发联调,生产环境通常建议交给迁移工具或 DBA 脚本治理。

需要注入的服务(发布端 + 消费端)

在 ADO.NET 模式下,常见只需要注入下面三个服务:

  • ITransactionOutboxService:负责在数据库事务中登记 Outbox 记录,并在提交后触发发布。
  • ITransactionBarrierService:负责消费侧屏障(防重、状态推进、失败可重试)。
  • IMessagePublisher:仅在你需要直接发布(不走事务 outbox)时使用。
public sealed class OrderAppService
{
    private readonly ITransactionOutboxService _outboxService;
    private readonly ITransactionBarrierService _barrierService;
    private readonly IMessagePublisher _publisher;

    public OrderAppService(
        ITransactionOutboxService outboxService,
        ITransactionBarrierService barrierService,
        IMessagePublisher publisher)
    {
        _outboxService = outboxService;
        _barrierService = barrierService;
        _publisher = publisher;
    }
}

通常业务主链路只用 ITransactionOutboxServiceITransactionBarrierServiceIMessagePublisher 更多用于非事务消息或运维补发场景。

发布端推荐写法(Outbox)

标准写法

await using var connection = new MySqlConnection(connectionString);
await connection.OpenAsync(ct);
await using var tx = await connection.BeginTransactionAsync(ct);

// 业务写入
await SaveOrderAsync(connection, tx, order, ct);

// 同事务注册 outbox
var outbox = await _outboxService.RegisterAsync(
    connection,
    tx,
    exchange: "",
    routingKey: "order.created",
    message,
    cancellationToken: ct);

// 提交业务事务
await tx.CommitAsync(ct);

// 提交后立即尝试发布(内部自动处理 mark success / fail)
await outbox.PublishAsync(ct);

这段代码的重点不是“短”,而是语义明确:

  • 事务里只做数据库写入,不做网络调用。
  • 事务提交后才触发 MQ 发送,避免持有数据库锁等待网络。
  • 即使发送失败,消息也仍在 Outbox 表中,可被后台补发。

委托封装写法

var outbox = await ExecuteWithOutboxAsync(
    exchange,
    routingKey,
    message,
    async (conn, tx, ct) =>
    {
        await InsertOrderAsync(conn, tx, order, ct);
    },
    cancellationToken);

如果团队不希望每个业务接口都重复“开事务 + 注册消息 + 提交 + 发布”的模板代码,建议把这一流程封装成基础能力方法,再按业务传入委托。

发布端需要注入的服务

public sealed class OrderCommandService
{
    private readonly ITransactionOutboxService _outboxService;

    public OrderCommandService(ITransactionOutboxService outboxService)
    {
        _outboxService = outboxService;
    }
}

完整业务示例(事务 + Outbox + 提交后发布)

public async Task<long> CreateOrderAsync(CreateOrderRequest request, CancellationToken ct)
{
    await using var connection = new MySqlConnection(_connectionString);
    await connection.OpenAsync(ct);
    await using var tx = await connection.BeginTransactionAsync(ct);

    var orderId = _idGen.NextId();
    var message = new OrderCreatedMessage
    {
        OrderId = orderId,
        UserId = request.UserId,
        Amount = request.Amount
    };

    // 1) 写业务数据
    await InsertOrderAsync(connection, tx, orderId, request, ct);

    // 2) 同事务登记 outbox(这里会生成并持久化 MessageId)
    var outbox = await _outboxService.RegisterAsync(
        connection,
        tx,
        exchange: "",
        routingKey: "order.created",
        message,
        cancellationToken: ct);

    // 3) 提交本地事务
    await tx.CommitAsync(ct);

    // 4) 提交后立即发布(内部会处理成功/失败状态)
    await outbox.PublishAsync(ct);

    return orderId;
}

这个流程的关键点是:数据库提交成功后才执行 MQ 网络发送;若发送失败,Outbox 记录仍然存在,后台补偿任务可以继续重试发送。

消费端推荐写法(Inbox Barrier)

委托式屏障

await _barrierService.ExecuteInBarrierAsync(
    consumerName: "order.created",
    messageHeader,
    async (connection, transaction, cancellationToken) =>
    {
        await DoBusinessAsync(connection, transaction, cancellationToken);
    },
    cancellationToken);

委托式屏障的价值是把“进入屏障、状态迁移、提交/回滚”统一收口,业务代码只关心真正的处理逻辑,降低遗漏状态更新的概率。

消费端需要注入的服务

public sealed class TransactionOrderCreatedConsumer : IConsumer<OrderCreatedMessage>
{
    private readonly ITransactionBarrierService _barrierService;

    public TransactionOrderCreatedConsumer(ITransactionBarrierService barrierService)
    {
        _barrierService = barrierService;
    }

    public Task ConsumeAsync(MessageHeader header, OrderCreatedMessage message, CancellationToken cancellationToken)
    {
        return _barrierService.ExecuteInBarrierAsync(
            consumerName: "order.created.consumer",
            header,
            async (connection, transaction, ct) =>
            {
                // 业务逻辑:建议保持幂等,例如按 OrderId 更新状态/写流水
                await HandleOrderCreatedAsync(connection, transaction, message, ct);
            },
            cancellationToken);
    }
}

你可以把业务代码集中在委托里,框架负责屏障进入、状态变更、提交与失败回滚,减少手工调用 Enter/Mark/Commit 的样板代码。

EF Core 集成

Maomi.MQ.Transaction.EFCore 提供两类核心能力:

  • 事务业务编排服务:IEfCoreTransactionService
  • 统一消息上下文抽象:ITransactionMessageDbContext

默认消息上下文

框架内置 MaomiMQTransactionDbContext,包含:

  • DbSet<OutboxMessageEntity> OutboxMessages
  • DbSet<InboxBarrierEntity> InboxBarriers

适合快速接入,不需要先改业务库结构。

自定义消息上下文

如果你的业务 DbContext 已经包含上述两个实体,实现 ITransactionMessageDbContext 即可直接接入。

这意味着框架不需要关心你的映射细节,只要求上下文满足约定接口,能减少多上下文切换成本。

统一注册入口

AddMaomiMQTransactionEFCore 已合并消息上下文注册能力,不需要再单独调用 AddMaomiMQTransactionMessageDbContext

使用默认消息上下文:

builder.Services.AddMaomiMQTransactionEFCore(
    (_, options) =>
    {
        options.UseMySql(connectionString, ServerVersion.AutoDetect(connectionString));
    },
    options =>
    {
        options.AutoSaveChanges = true;
    });

使用自定义上下文:

builder.Services.AddDbContext<AppDbContext>(options =>
{
    options.UseMySql(connectionString, ServerVersion.AutoDetect(connectionString));
});

// AppDbContext : DbContext, ITransactionMessageDbContext
builder.Services.AddMaomiMQTransactionEFCore<AppDbContext>(options =>
{
    options.AutoSaveChanges = true;
});

EF Core 发布示例

var outbox = await _efCoreTransactionService.ExecuteAndRegisterAsync(
    dbContext,
    async (db, ct) =>
    {
        db.Set<Order>().Add(order);
        await Task.CompletedTask;
    },
    exchange: "",
    routingKey: "order.created",
    message,
    cancellationToken: ct);

await outbox.PublishAsync(ct);

EF Core 屏障消费示例

await _efCoreTransactionService.ExecuteInBarrierAsync(
    dbContext,
    consumerName: "order.created",
    messageHeader,
    async (db, ct) =>
    {
        await Task.CompletedTask;
    },
    ct);

EF Core 需要注入的服务

在 EF Core 模式下,建议优先注入:

  • IEfCoreTransactionService:统一处理“业务事务 + Outbox 注册 + Barrier 执行”。
  • ITransactionMessageDbContext 或你的 AppDbContext:用于访问消息表与业务表。
public sealed class OrderEfCoreService
{
    private readonly IEfCoreTransactionService _txService;
    private readonly AppDbContext _db;

    public OrderEfCoreService(IEfCoreTransactionService txService, AppDbContext db)
    {
        _txService = txService;
        _db = db;
    }
}

EF Core 发布者示例(委托发布)

public async Task<long> CreateOrderAsync(CreateOrderRequest request, CancellationToken ct)
{
    var orderId = _idGen.NextId();
    var message = new OrderCreatedMessage
    {
        OrderId = orderId,
        UserId = request.UserId,
        Amount = request.Amount
    };

    var outbox = await _txService.ExecuteAndRegisterAsync(
        _db,
        async (db, cancellationToken) =>
        {
            db.Set<Order>().Add(new Order
            {
                Id = orderId,
                UserId = request.UserId,
                Amount = request.Amount,
                Status = OrderStatus.Created
            });
            await Task.CompletedTask;
        },
        exchange: "",
        routingKey: "order.created",
        message,
        cancellationToken: ct);

    await outbox.PublishAsync(ct);
    return orderId;
}

EF Core 消费者 + 屏障示例

public sealed class TransactionOrderCreatedEfConsumer : IConsumer<OrderCreatedMessage>
{
    private readonly IEfCoreTransactionService _txService;
    private readonly AppDbContext _db;

    public TransactionOrderCreatedEfConsumer(IEfCoreTransactionService txService, AppDbContext db)
    {
        _txService = txService;
        _db = db;
    }

    public Task ConsumeAsync(MessageHeader header, OrderCreatedMessage message, CancellationToken cancellationToken)
    {
        return _txService.ExecuteInBarrierAsync(
            _db,
            consumerName: "order.created.consumer",
            header,
            async (db, ct) =>
            {
                var entity = await db.Set<Order>().FindAsync([message.OrderId], ct);
                if (entity is null)
                    return;

                entity.Status = OrderStatus.Processed;
                await db.SaveChangesAsync(ct);
            },
            cancellationToken);
    }
}

当你的 AppDbContext 已实现 ITransactionMessageDbContext 且包含两个消息表实体时,框架会直接复用当前上下文,不需要额外维护独立消息库。

清理策略(避免消息表无限增长)

事务表在高吞吐场景会持续增长,因此建议开启自动清理。现在可在 AddMaomiMQTransaction 里配置 Cleanup,支持两个触发条件,满足任意条件即可执行清理:

  • 超过保留天数(KeepCompletedDays
  • 已完成记录数超过阈值(MaxCompletedCount

示例:

builder.Services.AddMaomiMQTransaction(options =>
{
    options.ProviderName = "mysql";
    options.Connection = _ => new MySqlConnection(connectionString);

    options.Cleanup = new MQTransactionCleanupOptions
    {
        Enabled = true,
        ScanInterval = TimeSpan.FromMinutes(2),
        KeepCompletedDays = 7,
        MaxCompletedCount = 100000,
        DeleteBatchSize = 1000
    };
});

关于“超过数量阈值”的行为,需要特别说明:

  • 不是只删除“超出的那几条”。
  • 一旦超过阈值,会直接删除当前已完成记录的一半(分批执行)。

这样可以避免阈值附近的抖动清理问题,例如 100001 -> 删 1 -> 下轮又 100001 的无效循环。