本地事务消息(Outbox + Inbox Barrier)完整指南
在业务系统里,“写数据库 + 发 MQ”是最常见的组合之一,也是最容易在故障场景里出问题的组合之一。
如果把这两步看成一个整体目标,那么它至少要满足三件事:
- 业务数据和消息意图必须一致,不允许“库里有事实,MQ 没有通知”。
- 消息必须最终可达,不因为一次网络抖动就永久丢失。
- 消费端必须可幂等,不因为重复投递导致重复执行业务。
Maomi.MQ.Transaction 解决的正是这三件事。它并不是追求“跨数据库和 MQ 的强一致分布式事务”,而是通过 Outbox + Inbox Barrier 在工程上实现可验证、可恢复、可扩展的最终一致方案。
方案总览
默认会使用两张事务消息表:
mq_publisher:发布侧 Outbox 表,负责保存“待发送/重试/完成”的消息状态。mq_consumer:消费侧 Inbox Barrier 表,负责保存“该消费者是否已经处理过此消息”。
状态值约定:
0Pending1Processing2Succeeded3Failed
建表脚本可直接参考:
asserts/transaction-mysql-default-create-table.sqlasserts/transaction-postgres-default-create-table.sqlasserts/transaction-sqlserver-default-create-table.sql
接入方式(ADO.NET)
包依赖
dotnet add package Maomi.MQ.Transaction
dotnet add package Maomi.MQ.Transaction.Mysql
如使用 PostgreSQL/SQL Server,请替换为对应 provider 包。
服务注册
using Maomi.MQ.Transaction.Mysql;
using MySqlConnector;
builder.Services.AddMaomiMQTransactionMySql();
builder.Services.AddMaomiMQTransaction(options =>
{
options.ProviderName = "mysql";
options.Connection = _ => new MySqlConnection(connectionString);
options.AutoCreateTable = true;
});
builder.Services.AddMaomiMQ(
(MqOptionsBuilder options) =>
{
// RabbitMQ 配置
},
[typeof(Program).Assembly],
Maomi.MQ.Extensions.CreateTransactionFilters());
这里有一个实践建议:AutoCreateTable=true 适合开发联调,生产环境通常建议交给迁移工具或 DBA 脚本治理。
需要注入的服务(发布端 + 消费端)
在 ADO.NET 模式下,常见只需要注入下面三个服务:
ITransactionOutboxService:负责在数据库事务中登记 Outbox 记录,并在提交后触发发布。ITransactionBarrierService:负责消费侧屏障(防重、状态推进、失败可重试)。IMessagePublisher:仅在你需要直接发布(不走事务 outbox)时使用。
public sealed class OrderAppService
{
private readonly ITransactionOutboxService _outboxService;
private readonly ITransactionBarrierService _barrierService;
private readonly IMessagePublisher _publisher;
public OrderAppService(
ITransactionOutboxService outboxService,
ITransactionBarrierService barrierService,
IMessagePublisher publisher)
{
_outboxService = outboxService;
_barrierService = barrierService;
_publisher = publisher;
}
}
通常业务主链路只用 ITransactionOutboxService 和 ITransactionBarrierService,IMessagePublisher 更多用于非事务消息或运维补发场景。
发布端推荐写法(Outbox)
标准写法
await using var connection = new MySqlConnection(connectionString);
await connection.OpenAsync(ct);
await using var tx = await connection.BeginTransactionAsync(ct);
// 业务写入
await SaveOrderAsync(connection, tx, order, ct);
// 同事务注册 outbox
var outbox = await _outboxService.RegisterAsync(
connection,
tx,
exchange: "",
routingKey: "order.created",
message,
cancellationToken: ct);
// 提交业务事务
await tx.CommitAsync(ct);
// 提交后立即尝试发布(内部自动处理 mark success / fail)
await outbox.PublishAsync(ct);
这段代码的重点不是“短”,而是语义明确:
- 事务里只做数据库写入,不做网络调用。
- 事务提交后才触发 MQ 发送,避免持有数据库锁等待网络。
- 即使发送失败,消息也仍在 Outbox 表中,可被后台补发。
委托封装写法
var outbox = await ExecuteWithOutboxAsync(
exchange,
routingKey,
message,
async (conn, tx, ct) =>
{
await InsertOrderAsync(conn, tx, order, ct);
},
cancellationToken);
如果团队不希望每个业务接口都重复“开事务 + 注册消息 + 提交 + 发布”的模板代码,建议把这一流程封装成基础能力方法,再按业务传入委托。
发布端需要注入的服务
public sealed class OrderCommandService
{
private readonly ITransactionOutboxService _outboxService;
public OrderCommandService(ITransactionOutboxService outboxService)
{
_outboxService = outboxService;
}
}
完整业务示例(事务 + Outbox + 提交后发布)
public async Task<long> CreateOrderAsync(CreateOrderRequest request, CancellationToken ct)
{
await using var connection = new MySqlConnection(_connectionString);
await connection.OpenAsync(ct);
await using var tx = await connection.BeginTransactionAsync(ct);
var orderId = _idGen.NextId();
var message = new OrderCreatedMessage
{
OrderId = orderId,
UserId = request.UserId,
Amount = request.Amount
};
// 1) 写业务数据
await InsertOrderAsync(connection, tx, orderId, request, ct);
// 2) 同事务登记 outbox(这里会生成并持久化 MessageId)
var outbox = await _outboxService.RegisterAsync(
connection,
tx,
exchange: "",
routingKey: "order.created",
message,
cancellationToken: ct);
// 3) 提交本地事务
await tx.CommitAsync(ct);
// 4) 提交后立即发布(内部会处理成功/失败状态)
await outbox.PublishAsync(ct);
return orderId;
}
这个流程的关键点是:数据库提交成功后才执行 MQ 网络发送;若发送失败,Outbox 记录仍然存在,后台补偿任务可以继续重试发送。
消费端推荐写法(Inbox Barrier)
委托式屏障
await _barrierService.ExecuteInBarrierAsync(
consumerName: "order.created",
messageHeader,
async (connection, transaction, cancellationToken) =>
{
await DoBusinessAsync(connection, transaction, cancellationToken);
},
cancellationToken);
委托式屏障的价值是把“进入屏障、状态迁移、提交/回滚”统一收口,业务代码只关心真正的处理逻辑,降低遗漏状态更新的概率。
消费端需要注入的服务
public sealed class TransactionOrderCreatedConsumer : IConsumer<OrderCreatedMessage>
{
private readonly ITransactionBarrierService _barrierService;
public TransactionOrderCreatedConsumer(ITransactionBarrierService barrierService)
{
_barrierService = barrierService;
}
public Task ConsumeAsync(MessageHeader header, OrderCreatedMessage message, CancellationToken cancellationToken)
{
return _barrierService.ExecuteInBarrierAsync(
consumerName: "order.created.consumer",
header,
async (connection, transaction, ct) =>
{
// 业务逻辑:建议保持幂等,例如按 OrderId 更新状态/写流水
await HandleOrderCreatedAsync(connection, transaction, message, ct);
},
cancellationToken);
}
}
你可以把业务代码集中在委托里,框架负责屏障进入、状态变更、提交与失败回滚,减少手工调用 Enter/Mark/Commit 的样板代码。
EF Core 集成
Maomi.MQ.Transaction.EFCore 提供两类核心能力:
- 事务业务编排服务:
IEfCoreTransactionService - 统一消息上下文抽象:
ITransactionMessageDbContext
默认消息上下文
框架内置 MaomiMQTransactionDbContext,包含:
DbSet<OutboxMessageEntity> OutboxMessagesDbSet<InboxBarrierEntity> InboxBarriers
适合快速接入,不需要先改业务库结构。
自定义消息上下文
如果你的业务 DbContext 已经包含上述两个实体,实现 ITransactionMessageDbContext 即可直接接入。
这意味着框架不需要关心你的映射细节,只要求上下文满足约定接口,能减少多上下文切换成本。
统一注册入口
AddMaomiMQTransactionEFCore 已合并消息上下文注册能力,不需要再单独调用 AddMaomiMQTransactionMessageDbContext。
使用默认消息上下文:
builder.Services.AddMaomiMQTransactionEFCore(
(_, options) =>
{
options.UseMySql(connectionString, ServerVersion.AutoDetect(connectionString));
},
options =>
{
options.AutoSaveChanges = true;
});
使用自定义上下文:
builder.Services.AddDbContext<AppDbContext>(options =>
{
options.UseMySql(connectionString, ServerVersion.AutoDetect(connectionString));
});
// AppDbContext : DbContext, ITransactionMessageDbContext
builder.Services.AddMaomiMQTransactionEFCore<AppDbContext>(options =>
{
options.AutoSaveChanges = true;
});
EF Core 发布示例
var outbox = await _efCoreTransactionService.ExecuteAndRegisterAsync(
dbContext,
async (db, ct) =>
{
db.Set<Order>().Add(order);
await Task.CompletedTask;
},
exchange: "",
routingKey: "order.created",
message,
cancellationToken: ct);
await outbox.PublishAsync(ct);
EF Core 屏障消费示例
await _efCoreTransactionService.ExecuteInBarrierAsync(
dbContext,
consumerName: "order.created",
messageHeader,
async (db, ct) =>
{
await Task.CompletedTask;
},
ct);
EF Core 需要注入的服务
在 EF Core 模式下,建议优先注入:
IEfCoreTransactionService:统一处理“业务事务 + Outbox 注册 + Barrier 执行”。ITransactionMessageDbContext或你的AppDbContext:用于访问消息表与业务表。
public sealed class OrderEfCoreService
{
private readonly IEfCoreTransactionService _txService;
private readonly AppDbContext _db;
public OrderEfCoreService(IEfCoreTransactionService txService, AppDbContext db)
{
_txService = txService;
_db = db;
}
}
EF Core 发布者示例(委托发布)
public async Task<long> CreateOrderAsync(CreateOrderRequest request, CancellationToken ct)
{
var orderId = _idGen.NextId();
var message = new OrderCreatedMessage
{
OrderId = orderId,
UserId = request.UserId,
Amount = request.Amount
};
var outbox = await _txService.ExecuteAndRegisterAsync(
_db,
async (db, cancellationToken) =>
{
db.Set<Order>().Add(new Order
{
Id = orderId,
UserId = request.UserId,
Amount = request.Amount,
Status = OrderStatus.Created
});
await Task.CompletedTask;
},
exchange: "",
routingKey: "order.created",
message,
cancellationToken: ct);
await outbox.PublishAsync(ct);
return orderId;
}
EF Core 消费者 + 屏障示例
public sealed class TransactionOrderCreatedEfConsumer : IConsumer<OrderCreatedMessage>
{
private readonly IEfCoreTransactionService _txService;
private readonly AppDbContext _db;
public TransactionOrderCreatedEfConsumer(IEfCoreTransactionService txService, AppDbContext db)
{
_txService = txService;
_db = db;
}
public Task ConsumeAsync(MessageHeader header, OrderCreatedMessage message, CancellationToken cancellationToken)
{
return _txService.ExecuteInBarrierAsync(
_db,
consumerName: "order.created.consumer",
header,
async (db, ct) =>
{
var entity = await db.Set<Order>().FindAsync([message.OrderId], ct);
if (entity is null)
return;
entity.Status = OrderStatus.Processed;
await db.SaveChangesAsync(ct);
},
cancellationToken);
}
}
当你的 AppDbContext 已实现 ITransactionMessageDbContext 且包含两个消息表实体时,框架会直接复用当前上下文,不需要额外维护独立消息库。
清理策略(避免消息表无限增长)
事务表在高吞吐场景会持续增长,因此建议开启自动清理。现在可在 AddMaomiMQTransaction 里配置 Cleanup,支持两个触发条件,满足任意条件即可执行清理:
- 超过保留天数(
KeepCompletedDays) - 已完成记录数超过阈值(
MaxCompletedCount)
示例:
builder.Services.AddMaomiMQTransaction(options =>
{
options.ProviderName = "mysql";
options.Connection = _ => new MySqlConnection(connectionString);
options.Cleanup = new MQTransactionCleanupOptions
{
Enabled = true,
ScanInterval = TimeSpan.FromMinutes(2),
KeepCompletedDays = 7,
MaxCompletedCount = 100000,
DeleteBatchSize = 1000
};
});
关于“超过数量阈值”的行为,需要特别说明:
- 不是只删除“超出的那几条”。
- 一旦超过阈值,会直接删除当前已完成记录的一半(分批执行)。
这样可以避免阈值附近的抖动清理问题,例如 100001 -> 删 1 -> 下轮又 100001 的无效循环。