支持 MediatR / FastEndpoints

Maomi.MQ 支持通过 MediatRFastEndpoints 两种应用层框架接入消息队列。
你可以继续使用熟悉的请求/命令/事件模型,同时由 Maomi.MQ 完成 RabbitMQ 的投递与消费。

包与版本

建议按下面方式安装:

dotnet add package Maomi.MQ.RabbitMQ
dotnet add package Maomi.MQ.MediatR
dotnet add package Maomi.MQ.FastEndpoints

说明:

  • MediatR 扩展库项目名为 Maomi.MQ.MediatR
  • FastEndpoints 扩展库项目名为 Maomi.MQ.FastEndpoints
  • 这两个扩展都依赖 Maomi.MQ.RabbitMQ

MediatR 接入

服务注册

MediatRTypeFilter 用于扫描带 [MediatRConsumer] 的消息类型,并自动注册对应消费者。

using Maomi.MQ;
using Maomi.MQ.Consumer;
using Maomi.MQ.EventBus;
using Maomi.MQ.MediatR;
using Maomi.MQ.Models;
using MediatR;
using System.Reflection;

builder.Services.AddMediatR(options =>
{
    options.RegisterServicesFromAssemblies(
    [
        Assembly.GetExecutingAssembly(),
        typeof(MediatRTypeFilter).Assembly
    ]);

    // 需要支持开放泛型处理器(MediatRMqCommandHandler<> 等)
    options.RegisterGenericHandlers = true;
});

builder.Services.AddMaomiMQ(
    (MqOptionsBuilder options) =>
    {
        options.WorkId = 1;
        options.AppName = "myapp";
        options.Rabbit = rabbit =>
        {
            rabbit.HostName = Environment.GetEnvironmentVariable("RABBITMQ") ?? "127.0.0.1";
            rabbit.Port = 5672;
            rabbit.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
        };
    },
    [typeof(Program).Assembly],
    [new ConsumerTypeFilter(), new EventBusTypeFilter(), new MediatRTypeFilter()]);

定义消息和处理器

IRequest + [MediatRConsumer] 即可声明一个 MQ 消费类型:

using Maomi.MQ.MediatR;
using MediatR;

[MediatRConsumer("mediatr.order.created", Qos = 1)]
public sealed class OrderCreatedCommand : IRequest
{
    public string OrderNo { get; set; } = string.Empty;
}

public sealed class OrderCreatedCommandHandler : IRequestHandler<OrderCreatedCommand>
{
    public Task Handle(OrderCreatedCommand request, CancellationToken cancellationToken)
    {
        Console.WriteLine($"received: {request.OrderNo}");
        return Task.CompletedTask;
    }
}

发布消息

推荐通过 MediatR 发包装命令,保持调用风格统一:

using Maomi.MQ.MediatR;
using MediatR;

public sealed class PublishService
{
    private readonly IMediator _mediator;

    public PublishService(IMediator mediator)
    {
        _mediator = mediator;
    }

    public Task SendAsync(string orderNo, CancellationToken ct)
    {
        return _mediator.Send(new MediatRMqCommand<OrderCreatedCommand>
        {
            Message = new OrderCreatedCommand { OrderNo = orderNo }
        }, ct);
    }
}

如需自定义 exchange/routingKey,可以使用 MediatRMqCustomCommand<T>

await _mediator.Send(new MediatRMqCustomCommand<OrderCreatedCommand>
{
    Exchange = "app.orders",
    RoutingKey = "orders.created",
    Message = new OrderCreatedCommand { OrderNo = "SO-1001" }
}, ct);

FastEndpoints 接入

服务注册

FastEndpointsTypeFilter 用于扫描带 [FastEndpointsConsumer]IEvent/ICommand 类型。

using FastEndpoints;
using Maomi.MQ;
using Maomi.MQ.Consumer;
using Maomi.MQ.EventBus;
using Maomi.MQ.Models;
using System.Reflection;

builder.Services.AddFastEndpoints(options =>
{
    options.Assemblies =
    [
        Assembly.GetExecutingAssembly(),
        typeof(FastEndpointsTypeFilter).Assembly
    ];
});

