首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >从Azure事件中心到Azure服务总线的管道事件

从Azure事件中心到Azure服务总线的管道事件
EN

Stack Overflow用户
提问于 2020-05-30 18:21:10
回答 2查看 951关注 0票数 0

我正在收听各种活动的事件中心。

  1. 每个事件都是高值的,不能错过。
  2. 事件是基于设备id进行分区的。来自一个设备id的
  3. 事件是稀疏的,而且不太频繁(每几个dasy就有几个事件)。它只在响应用户操作时才发生,而用户操作并不频繁。
  4. 设备的数量很大,所以对于各种设备I,会有很多事件。

对于每个事件,我需要对系统进行3-4个API调用,这些调用并不是非常可靠的。由于其中一些是交叉的Geo调用,这可能需要一些时间。

我计划从事件中心接收事件,并将它们放到服务总线上。我的理由如下。

可以将blocked.

  • Service事件集线器缩放到32个分区,如果一个事件需要时间,则整个分区将获得总线,而后者则更具有水平可伸缩性。如果吞吐量下降,我只需将更多的订阅者添加到服务总线.

我一直在寻找这样的模式,但我还没有看到从基于日志的消息系统中获取数据并将它们推送到基于队列的模式的模式。

是否有更好的方法来处理这种情况?

EN

回答 2

Stack Overflow用户

发布于 2020-06-01 07:39:16

我认为您可以使用事件集线器触发器和服务总线输出绑定来实现您想要的结果。

例如,我希望监视事件中心'test‘,并且我使用的是C#库:

代码语言:javascript
运行
复制
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.EventHubs;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;

namespace FunctionApp68
{
    public static class Function1
    {
        [FunctionName("Function1")]
        [return: ServiceBus("test1", Connection = "ServiceBusConnection")]
        public static string Run([EventHubTrigger("test", Connection = "str")] EventData[] events, ILogger log)
        {
            var exceptions = new List<Exception>();
            string messageBodyt = "";
            foreach (EventData eventData in events)
            {
                try
                {
                    string messageBody = Encoding.UTF8.GetString(eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count);
                    messageBodyt = messageBodyt + messageBody;
                    // Replace these two lines with your processing logic.
                    log.LogInformation($"C# Event Hub trigger function processed a message: {messageBody}");
                    //await Task.Yield();
                }
                catch (Exception e)
                {
                    // We need to keep processing the rest of the batch - capture this exception and continue.
                    // Also, consider capturing details of the message that failed processing so it can be processed again later.
                    exceptions.Add(e);
                }
            }

            // Once processing of the batch is complete, if any messages in the batch failed processing throw an exception so that there is a record of the failure.

            if (exceptions.Count > 1)
                throw new AggregateException(exceptions);

            if (exceptions.Count == 1)
                throw exceptions.Single();
            return messageBodyt;
        }
    }
}

上述代码将从事件集线器'test‘收集,并保存到服务总线队列’test 1‘。

看看这些医生:

https://learn.microsoft.com/en-us/azure/azure-functions/functions-bindings-event-hubs-trigger?tabs=csharp

https://learn.microsoft.com/en-us/azure/azure-functions/functions-bindings-service-bus-output?tabs=csharp#example

票数 0
EN

Stack Overflow用户

发布于 2020-06-09 05:40:02

实际上,您需要的是每个设备Id都有一个私有队列。一旦事件到达事件集线器,从它中提取事件并将其放入设备Id的私有队列中,然后依次处理它。

如何构建每个设备的队列-Id:

构建队列的简单方法是使用SQL数据库(如果每秒的请求不是很高的话,它主要是工作的),因为sql 100 req/s是normal.)

  • another水平可伸缩的方式,是使用azure附加blobs(如果您的事件处理器是无状态的)。

  • 您还可以使用高级方法,如使用Azure可靠队列.
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/62106891

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档