我正在尝试使用MassTransit和.NET Core创建一个AzureServiceBus传奇。
我有一个ASP.NET核心应用程序,它成功地向队列发送消息。它在Startup.cs中具有以下配置:
services.AddSingleton(provider => Bus.Factory.CreateUsingAzureServiceBus(cfg =>
{
cfg.Host(
new Uri("https://zzz.servicebus.windows.net/"),
h =>
{
h.TransportType = TransportType.AmqpWebSockets;
h.OperationTimeout = TimeSpan.FromSeconds(5);
h.RetryLimit = 1;
h.TokenProvider = TokenProvider.CreateSharedAccessSignatureTokenProvider("RootManageSharedAccessKey", "xxx");
});
cfg.RequiresSession = true;
}));
EndpointConvention.Map<MassTransit.POC.Shared.IPurchasePolicyMessage>(
new Uri("https://zzz.servicebus.windows.net/masstransitqueue"));
services.AddSingleton<ISendEndpointProvider>(provider => provider.GetRequiredService<IBusControl>());
控制器中发送消息的代码:
var sendEndpoint = await _sendEndpointProvider.GetSendEndpoint(new Uri("https://zzz.servicebus.windows.net/masstransitqueue"));
await
sendEndpoint.Send<IPurchasePolicyMessage>(
new
{
QuoteNumber = policyNumber,
CorrelationId = NewId.NextGuid().ToString("D")
}, context =>
{
context.SetSessionId(context.Message.CorrelationId.ToString());
}).ConfigureAwait(false);
我有一个单独的.NET核心控制台应用程序,它正在接收IPurchasePolicyMessage。它在Program.cs中具有以下配置:
var bus = Bus.Factory.CreateUsingAzureServiceBus(cfg =>
{
var host = cfg.Host(
new Uri("https://zzz.servicebus.windows.net/"),
h =>
{
h.TransportType = TransportType.AmqpWebSockets;
h.OperationTimeout = TimeSpan.FromSeconds(5);
h.RetryLimit = 1;
h.TokenProvider = TokenProvider.CreateSharedAccessSignatureTokenProvider("RootManageSharedAccessKey", "xxx");
});
cfg.RequiresSession = true;
cfg.ReceiveEndpoint(host, "masstransitqueue", e =>
{
e.RequiresSession = true;
//e.Consumer<PurchasePolicyConsumer>();
e.Saga<PurchasePolicySaga>(new MessageSessionSagaRepository<PurchasePolicySaga>());
});
});
bus.Start();
PurchasePolicySaga被定义为:
public class PurchasePolicySaga :
ISaga,
InitiatedBy<IPurchasePolicyMessage>
{
public Guid CorrelationId { get; set; }
public async Task Consume(ConsumeContext<IPurchasePolicyMessage> context)
{
await Console.Out.WriteLineAsync($"Processing policy number {context.Message.QuoteNumber} in saga.");
}
}
这里的消费方法从未被调用过。没有错误,日志中没有与传入消息相关的活动,只是什么都没有发生。谁能告诉我怎么找出原因吗?
我强烈怀疑这个问题与会话有关,因为当我将我的传奇更改为一个简单的使用者,删除"RequiresSession“标志并删除队列以允许MassTransit重新创建它时,它就能工作了。然而,由于基于AzureServiceBus的MassTransit传奇需要会话,我有点卡住了。
进一步调查
查看Azure仪表板中的消息,我看到消息正在发送到队列中。这张截图显示了3条消息:
然而,没有关于这一主题的信息:
我应该期待在这个话题上有相应的信息,不是吗?另外,这里的订阅是否禁用了会话,这似乎不对?
发布于 2019-01-14 23:33:12
要使用的会话,您必须设置发送/发布的消息的SessionId,以便Azure为其创建一个会话。
await InputQueueSendEndpoint.Send(message, context =>
{
context.SetSessionId(message.CorrelationId.ToString());
});
您可以在这里看到一个工作单元测试示例:
https://stackoverflow.com/questions/54175789
复制相似问题