builder.Services.AddMaomiMQ(
    (MqOptionsBuilder options) =>
    {
        options.WorkId = 1;
        options.AppName = "myapp";
        options.Rabbit = rabbit =>
        {
            rabbit.HostName = Environment.GetEnvironmentVariable("RABBITMQ") ?? "127.0.0.1";
            rabbit.Port = 5672;
            rabbit.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
        };
    },
    [typeof(Program).Assembly],
    [new ConsumerTypeFilter(), new EventBusTypeFilter(), new FastEndpointsTypeFilter()]);

app.UseFastEndpoints();

定义消息和处理器

using FastEndpoints;
using Maomi.MQ;

[FastEndpointsConsumer("fe.order.event", Qos = 1)]
public sealed class OrderCreatedEvent : IEvent
{
    public string OrderNo { get; set; } = string.Empty;
}

public sealed class OrderCreatedEventHandler : IEventHandler<OrderCreatedEvent>
{
    public Task HandleAsync(OrderCreatedEvent eventModel, CancellationToken ct)
    {
        Console.WriteLine($"event received: {eventModel.OrderNo}");
        return Task.CompletedTask;
    }
}

[FastEndpointsConsumer("fe.order.command", Qos = 1)]
public sealed class CreateOrderCommand : ICommand
{
    public string OrderNo { get; set; } = string.Empty;
}

public sealed class CreateOrderCommandHandler : ICommandHandler<CreateOrderCommand>
{
    public Task ExecuteAsync(CreateOrderCommand command, CancellationToken ct)
    {
        Console.WriteLine($"command received: {command.OrderNo}");
        return Task.CompletedTask;
    }
}

发布消息

可使用框架原生调用方式,通过包装模型转发到 MQ:

await new FastEndpointsMqEvent<OrderCreatedEvent>
{
    Event = new OrderCreatedEvent { OrderNo = "SO-2001" }
}.PublishAsync(waitMode: Mode.WaitForAll, ct);

await new FastEndpointsMqCommand<CreateOrderCommand>
{
    Command = new CreateOrderCommand { OrderNo = "SO-2002" }
}.ExecuteAsync(ct);

过滤与中间件

MediatRTypeFilterFastEndpointsTypeFilter 都支持:

  • ConsumerInterceptor:注册前修改或拒绝某些消费者。
  • 自定义 IEventMiddleware<>:在执行处理器前后加统一逻辑。

消费者拦截

示例:跳过队列名包含 local 的消费者。

using Maomi.MQ.Consumer;

ConsumerInterceptor interceptor = (options, type) =>
{
    if (options.Queue.Contains("local", StringComparison.OrdinalIgnoreCase))
    {
        return new RegisterQueue(false, options);
    }

    return new RegisterQueue(true, options);
};

var filter = new MediatRTypeFilter(interceptor);

示例:统一给队列加前缀。

using Maomi.MQ.Consumer;

ConsumerInterceptor interceptor = (options, type) =>
{
    ConsumerOptions newOptions = new(options.Queue);
    newOptions.CopyFrom(options);
    newOptions.Queue = $"prod_{options.Queue}";
    return new RegisterQueue(true, newOptions);
};

自定义事件中间件

示例:消息超过 10 小时直接跳过处理。

using Maomi.MQ;
using Maomi.MQ.EventBus;

public sealed class FreshnessEventMiddleware<TMessage> : IEventMiddleware<TMessage>
    where TMessage : class
{
    public Task ExecuteAsync(MessageHeader messageHeader, TMessage message, EventHandlerDelegate<TMessage> next)
    {
        if (DateTimeOffset.UtcNow - messageHeader.Timestamp > TimeSpan.FromHours(10))
        {
            // 不抛异常,直接返回,等同于 ACK
            return Task.CompletedTask;
        }

        return next(messageHeader, message, CancellationToken.None);
    }

    public Task FaildAsync(MessageHeader messageHeader, Exception ex, int retryCount, TMessage? message)
        => Task.CompletedTask;

    public Task<ConsumerState> FallbackAsync(MessageHeader messageHeader, TMessage? message, Exception? ex)
        => Task.FromResult(ConsumerState.Ack);
}

注册方式(MediatR 示例):

new MediatRTypeFilter(interceptor, typeof(FreshnessEventMiddleware<>))

更多消费者配置选项,可参考:配置与调试