首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >Service Fabric Actor订阅Azure Service Bus主题

Service Fabric Actor订阅Azure Service Bus主题
EN

Stack Overflow用户
提问于 2018-03-18 12:35:13
回答 3查看 601关注 0票数 1

我正在考虑构建一个系统,该系统要求Actors使用特定于Actor实例的过滤器创建对Azure服务总线主题的订阅。我的问题是,如果Actor (该主题的订阅)已在Service中被停用,它会被Azure服务总线发送的新消息激活(重新激活)吗?

谢谢

EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2018-03-18 14:04:24

您的演员将不会被激活通过收到一条消息。它只通过远程调用和提醒来激活。所以这种方法行不通。

您可以做的是在服务中接收消息,并将它们转发给Actor实例。如果需要,调用Actor将动态创建实例。

票数 3
EN

Stack Overflow用户

发布于 2018-03-18 14:06:00

基于演员生命周期,它必须被激活。来自主题的Azure服务总线消息不会激活参与者。相反,你需要一个能做到这一点的主管程序。消息可以包含一个属性来表示所需的参与者ID。它还允许通过具有单个主题和扩展监控器来简化Azure服务总线拓扑。

票数 1
EN

Stack Overflow用户

发布于 2018-10-08 01:24:46

这可以很容易地通过提醒来实现。因为需要先叫演员,所以你可以这样做。

create方法将设置连接字符串、主题名称、订阅名称,并在需要时创建它们。提醒将检查订阅客户端是否为null,以及是否将创建该客户端。提示总是在失败时执行,这样您就可以控制故障并在挤压时重新启动它。

https://github.com/Huachao/azure-content/blob/master/articles/service-fabric/service-fabric-reliable-actors-timers-reminders.md

代码语言:javascript
运行
复制
public async Task<bool> CreateAsync(BusOptions options, CancellationToken cancellationToken)
    {
        if (options?.ConnectionString == null)
        {
            return false;
        }
        await StateManager.AddOrUpdateStateAsync("Options", options,(k,v) => v != options? options:v, cancellationToken);

        var client = new ManagementClient(options.ConnectionString);
        try
        {
            var exist = await client.TopicExistsAsync(options.TopicName, cancellationToken);
            if (!exist)
            {
               await client.CreateTopicAsync(options.TopicName, cancellationToken);
            }
            exist = await client.SubscriptionExistsAsync(options.TopicName, options.SubscriptionName, cancellationToken);
            if (!exist)
            {
                await client.CreateSubscriptionAsync(options.TopicName, options.SubscriptionName, cancellationToken);
            }
            var rules =await client.GetRulesAsync(options.TopicName,options.SubscriptionName,cancellationToken: cancellationToken);
            if(rules.FirstOrDefault(x=>x.Name == options.RuleName) == null)
            {
                SqlFilter filter = new SqlFilter(options.RuleFilterSqlValue);
                await client.CreateRuleAsync(options.TopicName, options.SubscriptionName, new RuleDescription(options.RuleName, filter));
            }

        }
        catch (Exception ex)
        {
            ActorEventSource.Current.ActorMessage(this, ex.Message);                
        }
        return true;
    }
    public async Task DeleteAsync(BusOptions options, CancellationToken cancellationToken)
    {
        var client = new ManagementClient(options.ConnectionString);
        try
        {
            await client.DeleteRuleAsync(options.TopicName, options.SubscriptionName, options.RuleName, cancellationToken);
            await client.DeleteSubscriptionAsync(options.TopicName, options.SubscriptionName, cancellationToken);
        }
        catch (Exception ex)
        {
            ActorEventSource.Current.ActorMessage(this, ex.Message);
        }

    }
    private ISubscriptionClient subscriptionClient;       
    public async Task<bool> SendAsync(SendMessage message, CancellationToken cancellationToken)
    {
        var options =await StateManager.TryGetStateAsync<BusOptions>("Options");
        if (!options.HasValue)
        {
            ActorEventSource.Current.ActorMessage(this, "First execute CreateAsync. No options set.");
            return false;
        }


        var client = new TopicClient(options.Value.ConnectionString,options.Value.TopicName);

        var msg = new Message(message.Body);
        if(message.UserProperties != null)
        {
            foreach (var item in message.UserProperties)
            {
                msg.UserProperties.Add(item);
            }
        }
        msg.Label = message.Label;



       await client.SendAsync(msg);
       await StateManager.AddOrUpdateStateAsync("Messages_Send", 1, (key, value) => 1 > value ? 1 : value, cancellationToken);

        return true;
    }
    void RegisterOnMessageHandlerAndReceiveMessages()
    {
        var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
        {                
            MaxConcurrentCalls = 1,
            AutoComplete = false
        };
        subscriptionClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);
    }
    async Task ProcessMessagesAsync(Message message, CancellationToken cancellationToken)
    {
        ActorEventSource.Current.ActorMessage(this, message.Label);

        await subscriptionClient.CompleteAsync(message.SystemProperties.LockToken);


    }
    Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
    {

        var context = exceptionReceivedEventArgs.ExceptionReceivedContext;
        ActorEventSource.Current.ActorMessage(this,
            string.Format("Exception context for troubleshooting: - Endpoint: {0}- Entity Path: {1}- Executing Action: {2} - MEssage: {3}",
            context.Endpoint,context.EntityPath,context,exceptionReceivedEventArgs.Exception.Message));
        return Task.CompletedTask;
    }
    protected override async Task OnActivateAsync()
    {
        ActorEventSource.Current.ActorMessage(this, $"Actor '{Id.GetStringId()}' activated.");

        IActorReminder Recieve_Message = await this.RegisterReminderAsync(
                        "Recieve_Message",
                        null,
                        TimeSpan.FromSeconds(1),    //The amount of time to delay before firing the reminder
                        TimeSpan.FromSeconds(1));


    }
    public async Task ReceiveReminderAsync(string reminderName, byte[] state, TimeSpan dueTime, TimeSpan period)
    {
        if (reminderName.Equals("Recieve_Message"))
        {
            if(subscriptionClient == null)
            {
                var options = await StateManager.TryGetStateAsync<BusOptions>("Options");
                if (!options.HasValue)
                {
                    ActorEventSource.Current.ActorMessage(this, "First execute CreateAsync. No options set.");
                    return;
                }

                var conn = new ServiceBusConnectionStringBuilder(options.Value.ConnectionString);

                subscriptionClient = new SubscriptionClient(options.Value.ConnectionString, options.Value.TopicName, options.Value.SubscriptionName);

                RegisterOnMessageHandlerAndReceiveMessages();
            }

        }


    }        
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/49347941

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档