我遇到了一些关于重新总线的问题。
这是我的设想。我们有三种服务
标识)发布的“IdentityCreated”消息
网关)将'UpdateProfileCommand‘直接发送到’profile-westeu-输入‘队列中。
配置文件)使用来自输入队列'profile-westeu- input‘的消息,并订阅'IdentityCreated’消息。
Profile Service中看到的rebus配置
既然我已经在温莎城堡注册了我的经纪人。
container.Register(Classes.FromThisAssembly()
.BasedOn(typeof(IHandleMessages<>))
.WithServiceAllInterfaces()
.LifestyleTransient());我配置了Rebus
var bus = Configure.With(new CastleWindsorContainerAdapter(container))
.Logging(x => x.Trace())
.Transport(
t => t.UseAzureServiceBus(connectionStringNameOrConnectionString: connectionString,
inputQueueAddress: ProfileInputQueueName, mode: AzureServiceBusMode.Standard))
.Options(o => o.SimpleRetryStrategy(ProfileErrorQueueName))
.Start();并订阅这样的消息类型
bus.Subscribe(typeof(Nabufit.Messages.Identity.Events.IdentityCreated)).Wait()我以为我的处理程序会被自动调用。然而,它没有:
我尝试过不同的解决方案
奖金信息:
发布于 2016-05-17 06:14:56
在研究了应用程序之后,我们发现我们在OwinCommunicationListener中的Webapi之间共享了Windsor容器,它具有一些自定义的依赖项的生命期配置。这导致了两个不同的错误。
最后,我们使用rebus提供的BuiltinHandlerActivation类构建了一个特定于总线消费过程的定制的BuiltinHandlerActivation。长得像这样。
public class ServiceBusCommunicationListener : ICommunicationListener
{
private BuiltinHandlerActivator activator;
public async Task<string> OpenAsync(CancellationToken cancellationToken)
{
activator = new BuiltinHandlerActivator();
RegisterHandlers(activator);
var connectionString = "...";
var bus = Configure.With(activator)
.Logging(x => x.Serilog(Log.Logger))
.Transport(
t => t.UseAzureServiceBus(connectionStringNameOrConnectionString: connectionString,
inputQueueAddress: "input", mode: AzureServiceBusMode.Standard))
.Options(o => o.SimpleRetryStrategy("error"))
.Start();
return connectionString;
}
private void RegisterHandlers(BuiltinHandlerActivator builtinHandlerActivator)
{
(...)
}
public async Task CloseAsync(CancellationToken cancellationToken)
{
if (activator != null)
activator.Dispose();
}
public void Abort()
{
if (activator != null)
activator.Dispose();
}
}并将ServicebusCommunicationListner注册为ServiceInstanceListener。
internal sealed class ProfileService : StatelessService
{
public ProfileService(StatelessServiceContext context)
: base(context)
{ }
protected override IEnumerable<ServiceInstanceListener> CreateServiceInstanceListeners()
{
return new[]
{
new ServiceInstanceListener(context => new ServiceBusCommunicationListener()),
};
}
}https://stackoverflow.com/questions/37163990
复制相似问题