消息发布者
Maomi.MQ 通过 IMessagePublisher 向开发者提供消息推送服务。
消息发布者用于推送消息到 RabbitMQ 服务器中,Maomi.MQ 支持多种消息发布者模式,支持 RabbitMQ 事务模式等、数据库事务等。
在发布消息之前,需要定义一个事件模型类,用于传递消息。
public class TestEvent
{
public int Id { get; set; }
public override string ToString()
{
return Id.ToString();
}
}
然后注入 IMessagePublisher 服务,发布消息:
private readonly IMessagePublisher _messagePublisher;
public IndexController(IMessagePublisher messagePublisher)
{
_messagePublisher = messagePublisher;
}
[HttpGet("publish")]
public async Task<string> Publisher()
{
for (var i = 0; i < 100; i++)
{
await _messagePublisher.PublishAsync(exchange: string.Empty, routingKey: "publish", message: new TestEvent
{
Id = i
});
}
return "ok";
}
模型类也可以写提前设置 [RouterKey] 特性,这样在发送消息时不需要指定路由。
[RouterKey("scenario.quickstart")]
public sealed class QuickStartMessage
var message = new QuickStartMessage
{
Text = request.Text,
At = DateTimeOffset.UtcNow
};
await _publisher.AutoPublishAsync(message);
当然,即使配置了
[RouterKey]特性,仍然可以手动指定推送消息到某个队列中。
IMessagePublisher
IMessagePublisher 是 Maomi.MQ 的基础消息发布接口,Maomi.MQ 的消息发布接口就这么几个,由于直接公开了 BasicProperties ,因此开发者完全自由配置 RabbitMQ 原生的消息属性,所以接口比较简单,开发者使用接口时可以灵活一些,使用难度也不大。
BasicProperties 是 RabbitMQ 中的消息基础属性对象,直接面向开发者,可以消息的发布和消费变得灵活和丰富功能,例如,可以通过 BasicProperties 配置单条消息的过期时间:
await _messagePublisher.PublishAsync(exchange: string.Empty, routingKey: "publish", message: new TestEvent
{
Id = i
}, (BasicProperties p) =>
{
p.Expiration = "1000";
});
Maomi.MQ 通过 DefaultMessagePublisher 类型实现了 IMessagePublisher,DefaultMessagePublisher 默认生命周期是 Scoped:
services.AddScoped<IMessagePublisher, DefaultMessagePublisher>();
开发者也可以自行实现 IMessagePublisher 接口,实现自己的消息发布模型,具体示例请参考 DefaultMessagePublisher 类型。
原生通道
开发者可以通过 ConnectionPool 服务获取原生连接对象,直接在 IConnection 上使用 RabbitMQ 的接口发布消息:
private readonly ConnectionPool _connectionPool;
var connectionObject = _connectionPool.Get();
connectionObject.DefaultChannel.BasicPublishAsync(... ...);
常驻内存连接对象
Maomi.MQ 通过 ConnectionPool 管理 RabbitMQ 连接对象,注入 ConnectionPool 服务后,通过 .Get() 接口获取全局默认连接实例。
如果开发者有自己的需求,也可以通过 .Create() 接口创建新的连接对象。
using var newConnectionObject = _connectionPool.Create();
using var newConnection = newConnectionObject.Connection;
using var newChannel = newConnection.CreateChannelAsync();
请务必妥善使用连接对象,不要频繁创建和释放,也不要忘记了管理生命周期,否则容易导致内存泄漏。
单个 IConnectionn 即可满足大多数场景下的使用,吞吐量足够用了,笔者经过了多次长时间的测试,发现一个 IConnection 即可满足需求,多个 IConnection 并不会带来任何优势,因此去掉了旧版本的连接池,现在默认全局只会存在一个 IConnection,但是不同的消费者使用 IChannel 来隔离。
程序只维持一个 IConnection 时,四个发布者同时发布消息,每秒速度如下:

如果消息内容非常大时,单个 IConnection 也足够应付,取决于带宽。
每条消息 478 KiB。

