首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >MassTransit使用AzureServiceBus无声失败

MassTransit使用AzureServiceBus无声失败
EN

Stack Overflow用户
提问于 2019-01-14 04:28:00
回答 1查看 622关注 0票数 1

我正在尝试使用MassTransit和.NET Core创建一个AzureServiceBus传奇。

我有一个ASP.NET核心应用程序,它成功地向队列发送消息。它在Startup.cs中具有以下配置:

代码语言:javascript
运行
复制
services.AddSingleton(provider => Bus.Factory.CreateUsingAzureServiceBus(cfg =>
{
    cfg.Host(
        new Uri("https://zzz.servicebus.windows.net/"),
        h =>
        {
            h.TransportType = TransportType.AmqpWebSockets;
            h.OperationTimeout = TimeSpan.FromSeconds(5);
            h.RetryLimit = 1;
            h.TokenProvider = TokenProvider.CreateSharedAccessSignatureTokenProvider("RootManageSharedAccessKey", "xxx");
        });
        
    cfg.RequiresSession = true;
}));

EndpointConvention.Map<MassTransit.POC.Shared.IPurchasePolicyMessage>(
    new Uri("https://zzz.servicebus.windows.net/masstransitqueue"));

services.AddSingleton<ISendEndpointProvider>(provider => provider.GetRequiredService<IBusControl>());

控制器中发送消息的代码:

代码语言:javascript
运行
复制
var sendEndpoint = await _sendEndpointProvider.GetSendEndpoint(new Uri("https://zzz.servicebus.windows.net/masstransitqueue"));

await
  sendEndpoint.Send<IPurchasePolicyMessage>(
    new
    {
      QuoteNumber = policyNumber,
      CorrelationId = NewId.NextGuid().ToString("D")
    }, context =>
    {
      context.SetSessionId(context.Message.CorrelationId.ToString());
    }).ConfigureAwait(false);

我有一个单独的.NET核心控制台应用程序,它正在接收IPurchasePolicyMessage。它在Program.cs中具有以下配置:

代码语言:javascript
运行
复制
var bus = Bus.Factory.CreateUsingAzureServiceBus(cfg =>
{
    var host = cfg.Host(
        new Uri("https://zzz.servicebus.windows.net/"),
        h =>
        {
            h.TransportType = TransportType.AmqpWebSockets;
            h.OperationTimeout = TimeSpan.FromSeconds(5);
            h.RetryLimit = 1;
            h.TokenProvider = TokenProvider.CreateSharedAccessSignatureTokenProvider("RootManageSharedAccessKey", "xxx");
        });

    cfg.RequiresSession = true;

    cfg.ReceiveEndpoint(host, "masstransitqueue", e =>
    {
        e.RequiresSession = true;
        //e.Consumer<PurchasePolicyConsumer>();
        e.Saga<PurchasePolicySaga>(new MessageSessionSagaRepository<PurchasePolicySaga>());
    });
});

bus.Start();

PurchasePolicySaga被定义为:

代码语言:javascript
运行
复制
public class PurchasePolicySaga :
    ISaga,
    InitiatedBy<IPurchasePolicyMessage>
{
    public Guid CorrelationId { get; set; }

    public async Task Consume(ConsumeContext<IPurchasePolicyMessage> context)
    {
        await Console.Out.WriteLineAsync($"Processing policy number {context.Message.QuoteNumber} in saga.");
    }
}

这里的消费方法从未被调用过。没有错误,日志中没有与传入消息相关的活动,只是什么都没有发生。谁能告诉我怎么找出原因吗?

我强烈怀疑这个问题与会话有关,因为当我将我的传奇更改为一个简单的使用者,删除"RequiresSession“标志并删除队列以允许MassTransit重新创建它时,它就能工作了。然而,由于基于AzureServiceBus的MassTransit传奇需要会话,我有点卡住了。

进一步调查

查看Azure仪表板中的消息,我看到消息正在发送到队列中。这张截图显示了3条消息:

然而,没有关于这一主题的信息:

我应该期待在这个话题上有相应的信息,不是吗?另外,这里的订阅是否禁用了会话,这似乎不对?

EN

回答 1

Stack Overflow用户

发布于 2019-01-14 23:33:12

要使用的会话,您必须设置发送/发布的消息的SessionId,以便Azure为其创建一个会话。

代码语言:javascript
运行
复制
await InputQueueSendEndpoint.Send(message, context =>
{
    context.SetSessionId(message.CorrelationId.ToString());
});

您可以在这里看到一个工作单元测试示例:

Specs.cs

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/54175789

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档