自定义消费者和动态订阅

主要实现了两部分的功能。

  • 在程序启动时,可以自定义消费者配置和消费者模型,不需要使用特性注解配置。
  • 在程序启动后,可以随时启动一个消费者或者停止一个消费者。

参考示例项目:https://github.com/whuanle/Maomi.MQ/tree/main/example/consumer/DynamicConsumerWeb


自定义消费者

消费者可以不使用特性注解,只需要实现 IConsumer<TEvent> 即可,扫描程序集时会忽略掉没有添加特性注解的消费者。

定义消费者模型:

public class DynamicCustomConsumer : IConsumer<TestEvent>
{
    public Task ExecuteAsync(MessageHeader messageHeader, TestEvent message)
    {
        throw new NotImplementedException();
    }

    public Task FaildAsync(MessageHeader messageHeader, Exception ex, int retryCount, TestEvent message)
    {
        throw new NotImplementedException();
    }

    public Task<ConsumerState> FallbackAsync(MessageHeader messageHeader, TestEvent? message, Exception? ex)
    {
        throw new NotImplementedException();
    }
}

然后通过 DynamicConsumerTypeFilter 手动配置消费者和属性。

DynamicConsumerTypeFilter dynamicConsumerTypeFilter = new();

dynamicConsumerTypeFilter.AddConsumer(typeof(DynamicCustomConsumer), new ConsumerOptions
{
	Queue = "test1"
});
dynamicConsumerTypeFilter.AddConsumer(typeof(DynamicCustomConsumer), new ConsumerOptions
{
	Queue = "test2"
});

然后注入服务时,手动添加类型过滤器。


builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
{
	options.WorkId = 1;
	options.AutoQueueDeclare = true;
	options.AppName = "myapp";
	options.Rabbit = (ConnectionFactory options) =>
	{
        // ... ...
	};
}, [typeof(Program).Assembly], [
    new ConsumerTypeFilter(),  // 消费者类型过滤器
    new EventBusTypeFilter(),  // 事件总线类型过滤器
    dynamicConsumerTypeFilter  // 动态消费者过滤器
]);

动态订阅

在程序启动后,通过 IDynamicConsumer 服务可以动态启动或停止一个消费者。对于在程序启动时就已经运行的消费者,不会受到动态订阅控制,不能在程序运行时停止。


动态启动消费者:

private readonly IMessagePublisher _messagePublisher;
private readonly IDynamicConsumer _dynamicConsumer;

[HttpPost("create")]
public async Task<string> CreateConsumer([FromBody] ConsumerDto consumer)
{
	foreach (var item in consumer.Queues)
	{
		await _dynamicConsumer.ConsumerAsync<MyConsumer, TestEvent>(new ConsumerOptions(item));
	}

	return "ok";
}

如果不想定义模型类,也可以直接使用函数方式:

foreach (var item in consumer.Queues)
{
	var consumerTag = await _dynamicConsumer.ConsumerAsync<TestEvent>(
		consumerOptions: new ConsumerOptions(item),
		execute: async (header, message) =>
		{
			await Task.CompletedTask;
		},
		faild: async (header, ex, retryCount, message) => { },
		fallback: async (header, message, ex) => ConsumerState.Ack
		);
}

return "ok";

使用队列名称可以动态停止消费者:

[HttpPost("stop")]
public async Task<string> StopConsumer([FromBody] ConsumerDto consumer)
{
	foreach (string queueName in consumer.Queues)
	{
		await _dynamicConsumer.StopConsumerAsync(queueName);
	}

	return "ok";
}

也可以使用消费者标识:

var consumerTag = await _dynamicConsumer.ConsumerAsync<MyConsumer, TestEvent>(new ConsumerOptions(item));
await _dynamicConsumer.StopConsumerTagAsync(consumerTag);