消费者

Maomi.MQ 的消费者很灵活,可以随时扩展出不同的工作模式,本身已经支持了普通消费者、事件总线编排、MediatR、FastEndpoints 等工作模式,能够以简单、灵活、方便的方式实现 RabbitMQ 消费。

因为 RabbitMQ 是交换器-路由键-队列三级模式,发布者只需要推送消息到交换器,而且交换器、路由键、队列的绑关系是由创建队列时配置的,所以 Maomi.MQ 简化了发布者模式,将主要配置都集中在消费者这里,通过消费者配置初始化路由、队列、消费逻辑等,对于死信队列、Qos 并发配置、广播模式等都需要在队列这里设置。

IConsumerOptions 是所有消费模式的配置抽象,其字段和功能说明如下。

// Consumer.
// 消费者配置.
public interface IConsumerOptions
{
    // Queue name.
    // 队列名称.
    string Queue { get; }

    // Bind the death message exchange.
    // 绑定死信交换器.
    string? DeadExchange { get; }

    // Bind the death message queue.
    // 绑定死信队列.
    string? DeadRoutingKey { get; }

    // Queue message expiration time, in millimeters.
    // 队列消息过期时间,单位毫秒.
    int Expiration { get; }

    // Qos,1-65535,缓冲大小.
    ushort Qos { get; }

    // Whether to return to the queue when the number of consumption failures reaches the condition.
    // 消费失败次数达到条件时,是否放回队列.
    bool RetryFaildRequeue { get; }

    // Create queues on startup,<see cref="RabbitMQ.Client.IChannel.QueueDeclareAsync"/>.
    // 是否自动创建队列.
    AutoQueueDeclare AutoQueueDeclare { get; }

    // Bind the exchange.
    // 绑定交换器.
    string? BindExchange { get; }

    // Exchange type.
    // 交换器类型.
    ExchangeType ExchangeType { get; }

    // Bind the routing key.
    // 绑定路由键.
    string? RoutingKey { get; }

    // Broadcast mode.
    // 广播模式.
    bool IsBroadcast { get; }
}

下面举例简单讲解内置的一些消费者模式,这些模式在后面的章节会单独详细说明。


普通消费者模式

消费者服务需要实现 IConsumer<TMessage> 接口,并且配置 [Consumer("queue")] 特性绑定队列名称,通过消费者对象来控制消费行为,消费者模式有具有失败通知和补偿能力,使用上也比较简单。


public class TestEvent
{
    public int Id { get; set; }
}

[Consumer("PublisherWeb", Qos = 1, RetryFaildRequeue = true)]
public class MyConsumer : IConsumer<TestEvent>
{
    private static int _retryCount = 0;

    // 消费
    public async Task ExecuteAsync(MessageHeader messageHeader, TestEvent message)
    {
        _retryCount++;
        Console.WriteLine($"执行次数:{_retryCount} 事件 id: {message.Id} {DateTime.Now}");
        await Task.CompletedTask;
    }

    // 每次消费失败时执行
    public Task FaildAsync(MessageHeader messageHeader, Exception ex, int retryCount, TestEvent message)
        => Task.CompletedTask;

    // 补偿
    public Task<ConsumerState> FallbackAsync(MessageHeader messageHeader, TestEvent? message, Exception? ex)
        => Task.FromResult(ConsumerState.Ack);
}

所有消费模式,都带有 ExecuteAsync、FaildAsync、FallbackAsync 三个函数,这三个函数决定了消费行为和错误处理逻辑。

public interface IConsumer<TMessage>
    where TMessage : class
{
    // 当消息被正确反序列化后,处理接收到的消息
    // The received message is processed when it has been deserialized correctly.
    public Task ExecuteAsync(MessageHeader messageHeader, TMessage message);

    // 当 ExecuteAsync 出现异常后,立即执行
    public Task FaildAsync(MessageHeader messageHeader, Exception ex, int retryCount, TMessage message);

    // 当所有重试均失败,或出现不能进入 ExecuteAsync 方法的异常
    // When all retries fail, or an exception occurs that the ExecuteAsync method cannot be accessed.
    public Task<ConsumerState> FallbackAsync(MessageHeader messageHeader, TMessage? message, Exception? ex);
}

事件模式

事件模式是可以编排事件执行顺序的,需要实现 IEventMiddleware<TMessage>IEventHandler<TMessage> 两个接口。

