全,
我使用变更源处理器Library.Want来了解处理服务故障的最佳方法,以及ProcessChangesAsync方法中的异常/错误场景。以下是我所指的事件。
1)服务失败--使处理器库在某些操作中崩溃的服务。如何从同一个文档(失败实例上的文档)启动进程?是否有任何内置的机制,其中更改提要将从上一个失败的文档开始?让我们假设,在当前批处理中,我们有10个docs.5成功处理,然后由于网络故障而中断服务,或者通过其他一些reasons.Will,一旦服务重新启动,我的进程就从第6文档开始?如何做到这一点?
2)异常和错误-- ProcessChangesAsync方法中的任何错误都可以在全局级别使用try来处理,但是如何持久化这些故障记录并将其用于下一批?同样,在更改提要过程中寻找任何可用的内置机制。
发布于 2018-08-22 19:32:26
1)默认情况下,处理器库在成功运行ProcessChangesAsync后设置检查点。在最新的图书馆版本中,您可以自定义检查指针,以便在需要时执行手动检查点。如果由于某种原因,处理器在检查点之前关闭,那么它将从存储在租约集合中的最后一个成功检查点开始下一步处理。在您的示例中,它将再次从第一个文档开始,因此您将不会丢失一个更改,但是您可能会经历双重处理(这是一个“至少一次”模型)。
2)没有可以利用的内置机制,处理ProcessChangesAsync中的异常是您的责任。您不仅可以添加全局try/catch,而且在遍历文档的情况下,在循环中添加try/catch,以处理失败的文档(可能会将其发送到队列等待以后的分析/后处理),而不会丢失批处理。如果您需要对这些错误进行日志记录(我假设这就是您所说的持久化错误?),那么最新版本与LibLog兼容,因此插入您自己的自定义日志记录非常简单:
using Microsoft.Azure.Documents.ChangeFeedProcessor.Logging;
var hostName = "SampleHost";
var tracelogProvider = new TraceLogProvider(); //You can use any provider supported by LibLog
using (tracelogProvider.OpenNestedContext(hostName))
{
LogProvider.SetCurrentLogProvider(tracelogProvider);
// After this, create IChangeFeedProcessor instance and start/stop it.
}注释的附加信息
为了避免中断批处理或导致批处理重新处理的异常,您可以进行如下处理:
public async Task ProcessChangesAsync(IChangeFeedObserverContext context, IReadOnlyList<Document> documents, CancellationToken cancellationToken)
{
try
{
foreach(var document in documents)
{
try
{
// Do your work for the document
}
catch(Exception ex)
{
// Something happened with the current document, handle it, send it to a queue / another storage to analyze, log it. This catch will make the loop continue with the next.
}
}
}
catch(Exception ex)
{
// Something unhandled happened, log it and avoid throwing it again so the next batch is processed
}
}https://stackoverflow.com/questions/51931103
复制相似问题