首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >消息处理程序未激活

消息处理程序未激活
EN

Stack Overflow用户
提问于 2016-05-11 13:21:13
回答 1查看 1.5K关注 0票数 2

我遇到了一些关于重新总线的问题。

这是我的设想。我们有三种服务

标识)发布的“IdentityCreated”消息

网关)将'UpdateProfileCommand‘直接发送到’profile-westeu-输入‘队列中。

配置文件)使用来自输入队列'profile-westeu- input‘的消息,并订阅'IdentityCreated’消息。

Profile Service中看到的rebus配置

既然我已经在温莎城堡注册了我的经纪人。

代码语言:javascript
运行
复制
container.Register(Classes.FromThisAssembly()
                  .BasedOn(typeof(IHandleMessages<>))
                  .WithServiceAllInterfaces()
                  .LifestyleTransient());

我配置了Rebus

代码语言:javascript
运行
复制
var bus = Configure.With(new CastleWindsorContainerAdapter(container))
            .Logging(x => x.Trace())
            .Transport(
                t => t.UseAzureServiceBus(connectionStringNameOrConnectionString: connectionString,
                        inputQueueAddress: ProfileInputQueueName, mode: AzureServiceBusMode.Standard))
            .Options(o => o.SimpleRetryStrategy(ProfileErrorQueueName))
            .Start();

并订阅这样的消息类型

代码语言:javascript
运行
复制
bus.Subscribe(typeof(Nabufit.Messages.Identity.Events.IdentityCreated)).Wait()

我以为我的处理程序会被自动调用。然而,它没有:

我尝试过不同的解决方案

  • 更改输入队列的名称
  • 创建了一个事件发射器程序,该程序发布了一个类型为“IdentityCreated”的事件。当查看输入队列时,它是存在的,但是它不会被rebus捕获。

奖金信息:

  • 使用天蓝色服务总线
  • 在Service应用程序中托管Rebus
  • 我的输入队列名为“profile-westeu- input”。
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2016-05-17 06:14:56

在研究了应用程序之后,我们发现我们在OwinCommunicationListener中的Webapi之间共享了Windsor容器,它具有一些自定义的依赖项的生命期配置。这导致了两个不同的错误。

  1. 由于容器配置,Rebus不拾取事件
  2. 从体系结构上讲,它并不是智能地与消费过程共享同一个容器。

最后,我们使用rebus提供的BuiltinHandlerActivation类构建了一个特定于总线消费过程的定制的BuiltinHandlerActivation。长得像这样。

代码语言:javascript
运行
复制
 public class ServiceBusCommunicationListener : ICommunicationListener
{
    private BuiltinHandlerActivator activator;

    public async Task<string> OpenAsync(CancellationToken cancellationToken)
    {
        activator = new BuiltinHandlerActivator();
        RegisterHandlers(activator);

        var connectionString = "...";
        var bus = Configure.With(activator)
            .Logging(x => x.Serilog(Log.Logger))
            .Transport(
                t => t.UseAzureServiceBus(connectionStringNameOrConnectionString: connectionString,
                        inputQueueAddress: "input", mode: AzureServiceBusMode.Standard))
            .Options(o => o.SimpleRetryStrategy("error"))
            .Start();

        return connectionString;
    }

    private void RegisterHandlers(BuiltinHandlerActivator builtinHandlerActivator)
    {
        (...)
    }

    public async Task CloseAsync(CancellationToken cancellationToken)
    {
        if (activator != null)
            activator.Dispose();
    }

    public void Abort()
    {
        if (activator != null)
            activator.Dispose();
    }
}

并将ServicebusCommunicationListner注册为ServiceInstanceListener。

代码语言:javascript
运行
复制
internal sealed class ProfileService : StatelessService
{
    public ProfileService(StatelessServiceContext context)
        : base(context)
    { }

    protected override IEnumerable<ServiceInstanceListener> CreateServiceInstanceListeners()
    {
        return new[]
        {
            new ServiceInstanceListener(context => new ServiceBusCommunicationListener()), 
        };
    }
}
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/37163990

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档