我正在工作的代码,轮询SFTP文件夹,并处理他们一个接一个,然后移动到一个成功的文件夹,一旦它完成。我们运行这个camel实例超过1。为了通过一个实例处理文件,我们有一个DB检查,比如一旦camel实例获取文件,它就会创建一个标记记录,而任何其他实例选择该文件时,它将检查此标记记录并跳过该过程。因此,我们创建了一个自定义异常,如在处理器中可用的Markerdocument,它将处理该异常。此标记异常将在全局中处理,我们将它们标记为已处理“真”,以便文件不会被移动到/ERROR文件夹。现在的问题是,由于我们正在处理它,路由进程认为它已成功处理,并将文件夹移动到我们不想要的/success,这将由实际的记录处理camel实例处理。我们如何做到这一点呢?
onException(MarkerDocumentExistsException.class)
.process("routeExceptionProcessor")
.log("Marker document available so continue next message processing... ")
.handled(true)
.stop();
onException(Exception.class)
.process("routeExceptionProcessor")
.log("Nothing to do here, move the file to error folder... ")
.redeliveryDelay(2000)
.stop();
from(getSftpOptions())
.routeId("sftp-Route")
.throttle(Integer.parseInt(appProp.throttleCount))
.timePeriodMillis(1000*60)
.onCompletion()
.onCompleteOnly()
.process("routeCompleteProcessor")
.log("Route processing is Completed : ${header.CamelFileNameOnly}")
.end()
.log("Starting the file process: ${header.CamelFileNameOnly}")
.process("routeCBCheckProcessor")
.unmarshal().gzipDeflater()
.split(body().tokenizeXML("example", "*"),new SimpleStringAggregator()).streaming()
.parallelProcessing(true)
.process("lineProcessor")
.end()
.log("aggregate is completed :${header.CamelFileNameOnly}")
发布于 2020-07-03 05:35:28
那么,您开始构建一个具有幂等存储库的幂等文件消费者。
您是否尝试过使用built-in idempotent file consumer of Camel (包含可选存储库)?
我认为这也适用于SFTP,因为该组件具有idempotent
选项,并且SFTP组件是文件组件的扩展。
https://stackoverflow.com/questions/62690577
复制相似问题