我介绍了使用MassTransit和RabbitMQ的事件驱动架构,在本地主机上使用AWS,在活动环境中使用.NET 5 Web 。
我有一个API (#API-1),它生成事件,如等。
然后我有另一个API (#API-2),它存储实体的内存中副本。这个#API-2必须订阅这些事件才能更新其本地缓存。#API-2将有多个正在运行的实例,比方说3。
生产者配置(#API-1):
services.AddMassTransit(x =>
{
x.UsingRabbitMq();
});
services.AddMassTransitHostedService();
消费者配置(#API-2):
services.AddMassTransit(x =>
{
x.AddConsumer<EntityCreatedConsumer>();
x.AddConsumer<EntityUpdatedConsumer>();
x.AddConsumer<EntityDeletedConsumer>();
x.UsingRabbitMq((context, cfg) =>
{
cfg.ConfigureEndpoints(context);
});
});
services.AddMassTransitHostedService();
发布于 2020-12-02 15:52:47
您需要使用不同的端点名称。每个端点都有自己的队列和绑定,所以如果使用相同的端点名称,就会得到一个队列,所有实例都开始竞争消息。
这是一个常见的错误,这就是为什么在文档中甚至是提到过。
发布于 2020-12-02 17:10:43
要正确地为使用者配置特定于实例的端点,可以在添加使用者时使用Endpoint
配置。我已经更新了你的代码来展示如何使用它。
string instanceId = "SomeUniqueValue";
services.AddMassTransit(x =>
{
x.AddConsumer<EntityCreatedConsumer>()
.Endpoint(e => e.InstanceId = instanceId);
x.AddConsumer<EntityUpdatedConsumer>()
.Endpoint(e => e.InstanceId = instanceId);
x.AddConsumer<EntityDeletedConsumer>()
.Endpoint(e => e.InstanceId = instanceId);
x.UsingRabbitMq((context, cfg) =>
{
cfg.ConfigureEndpoints(context);
});
});
services.AddMassTransitHostedService();
instanceId可以从环境中提取,甚至是随机的。如果希望在进程退出时删除队列,可以指定:
.Endpoint(e =>
{
e.InstanceId = instanceId;
e.Temporary = true;
});
这是在v7.0.4中添加的
https://stackoverflow.com/questions/65110978
复制相似问题