我已经尝试过实现我自己的QueueProcessorFactory,,它工作的很好,除了一件事我不能把我的头脑。在我尝试了5次消息之后(默认),它运行CopyMessageToPoisonQueueAsync,然后运行DeleteMessageAsync。
到目前为止还不错,但10分钟后,消息再次以排队列计数5出现在队列中,并且也出现在毒队列中,然后是相同的过程,CopyMessageToPoisonQueueAsync、DeleteMessageAsync,这是位置队列中的一个额外项,与已经复制的项完全相同,10分钟后出现了相同的过程,但使用了dequeue计数6。我应该在删除时更改ExpirationTime并将其设置为现在,还是删除其他内容时将其设置为?
这是我的密码:
class Program
{
static void Main()
{
var config = new JobHostConfiguration();
config.Queues.MaxPollingInterval = TimeSpan.FromSeconds(3);
config.Queues.QueueProcessorFactory = new CustomQueueProcessorFactory();
var host = new JobHost(config);
host.RunAndBlock();
}
}
public class CustomQueueProcessorFactory : IQueueProcessorFactory
{
public List<CustomQueueProcessor> CustomQueueProcessors = new List<CustomQueueProcessor>();
public QueueProcessor Create(QueueProcessorFactoryContext context)
{
CustomQueueProcessor processor = new CustomQueueProcessor(context);
CustomQueueProcessors.Add(processor);
return processor;
}
public class CustomQueueProcessor : QueueProcessor
{
public CustomQueueProcessor(QueueProcessorFactoryContext context)
: base(context)
{
}
public override Task<bool> BeginProcessingMessageAsync(CloudQueueMessage message, CancellationToken cancellationToken)
{
return base.BeginProcessingMessageAsync(message, cancellationToken);
}
public override Task CompleteProcessingMessageAsync(CloudQueueMessage message, FunctionResult result, CancellationToken cancellationToken)
{
return base.CompleteProcessingMessageAsync(message, result, cancellationToken);
}
protected override Task CopyMessageToPoisonQueueAsync(CloudQueueMessage message, CloudQueue poisonQueue, CancellationToken cancellationToken)
{
return base.CopyMessageToPoisonQueueAsync(message, poisonQueue, cancellationToken);
}
protected override Task DeleteMessageAsync(CloudQueueMessage message, CancellationToken cancellationToken)
{
return base.DeleteMessageAsync(message, cancellationToken);
}
protected override async Task ReleaseMessageAsync(CloudQueueMessage message, FunctionResult result, TimeSpan visibilityTimeout, CancellationToken cancellationToken)
{
visibilityTimeout = TimeSpan.FromSeconds(2);
await base.ReleaseMessageAsync(message, result, visibilityTimeout, cancellationToken);
}
}
}
如果添加一些ConsoleWritelines,就会得到以下输出:
由于重复,开始省略。。。
Microsoft.Azure.WebJobs.Host.FunctionInvocationException:消息: d3c88182-ff39-4f81-8c29-b4ce0b2062ad排队列计数:5日期: 2017-06-26 13:33:42执行' Functions.ProcessQueueMessage‘(原因=’01testQueue上检测到的新队列消息‘,Id=17405a55-6d28-6d28-48b2-a 874-718c0b741f61)在执行函数时测试QueueProcessorFactory异常: Functions.ProcessQueueMessage - System.Exception: Derp!在WebJobTest1.Functions.ProcessQueueMessage(String消息中,TextWriter日志)
...message省略了..。
CompleteProcessingMessageAsync消息: d3c88182-ff39-4f81-8c29-b4ce0b2062ad排队列计数: 5 CopyMessageToPoisonQueueAsync消息: d3c88182-ff39-4f81-8c29-b4ce0b2062ad排队列计数:5条消息已到达MaxDequeueCount of 5。将消息移动到队列‘01testQueue-CopyMessageToPoisonQueueAsync’。CopyMessageToPoisonQueueAsync消息: on 643007-954a-4296-9e9a-54ebb0aec6c5排队列计数:5分钟等待时间: BeginProcessingMessageAsync消息: d3c88182-ff39-4f81-8c29-b4ce0b2062ad排队列计数:6日期: 2017-06-26 13:43:46执行'Functions.ProcessQueueMessage‘(原因=’01testQueue上检测到的新队列消息‘,。Id=c22fe457-cb70-4cc8-a8a4-550cd44a8345)执行功能时测试QueueProcessorFactory异常: Functions.ProcessQueueMessage CompleteProcessingMessageAsync消息: d3c88182-ff39-4f81-8c29-b4ce0b2062ad排队列计数:6 CopyMessageToPoisonQueueAsync消息: d3c88182-ff39-4f81-8c29-b4ce0b2062ad脱队列计数:6条消息已到达MaxDequeueCount of 5。CopyMessageToPoisonQueueAsync消息: c43fd9fb-c2d7-4745-91ae-33cdc407ede6排队列数:6
发布于 2017-06-27 07:39:58
根据您的描述,我假设在与WebJobs SDK一起使用StorageSDK8.x时,它与已知的问题有关。以下是类似的问题:
根据我的测试,这个问题目前还没有解决。可以将Storage降级或更改CopyMessageToPoisonQueueAsync
,如下所示:
protected override Task CopyMessageToPoisonQueueAsync(CloudQueueMessage message, CloudQueue poisonQueue, CancellationToken cancellationToken)
{
var newMessage = new CloudQueueMessage(message.Id, message.PopReceipt);
newMessage.SetMessageContent(message.AsBytes);
return base.CopyMessageToPoisonQueueAsync(newMessage, poisonQueue, cancellationToken);
}
https://stackoverflow.com/questions/44759133
复制相似问题