事件模式下,一个模型类不能被不同的事件使用,模型类即事件,一个事件一个模型。

public class TestEvent
{
	public string Message { get; set; }
}

``IEventHandler是事件的入口,你可以在里面控制整体的执行和错误处理,使用await next(@event, CancellationToken.None);` 开始链式执行事件。

public class TestEventMiddleware : IEventMiddleware<TestEvent>
{
    private readonly BloggingContext _bloggingContext;

    public TestEventMiddleware(BloggingContext bloggingContext)
    {
        _bloggingContext = bloggingContext;
    }

    public async Task ExecuteAsync(MessageHeader messageHeader, TMessage message, EventHandlerDelegate<TMessage> next)
    {
        using (var transaction = _bloggingContext.Database.BeginTransaction())
        {
            await next(@event, CancellationToken.None);
            await transaction.CommitAsync();
        }
    }

    public Task FaildAsync(MessageHeader messageHeader, Exception ex, int retryCount, TMessage? message)
    {
        return Task.CompletedTask;
    }

    public Task<ConsumerState> FallbackAsync(MessageHeader messageHeader, TMessage? message, Exception? ex)
    {
        return Task.FromResult(true);
    }
}


然后使用 [EventOrder] 特性编排事件执行顺序。

// 编排事件消费顺序
[EventOrder(0)]
public class My1EventEventHandler : IEventHandler<TestEvent>
{
	public async Task CancelAsync(TestEvent @event, CancellationToken cancellationToken)
	{
	}

	public async Task ExecuteAsync(TestEvent @event, CancellationToken cancellationToken)
	{
		Console.WriteLine($"{@event.Id},事件 1 已被执行");
	}
}

[EventOrder(1)]
public class My2EventEventHandler : IEventHandler<TestEvent>
{
	public async Task CancelAsync(TestEvent @event, CancellationToken cancellationToken)
	{
	}

	public async Task ExecuteAsync(TestEvent @event, CancellationToken cancellationToken)
	{
		Console.WriteLine($"{@event.Id},事件 2 已被执行");
	}
}

消费者模式和事件总线模式都可以应对大容量的消息,如下图所示,每个消息接近 500kb,多个队列并发拉取消费。

image-20240720221514504


如果消息内容不大,则可以达到很高的消费速度。

image-20240720212715583

动态消费者

动态消费者可以在运行期间动态订阅队列,并且支持消费者类型、事件总线类型、函数绑定三种方式

注入 IDynamicConsumer 即可使用动态消费者服务。

await _dynamicConsumer.ConsumerAsync<MyConsumer, TestEvent>(new ConsumerOptions("myqueue")
{
	Qos = 10
});
// 自动事件模型对应消费者
await _dynamicConsumer.ConsumerAsync<TestEvent>(new ConsumerOptions("myqueue")
{
	Qos = 10
});
// 函数方式消费
_dynamicConsumer.ConsumerAsync<TestEvent>(new ConsumerOptions("myqueue")
{
	Qos = 10
}, async (header, message) =>
{
	Console.WriteLine($"事件 id: {message.Id} {DateTime.Now}");
	await Task.CompletedTask;
});

消费者注册模式

Maomi.MQ 提供了 ITypeFilter 接口,开发者可以使用该接口实现自定义消费者注册模式。

Maomi.MQ 内置三个 ITypeFilter,分别是:

  • 消费者模式 ConsumerTypeFilter
  • 事件总线模式 EventBusTypeFilter
  • 自定义消费者模式 ConsumerTypeFilter

框架还带有 MediatR、FastEndpoints 的扩展能力,也是通过自定义的 ITypeFilter 接入的,感兴趣的读者可以自行实现其它消费模式。


框架默认注册 ConsumerTypeFilter、EventBusTypeFilter 两种模式,开发者可以自行调整决定使用哪种模式。

var consumerTypeFilter = new ConsumerTypeFilter();
// ...
builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
{
    // ... ...
}, 
[typeof(Program).Assembly], 	// 要自动扫描的程序集
[new ConsumerTypeFilter(), new EventBusTypeFilter(), consumerTypeFilter]); 	// 配置要使用的消费者注册模式

广播模式

在广播模式下,每个消费者线程都会收到相同的消息,例如在 Kubernetes 中,一个服务被创建了 10 个实例,那么每个实例都会同样收到消息,服务实例启动或注销时,会自动关联和注销绑定消息队列,所以实例即使下线也不会导致服务器消息堆积。

举个场景,有个核心服务处于集群入口,为了提高并发能力,做了 数据库-Redis-内存缓存三级机制,但是内存缓存是在每个实例中的,需要有一种机制能够通知服务实例更新缓存,那么这个广播模式刚刚好,每个实例都可以收到消息。

在广播模式下,发送者方无需做任何处理,消费者只需要在特性注解加上 IsBroadcast = true 即可。

[Consumer("scenario.quickstart",IsBroadcast = true)]

多队列绑定

广播模式是用于将一条消息推送到交换器,然后绑定的多个队列都可以收到相同的消息,简单来说该模式是向交换器推送消息,然后交换器将消息转发到各个绑定的队列中,这样一来不同队列的消费者可以同时收到消息。

98ae0f8039f4b17a0c14048c82f1e631_post-21430-6555f746c77f1


但是不同的交换器模式使用上不一样,下面笔者 fanout 为例,当队列绑定到 fanout 类型的交换器后,Rabbit broker 会忽略 RoutingKey,将消息推送到所有绑定的队列中。

所以我们定义两个消费者,绑定到一个相同的 fanout 类型的交换器:

[Consumer("fanout_1", BindExchange = "fanouttest", ExchangeType = ExchangeType.Fanout)]
public class FanoutEvent_1_Consumer : IConsumer<FanoutEvent>
{
    // 消费
    public virtual async Task ExecuteAsync(MessageHeader messageHeader, FanoutEvent message)
    {
        Console.WriteLine($"【fanout_1】,事件 id: {message.Id} {DateTime.Now}");
        await Task.CompletedTask;
    }
    
    // ... ...
}

[Consumer("fanout_2", BindExchange = "fanouttest", ExchangeType = "fanout")]
public class FanoutEvent_2_Consumer : IConsumer<FanoutEvent>
{
    // 消费
    public virtual async Task ExecuteAsync(MessageHeader messageHeader, FanoutEvent message)
    {
        Console.WriteLine($"【fanout_2】,事件 id: {message.Id} {DateTime.Now}");
        await Task.CompletedTask;
    }
    
    // ... ...
}

image-20250208090419019


发布消息时,只需要配置交换器名称即可,两个消费者服务都会同时收到消息:

[HttpGet("publish_fanout")]
public async Task<string> Publisher_Fanout()
{
	for (var i = 0; i < 5; i++)
	{
		await _messagePublisher.PublishAsync(exchange: "fanouttest", routingKey: string.Empty, message: new FanoutEvent
		{
			Id = 666
		});
	}

	return "ok";
}

对于 Topic 类型的交换器和队列,使用方式也是一致的,定义两个消费者:

[Consumer("red.yellow.#", BindExchange = "topictest", ExchangeType = ExchangeType.Topic)]
public class TopicEvent_1_Consumer : IConsumer<TopicEvent>
{
    // 消费
    public virtual async Task ExecuteAsync(MessageHeader messageHeader, TopicEvent message)
    {
        Console.WriteLine($"【red.yellow.#】,事件 id: {message.Id} {DateTime.Now}");
        await Task.CompletedTask;
    }
    
    // ... ...
}

[Consumer("red.#", BindExchange = "topictest", ExchangeType = ExchangeType.Topic)]
public class TopicEvent_2_Consumer : IConsumer<TopicEvent>
{
    // 消费
    public virtual async Task ExecuteAsync(MessageHeader messageHeader, TopicEvent message)
    {
        Console.WriteLine($"【red.#】,事件 id: {message.Id} {DateTime.Now}");
        await Task.CompletedTask;
    }
    
    // ... ...
}

发布消息:

[HttpGet("publish_topic")]
public async Task<string> Publisher_Topic()
{
	for (var i = 0; i < 5; i++)
	{
		await _messagePublisher.PublishAsync(exchange: "topictest", routingKey: "red.a", message: new TopicEvent
		{
			Id = 666
		});
		await _messagePublisher.PublishAsync(exchange: "topictest", routingKey: "red.yellow.a", message: new TopicEvent
		{
			Id = 666
		});
	}

	return "ok";
}