消息过期
IMessagePublisher 对外开放了 BasicProperties,开发者可以自由配置消息属性。
例如为消息配置过期时间:
[HttpGet("publish")]
public async Task<string> Publisher()
{
for (var i = 0; i < 1; i++)
{
await _messagePublisher.PublishAsync(exchange: string.Empty, routingKey: "publish", message: new TestEvent
{
Id = i
}, properties =>
{
properties.Expiration = "6000";
});
}
return "ok";
}
为该消息设置过期时间后,如果队列绑定了死信队列,那么该消息长时间没有被消费时,会被移动到另一个队列,请参考 死信队列。
还可以通过配置消息属性实现更多的功能,请参考 IBasicProperties 文档。
事务
RabbitMQ 原生支持事务模型,RabbitMQ 的事务通讯协议可以参考 https://www.rabbitmq.com/docs/semantics
据 RabbitMQ 官方文档显示,事务模式会使吞吐量减少 250 倍,这个主要跟事务机制有关,事务模式不仅仅要保证消息已经推送到 Rabbit broker,还要保证 Rabbit broker 多节点分区同步,在 Rabbit broker 挂掉的情况下消息已被完整同步。不过一般可能用不上这么严格的模式,所以也可以使用下一小节提到的发送方确认机制。
Maomi.MQ 的事务接口使用上比较简单,可以使用扩展方法直接开启一个 ITransactionPublisher,事务接口使用上也比较简洁,示例如下:
[HttpGet("publish_tran")]
public async Task<string> Publisher_Tran()
{
using var tranPublisher = _messagePublisher.CreateTransaction();
await tranPublisher.TxSelectAsync();
try
{
await tranPublisher.PublishAsync(exchange: string.Empty, routingKey: "publish", message: new TestEvent
{
Id = 666
});
await Task.Delay(5000);
await tranPublisher.TxCommitAsync();
}
catch
{
await tranPublisher.TxRollbackAsync();
throw;
}
return "ok";
}
发送方确认模式
事务模式可以保证消息会被推送到 RabbitMQ 服务器中,并在个节点中已完成同步,但是由于事务模式会导致吞吐量降低 250 倍,因此 RabbitMQ 引入了一种确认机制,这种机制就像滑动窗口,能够保证消息推送到服务器中,并且具备高性能的特性,其吞吐量是事务模式 100 倍,参考资料:
https://www.rabbitmq.com/blog/2011/02/10/introducing-publisher-confirms
https://www.rabbitmq.com/docs/confirms
不过 .NET RabbitMQClient 库的新版本已经去掉了一些 API 接口,改动信息详细参考:Issue #1682 、RabbitMQ tutorial - Reliable Publishing with Publisher Confirms
Maomi.MQ 根据新版本做了简化调整,具体用法是通过创建使用独立通道的消息发布者,然后在参数中指定 IChannel 属性。
using var confirmPublisher = _messagePublisher.CreateSingle(
new CreateChannelOptions(publisherConfirmationsEnabled: true, publisherConfirmationTrackingEnabled: true));
for (var i = 0; i < 5; i++)
{
await confirmPublisher.PublishAsync(exchange: string.Empty, routingKey: "publish", message: new TestEvent
{
Id = 666
});
}
事务模式和确认机制模式发布者是相互隔离的,创建这两者对象时都会默认自动使用新的 IChannel,因此不需要担心冲突。
如果开发者有自己的需求,也可以 CreateSingle 创建独立的 IChannel,然后自定义 CreateChannelOptions 属性,关于 CreateChannelOptions 的描述,请参考:
https://rabbitmq.github.io/rabbitmq-dotnet-client/api/RabbitMQ.Client.CreateChannelOptions.html
强事务模式
强事务模式就是利用本地消息表,保证业务完成后,消息一定会发送成功。
例如,当用户支付订单完成后,需要发一条 MQ 消息,以便通知下游服务进行一系列处理,例如发送邮件通知、推送给物流服务、推送给审计中心等。支付逻辑自然是要做数据库事务,但是有个问题,在什么时候发送 MQ 消息?
如果在数据库事务里面加上发送 MQ 消息的代码,那么有可能消息已经发送了,但是服务挂了导致最后没有提交数据库事务,此时下游已经收到消息开始处理了,并且在数据库事务里面掺杂其它 IO 操作,可能导致数据库事务耗时延长,限制了并发能力。
如果在完成数据库事务后推送 MQ,那么有可能刚刚好在数据库事务提交后服务就挂了,下游没有收到通知,没有开始处理订单。
考虑到这种业务场景,所以 Maomi.MQ 增加了本地事务表模式,确保提交数据库事务后消息会发布成功。
根据数据库引入 nuget 包:
Maomi.MQ.Transaction.Mysql
Maomi.MQ.Transaction.Postgres
Maomi.MQ.Transaction.SqlServer
框架依赖两个表,你可以手动在数据库创建表,也可以让框架自动创建,sql 脚本文件:
https://github.com/whuanle/Maomi.MQ/tree/main/asserts
using Maomi.MQ.Transaction.Mysql;
using MySqlConnector;
builder.Services.AddMaomiMQTransactionMySql();
builder.Services.AddMaomiMQTransaction(options =>
{
options.ProviderName = TransactionProviderNames.MySql;
options.Connection = _ => new MySqlConnection(builder.Configuration.GetConnectionString("Default"));
options.AutoCreateTable = true;
});
发布者应在数据库内增加对 MQ 表的写入,并且在事务提交后再向 RabbitMQ 推送消息。
public async Task ExecuteWithOutboxAsync<TMessage>(
TMessage message,
Func<MySqlConnection, DbTransaction, CancellationToken, Task> businessAction,
CancellationToken cancellationToken)
where TMessage : class
{
await using var connection = new MySqlConnection(_connectionString);
await connection.OpenAsync(cancellationToken);
await using var transaction = await connection.BeginTransactionAsync(cancellationToken);
await businessAction(connection, transaction, cancellationToken);
var outbox = await _outboxService.RegisterAutoAsync(
connection,
transaction,
message,
cancellationToken: cancellationToken);
await transaction.CommitAsync(cancellationToken);
await outbox.PublishAsync(cancellationToken);
}
上面的代码是最小可用示例,下面补充几种在实际项目里更常见的发布者使用方式,便于按业务场景直接套用。
委托模式
委托模式可以简化本地消息表的使用,自动创建事务、刷新本地消息表和发布消息。
await ExecuteWithOutboxAsync(
new OrderCreatedMessage { OrderNo = "SO-TX-2001", Amount = 888m },
async (conn, tran, ct) =>
{
// await SaveOrderAsync(conn, tran, order, ct);
await Task.CompletedTask;
},
cancellationToken);
EFCore 模式
还可以通过 IEfCoreTransactionService 注入 EFCore 服务,通过统一的上下文服务和实体,完成业务,降低了使用 ADO.NET 的复杂度。
public sealed class OrderEfAppService
{
private readonly IEfCoreTransactionService _efCoreTransactionService;
private readonly AppDbContext _dbContext;
public OrderEfAppService(IEfCoreTransactionService efCoreTransactionService, AppDbContext dbContext)
{
_efCoreTransactionService = efCoreTransactionService;
_dbContext = dbContext;
}
public async Task CreateOrderAsync(CancellationToken cancellationToken)
{
var outbox = await _efCoreTransactionService.ExecuteAndRegisterAsync(
_dbContext,
async (db, ct) =>
{
db.Set<OrderEntity>().Add(new OrderEntity
{
OrderNo = "SO-EF-3001",
Amount = 399m,
CreateTime = DateTimeOffset.UtcNow
});
await Task.CompletedTask;
},
exchange: "",
routingKey: "order.created",
message: new OrderCreatedMessage { OrderNo = "SO-EF-3001", Amount = 399m },
cancellationToken: cancellationToken);
await outbox.PublishAsync(cancellationToken);
}
}
不可路由消息
当发布消息时,如果该消息不可路由,即找不对应的队列等情况,那么将会触发 IBreakdown.BasicReturnAsync 接口,BasicReturnEventArgs 属性有详细的错误原因。


