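MediatR / FastEndpoints Support
Maomi.MQ can be connected to a message queue through two application-layer frameworks: MediatR and FastEndpoints.
You keep the familiar request/command/event model, while Maomi.MQ handles publishing to and consuming from RabbitMQ.
Packages and versions
Install the packages as follows: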
dotnet add package Maomi.MQ.RabbitMQ
dotnet add package Maomi.MQ.MediatR
dotnet add package Maomi.MQ.FastEndpoints
Notes:
- The MediatR extension project is named Maomi.MQ.MediatR.
- The FastEndpoints extension project is named Maomi.MQ.FastEndpoints.
- Both extensions depend on Maomi.MQ.RabbitMQ.
MediatR integration
Service registration
MediatRTypeFilter scans for message types marked with [MediatRConsumer] and automatically registers a consumer for each one.
using Maomi.MQ;
using Maomi.MQ.Consumer;
using Maomi.MQ.EventBus;
using Maomi.MQ.MediatR;
using Maomi.MQ.Models;
using MediatR;
using System.Reflection;

builder.Services.AddMediatR(options =>
{
    options.RegisterServicesFromAssemblies(
    [
        Assembly.GetExecutingAssembly(),
        typeof(MediatRTypeFilter).Assembly
    ]);

    // Required for the open generic handlers (MediatRMqCommandHandler<> and others)
    options.RegisterGenericHandlers = true;
});

builder.Services.AddMaomiMQ(
    (MqOptionsBuilder options) =>
    {
        options.WorkId = 1;
        options.AppName = "myapp";
        options.Rabbit = rabbit =>
        {
            rabbit.HostName = Environment.GetEnvironmentVariable("RABBITMQ") ?? "127.0.0.1";
            rabbit.Port = 5672;
            rabbit.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
        };
    },
    [typeof(Program).Assembly],
    [new ConsumerTypeFilter(), new EventBusTypeFilter(), new MediatRTypeFilter()]);
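
To make the scanning step less abstract, here is a rough, self-contained sketch of attribute-based type discovery using plain reflection. It is not Maomi.MQ's implementation: DemoConsumerAttribute, DemoOrderCreated, NotAConsumer, and AttributeScanner are hypothetical stand-ins, and the real filter presumably does more than this (it also registers a consumer for each type it finds).

```csharp
using System;
using System.Linq;
using System.Reflection;

// Hypothetical stand-in for an attribute such as [MediatRConsumer].
[AttributeUsage(AttributeTargets.Class)]
public sealed class DemoConsumerAttribute : Attribute
{
    public DemoConsumerAttribute(string queue) => Queue = queue;

    public string Queue { get; }
}

// A type that opts in to consumption by carrying the attribute.
[DemoConsumer("demo.order.created")]
public sealed class DemoOrderCreated { }

// A type without the attribute; the scanner ignores it.
public sealed class NotAConsumer { }

public static class AttributeScanner
{
    // Returns every type in the assembly that carries DemoConsumerAttribute,
    // paired with the queue name declared on it.
    public static (Type Type, string Queue)[] FindConsumers(Assembly assembly) =>
        assembly.GetTypes()
            .Select(t => (Type: t, Attr: t.GetCustomAttribute<DemoConsumerAttribute>()))
            .Where(x => x.Attr is not null)
            .Select(x => (x.Type, x.Attr!.Queue))
            .ToArray();
}
```

Calling AttributeScanner.FindConsumers(typeof(DemoOrderCreated).Assembly) yields only the attributed type, which mirrors why only types marked with the attribute end up with a queue consumer.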
Defining messages and handlers
Implementing IRequest and adding [MediatRConsumer] is enough to declare an MQ consumer type:
using Maomi.MQ.MediatR;
using MediatR;

[MediatRConsumer("mediatr.order.created", Qos = 1)]
public sealed class OrderCreatedCommand : IRequest
{
    public string OrderNo { get; set; } = string.Empty;
}

public sealed class OrderCreatedCommandHandler : IRequestHandler<OrderCreatedCommand>
{
    public Task Handle(OrderCreatedCommand request, CancellationToken cancellationToken)
    {
        Console.WriteLine($"received: {request.OrderNo}");
        return Task.CompletedTask;
    }
}
Publishing messages
Publishing through MediatR with the wrapper command is recommended, so that the calling style stays consistent:
using Maomi.MQ.MediatR;
using MediatR;

public sealed class PublishService
{
    private readonly IMediator _mediator;

    public PublishService(IMediator mediator)
    {
        _mediator = mediator;
    }

    public Task SendAsync(string orderNo, CancellationToken ct)
    {
        return _mediator.Send(new MediatRMqCommand<OrderCreatedCommand>
        {
            Message = new OrderCreatedCommand { OrderNo = orderNo }
        }, ct);
    }
}
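
One way to call PublishService is from an ASP.NET Core minimal API endpoint. The following is only a sketch: it assumes a web project with implicit usings enabled, the PublishService and OrderCreatedCommand types defined above, and the MediatR and Maomi.MQ registrations from the service registration section. The route and the scoped lifetime are illustrative choices, not something the library prescribes.

```csharp
var builder = WebApplication.CreateBuilder(args);

// AddMediatR(...) and AddMaomiMQ(...) go here, as shown in the
// service registration section.

// Assumption: a scoped lifetime is chosen for PublishService.
builder.Services.AddScoped<PublishService>();

var app = builder.Build();

// Hypothetical route, used for illustration only.
app.MapPost("/orders/{orderNo}", async (string orderNo, PublishService publisher, CancellationToken ct) =>
{
    // Sends the wrapper command through MediatR, as PublishService does above.
    await publisher.SendAsync(orderNo, ct);
    return Results.Accepted();
});

app.Run();
```

Returning 202 Accepted is a design choice here: the request only confirms that the message was handed off, not that a consumer has processed it.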
To specify a custom exchange or routing key, use MediatRMqCustomCommand<T>:
await _mediator.Send(new MediatRMqCustomCommand<OrderCreatedCommand>
{
    Exchange = "app.orders",
    RoutingKey = "orders.created",
    Message = new OrderCreatedCommand { OrderNo = "SO-1001" }
}, ct);
FastEndpoints integration
Service registration
FastEndpointsTypeFilter scans for IEvent/ICommand types marked with [FastEndpointsConsumer].
using FastEndpoints;
using Maomi.MQ;
using Maomi.MQ.Consumer;
using Maomi.MQ.EventBus;
using Maomi.MQ.Models;
using System.Reflection;

builder.Services.AddFastEndpoints(options =>
{
    options.Assemblies =
    [
        Assembly.GetExecutingAssembly(),
        typeof(FastEndpointsTypeFilter).Assembly
    ];
});

builder.Services.AddMaomiMQ(
    (MqOptionsBuilder options) =>
    {
        options.WorkId = 1;
        options.AppName = "myapp";
        options.Rabbit = rabbit =>
        {
            rabbit.HostName = Environment.GetEnvironmentVariable("RABBITMQ") ?? "127.0.0.1";
            rabbit.Port = 5672;
            rabbit.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
        };
    },
    [typeof(Program).Assembly],
    [new ConsumerTypeFilter(), new EventBusTypeFilter(), new FastEndpointsTypeFilter()]);

app.UseFastEndpoints();
Defining messages and handlers
using FastEndpoints;
using Maomi.MQ;

[FastEndpointsConsumer("fe.order.event", Qos = 1)]
public sealed class OrderCreatedEvent : IEvent
{
    public string OrderNo { get; set; } = string.Empty;
}

public sealed class OrderCreatedEventHandler : IEventHandler<OrderCreatedEvent>
{
    public Task HandleAsync(OrderCreatedEvent eventModel, CancellationToken ct)
    {
        Console.WriteLine($"event received: {eventModel.OrderNo}");
        return Task.CompletedTask;
    }
}

[FastEndpointsConsumer("fe.order.command", Qos = 1)]
public sealed class CreateOrderCommand : ICommand
{
    public string OrderNo { get; set; } = string.Empty;
}

public sealed class CreateOrderCommandHandler : ICommandHandler<CreateOrderCommand>
{
    public Task ExecuteAsync(CreateOrderCommand command, CancellationToken ct)
    {
        Console.WriteLine($"command received: {command.OrderNo}");
        return Task.CompletedTask;
    }
}
Publishing messages
You can keep the framework's native call style; the wrapper models forward the message to the MQ:
await new FastEndpointsMqEvent<OrderCreatedEvent>
{
    Event = new OrderCreatedEvent { OrderNo = "SO-2001" }
}.PublishAsync(waitMode: Mode.WaitForAll, ct);

await new FastEndpointsMqCommand<CreateOrderCommand>
{
    Command = new CreateOrderCommand { OrderNo = "SO-2002" }
}.ExecuteAsync(ct);
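Filtering and middleware
Both MediatRTypeFilter and FastEndpointsTypeFilter support:
- ConsumerInterceptor: modify or reject individual consumers before they are registered.
- A custom IEventMiddleware<>: add shared logic before and after the handler runs.
Consumer interception
Example: skip consumers whose queue name contains local.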
using Maomi.MQ.Consumer;
using Maomi.MQ.MediatR;

ConsumerInterceptor interceptor = (options, type) =>
{
    if (options.Queue.Contains("local", StringComparison.OrdinalIgnoreCase))
    {
        return new RegisterQueue(false, options);
    }

    return new RegisterQueue(true, options);
};

var filter = new MediatRTypeFilter(interceptor);
Example: add a common prefix to every queue name.
using Maomi.MQ.Consumer;

ConsumerInterceptor interceptor = (options, type) =>
{
    ConsumerOptions newOptions = new(options.Queue);
    newOptions.CopyFrom(options);
    newOptions.Queue = $"prod_{options.Queue}";
    return new RegisterQueue(true, newOptions);
};
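
The prefix in the example above is hard-coded. If it should follow the deployment environment instead, the naming rule could be pulled into a small pure function that is easy to unit test. This is a sketch only: QueueNaming and its prod/dev mapping are hypothetical and not part of Maomi.MQ.

```csharp
#nullable enable
using System;

public static class QueueNaming
{
    // Hypothetical rule: "Production" maps to the "prod" prefix,
    // everything else (including a missing name) maps to "dev".
    public static string WithEnvironmentPrefix(string queue, string? environmentName)
    {
        string prefix = string.Equals(environmentName, "Production", StringComparison.OrdinalIgnoreCase)
            ? "prod"
            : "dev";

        // Leave the name alone if it already carries the prefix.
        return queue.StartsWith(prefix + "_", StringComparison.Ordinal)
            ? queue
            : $"{prefix}_{queue}";
    }
}
```

Inside the interceptor, the assignment would then read newOptions.Queue = QueueNaming.WithEnvironmentPrefix(options.Queue, environmentName), where environmentName comes from wherever the application keeps its environment setting.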
Custom event middleware
Example: skip processing for messages that are more than 10 hours old.
using Maomi.MQ;
using Maomi.MQ.EventBus;

public sealed class FreshnessEventMiddleware<TMessage> : IEventMiddleware<TMessage>
    where TMessage : class
{
    public Task ExecuteAsync(MessageHeader messageHeader, TMessage message, EventHandlerDelegate<TMessage> next)
    {
        if (DateTimeOffset.UtcNow - messageHeader.Timestamp > TimeSpan.FromHours(10))
        {
            // Return without throwing, which is equivalent to an ACK
            return Task.CompletedTask;
        }

        return next(messageHeader, message, CancellationToken.None);
    }

    public Task FaildAsync(MessageHeader messageHeader, Exception ex, int retryCount, TMessage? message)
        => Task.CompletedTask;

    public Task<ConsumerState> FallbackAsync(MessageHeader messageHeader, TMessage? message, Exception? ex)
        => Task.FromResult(ConsumerState.Ack);
}
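
The age check inside the middleware can also be isolated as a pure function, so the cutoff can be tested without a broker. This is a minimal sketch: MessageFreshness is a hypothetical helper, and the current time is passed in explicitly to keep the result deterministic.

```csharp
using System;

public static class MessageFreshness
{
    // True when the message is strictly older than maxAge at the given instant.
    public static bool IsStale(DateTimeOffset sentAt, DateTimeOffset now, TimeSpan maxAge)
        => now - sentAt > maxAge;
}
```

In the middleware, the condition would become MessageFreshness.IsStale(messageHeader.Timestamp, DateTimeOffset.UtcNow, TimeSpan.FromHours(10)), assuming Timestamp is a DateTimeOffset (or converts to one), as the subtraction in the example suggests.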
Registration (MediatR example):
new MediatRTypeFilter(interceptor, typeof(FreshnessEventMiddleware<>))
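
For context, the configured filter takes the place of the default new MediatRTypeFilter() in the AddMaomiMQ call. The fragment below is a sketch that reuses only the calls shown earlier on this page; it assumes builder is the application's WebApplicationBuilder and that interceptor and FreshnessEventMiddleware<> are declared as in the examples above.

```csharp
using Maomi.MQ;
using Maomi.MQ.Consumer;
using Maomi.MQ.EventBus;
using Maomi.MQ.MediatR;
using Maomi.MQ.Models;

// Build the filter once, with both the interceptor and the middleware type.
var mediatRFilter = new MediatRTypeFilter(interceptor, typeof(FreshnessEventMiddleware<>));

builder.Services.AddMaomiMQ(
    (MqOptionsBuilder options) =>
    {
        options.WorkId = 1;
        options.AppName = "myapp";
        options.Rabbit = rabbit =>
        {
            rabbit.HostName = Environment.GetEnvironmentVariable("RABBITMQ") ?? "127.0.0.1";
            rabbit.Port = 5672;
        };
    },
    [typeof(Program).Assembly],
    // The configured filter replaces the parameterless MediatRTypeFilter.
    [new ConsumerTypeFilter(), new EventBusTypeFilter(), mediatRFilter]);
```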
For more consumer configuration options, see: Configuration and Debugging.