我有一个web应用程序,它在Masstransit中使用请求/响应消息。这在我们的测试环境中工作,没有问题。
然而,在客户部署上,我们面临一个问题。在客户站点,我们有两个网段A和B。执行数据库调用的组件在网段A中,web应用程序和RabbitMq服务器在网段B中。
由于安全限制,段A中的组件必须通过具有给定地址的负载均衡器。组件本身可以通过Masstransit连接到RabbitMQ。到目前一切尚好。
但是,网段B上的web组件使用RabbitMq服务器的直接地址。当web组件现在开始请求/响应调用时,我可以看到消息到达了段A中的组件。但是,我看到使用者试图在“错误的”地址上调用RabbitMQ服务器。它使用web组件用来发出请求的地址。但是,段A中的组件应回复“负载均衡器”地址。
有没有办法配置或告诉RespondAsync调用使用为该组件配置的连接地址?
当然,最简单的方法是让web组件也通过负载均衡器连接,但由于网段/安全设置的原因,负载均衡器只能从网段A到达。
如有任何意见或帮助,我们将非常感谢。
发布于 2017-06-13 22:11:29
我在rabbitmq联合中也遇到过类似的问题。这是我所做的。
ResponseAddressSendObserver
class ResponseAddressSendObserver : ISendObserver
{
private readonly string _hostUriString;
public ResponseAddressSendObserver(string hostUriString)
{
_hostUriString = hostUriString;
}
public Task PreSend<T>(SendContext<T> context)
where T : class
{
if (context.ResponseAddress != null)
{
// Send relative response address alongside the message
context.Headers.Set("RelativeResponseAddress",
context.ResponseAddress.AbsoluteUri.Substring(_hostUriString.Length));
}
return Task.CompletedTask;
}
...
}
ResponseAddressConsumeFilter
class ResponseAddressConsumeFilter : IFilter<ConsumeContext>
{
private readonly string _hostUriString;
public ResponseAddressConsumeFilter(string hostUriString)
{
_hostUriString = hostUriString;
}
public Task Send(ConsumeContext context, IPipe<ConsumeContext> next)
{
var responseAddressOverride = GetResponseAddress(_hostUriString, context);
return next.Send(new ResponseAddressConsumeContext(responseAddressOverride, context));
}
public void Probe(ProbeContext context){}
private static Uri GetResponseAddress(string host, ConsumeContext context)
{
if (context.ResponseAddress == null)
return context.ResponseAddress;
object relativeResponseAddress;
if (!context.Headers.TryGetHeader("RelativeResponseAddress", out relativeResponseAddress) || !(relativeResponseAddress is string))
throw new InvalidOperationException("Message has ResponseAddress but doen't have RelativeResponseAddress header");
return new Uri(host + relativeResponseAddress);
}
}
ResponseAddressConsumeContext
class ResponseAddressConsumeContext : BaseConsumeContext
{
private readonly ConsumeContext _context;
public ResponseAddressConsumeContext(Uri responseAddressOverride, ConsumeContext context)
: base(context.ReceiveContext)
{
_context = context;
ResponseAddress = responseAddressOverride;
}
public override Uri ResponseAddress { get; }
public override bool TryGetMessage<T>(out ConsumeContext<T> consumeContext)
{
ConsumeContext<T> context;
if (_context.TryGetMessage(out context))
{
// the most hackish part in the whole arrangement
consumeContext = new MessageConsumeContext<T>(this, context.Message);
return true;
}
else
{
consumeContext = null;
return false;
}
}
// all other members just delegate to _context
}
当配置总线时
var result = MassTransit.Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(new Uri(hostAddress), h =>
{
h.Username(...);
h.Password(...);
});
cfg.UseFilter(new ResponseAddressConsumeFilter(hostAddress));
...
});
result.ConnectSendObserver(new ResponseAddressSendObserver(hostAddress));
因此,现在相对响应地址与消息一起发送,并在接收端使用。
本文档不建议使用观察者来修改任何内容,但在这种情况下应该可以。
也许三个是更好的解决方案,但我还没有找到一个。HTH
https://stackoverflow.com/questions/44445890
复制相似问题