对于网络故障、RabbitMQ 服务挂了、没有对应交换器名称等失败等情况,则会在当前线程上出现异常,并且 TCP 连接会自动重新连接。
需要注意 RabbitMQ 的机制,推送消息并不是同步发生的,因此即使推送失败,也不会在当前线程中出现异常,所以不能判断当前消息是否成功推送。
对于不可路由的消息,Maomi.MQ 只提供了简单的接口通知,没有其它处理机制,所以开发者需要自行处理,社区中有一款 MQ 通讯框架叫 EasyNetQ,它的默认机制是自动创建新的队列,将当前不可路由的队列推送到新的队列中,以便持久化保存。
开发者可以实现该接口,然后注册为到容器:
services.AddScoped<IBreakdown, MyDefaultBreakdown>();
例如将不可路由的消息推送到新的队列中:
public class MyDefaultBreakdown : IBreakdown
{
private readonly ConnectionPool _connectionPool;
public MyDefaultBreakdown(ConnectionPool connectionPool)
{
_connectionPool = connectionPool;
}
/// <inheritdoc />
public async Task BasicReturnAsync(object sender, BasicReturnEventArgs @event)
{
var connectionObject = _connectionPool.Get();
await connectionObject.DefaultChannel.BasicPublishAsync<BasicProperties>(
@event.Exchange,
@event.RoutingKey + ".faild",
true,
new BasicProperties(@event.BasicProperties),
@event.Body);
}
/// <inheritdoc />
public Task NotFoundConsumerAsync(string queue, Type messageType, Type consumerType)
{
return Task.CompletedTask;
}
}
其实对于这种不可路由消息的情况,不单单只是转发存储,要检查是否误删队列、发布消息时队列名称是否一致等,最好在里面加上推送到飞书或钉钉群,以便在消息队列故障时及时处理问题。