首页
学习
活动
专区
工具
TVP
发布
社区首页 >问答首页 >如何设置EventProcessorHost从现在开始读取事件(协调时)?

如何设置EventProcessorHost从现在开始读取事件(协调时)?
EN

Stack Overflow用户
提问于 2015-11-24 05:26:49
回答 3查看 5K关注 0票数 5

我们使用EventProcessorHost来接收来自Azure EventHubs的事件。我一直试图(通过EventProcessorOptions.InitialOffsetProvider)将其配置为从UTC读取事件,但它总是从提要的开头读取。我没有保存检查点(我甚至删除了创建的BLOB容器)。我是这样设置的:

代码语言:javascript
复制
DateTime startDate = DateTime.UtcNow;

var epo = new EventProcessorOptions
            {
                MaxBatchSize = 100, 
                PrefetchCount = 100, 
                ReceiveTimeOut = TimeSpan.FromSeconds(120),  
                InitialOffsetProvider = (name) => startDate  
            };

任何指导都将不胜感激。

EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2015-11-25 11:33:22

我发现blob中的检查点文件夹仍然在那里,我的应用程序正在考虑这一点,并忽略了我在EventProcessorOptions中设置的日期。删除容器后,它开始按预期运行(计入UTC日期)。

票数 7
EN

Stack Overflow用户

发布于 2018-03-29 05:23:44

我认为这在2.0.0版本中发生了变化-Rajiv的代码现在应该是:

代码语言:javascript
复制
var eventProcessorOptions = new EventProcessorOptions
{
    InitialOffsetProvider = (partitionId) => EventPosition.FromEnqueuedTime(DateTime.UtcNow)
};

下面是一个包含完全限定类名的示例块:

代码语言:javascript
复制
    private static async Task MainAsync(string[] args)
    {
        try{
            Console.WriteLine("Registering EventProcessor...");

            string AISEhConnectionStringEndpoint = Configuration["AISEhConnectionStringEndpoint"];
            string AISEhConnectionStringSharedAccessKeyName = Configuration["AISEhConnectionStringSharedAccessKeyName"];
            string AISEhConnectionStringSharedAccessKey = Configuration["AISEhConnectionStringSharedAccessKey"];
            string EhConnectionString = $"Endpoint={AISEhConnectionStringEndpoint};SharedAccessKeyName={AISEhConnectionStringSharedAccessKeyName};SharedAccessKey={AISEhConnectionStringSharedAccessKey}";
            string AISEhEntityPath = Configuration["AISEhEntityPath"];
            string AISEhConsumerGroupName = Configuration["AISEhConsumerGroupName"];
            string AISStorageContainerName = Configuration["AISStorageContainerName"];
            string AISStorageAccountName = Configuration["AISStorageAccountName"];
            string AISStorageAccountKey = Configuration["AISStorageAccountKey"];

            string StorageConnectionString = string.Format("DefaultEndpointsProtocol=https;AccountName={0};AccountKey={1}", AISStorageAccountName, AISStorageAccountKey);

            var eventProcessorHost = new Microsoft.Azure.EventHubs.Processor.EventProcessorHost(
                AISEhEntityPath,
                AISEhConsumerGroupName,
                EhConnectionString,
                StorageConnectionString,
                AISStorageContainerName);

            var options = new Microsoft.Azure.EventHubs.Processor.EventProcessorOptions
            {
                InitialOffsetProvider = (partitionId) => Microsoft.Azure.EventHubs.EventPosition.FromEnqueuedTime(DateTime.UtcNow)
            };

            // Registers the Event Processor Host and starts receiving messages
            await eventProcessorHost.RegisterEventProcessorAsync<GetEvents>(options);

            Thread.Sleep(Timeout.Infinite);

            // Disposes of the Event Processor Host
            await eventProcessorHost.UnregisterEventProcessorAsync();
        }
        catch(Exception ex)
        {
            Console.WriteLine(ex.Message);
            NLog.LogManager.GetCurrentClassLogger().Error(ex);
            throw;
        }
    }

}

以下是我的常规设置,为了帮助解决问题,我隐藏了秘密/确切地址,因为我发现解决这个问题不如拔牙那么令人愉悦:

代码语言:javascript
复制
"AISEhConnectionStringEndpoint": "sb://<my bus address>.servicebus.windows.net/",
"AISEhConnectionStringSharedAccessKeyName": "<my key name>",
"AISEhConnectionStringSharedAccessKey": "<yeah nah>",
"AISEhEntityPath": "<Event Hub entity path>",
"AISEhConsumerGroupName":  "<consumer group name e.g $Default>",
"AISStorageContainerName":  "<storage container name>",
"AISStorageAccountName": "<storage account name>",
"AISStorageAccountKey": "<yeah nah>",
票数 8
EN

Stack Overflow用户

发布于 2016-02-03 08:36:57

为此,您可以使用EventProcessorOptions类,并提供所需时间的偏移量。

代码语言:javascript
复制
var eventProcessorOptions = new EventProcessorOptions
{
    InitialOffsetProvider = (partitionId) => DateTime.UtcNow
};

然后,您可以使用任何接受eventProcessorOptionsRegisterEventProcessAsync重载。

票数 4
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/33881020

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档