我正在使用和Kafka流。假设我有一个处理器,它是一个将字符串的KStream转换为KStream of CityProgrammes的函数。现在的问题是,在转换过程中发生任何错误,整个应用程序将停止。我想向DLQ发送一个特定的信息,然后继续前进。我已经读了好几天了,每个人都建议在被调用的服务中处理错误,但在我看来,这是不可靠的,而且我仍然需要返回一个KStream:我如何在捕获范围内做到这一点?这听起来可能是一个a问题,所以问题被改写了:当出现异常并将无效项发送到DL
我想确认我对从一个Kafka流源读取多个处理器的效率的理解。我相信,在示例1中,如果我希望根据谓词逻辑执行两个不同的进程,那么下面的示例1是最有效的。示例1public Function<KStream<String, Notification>,KStream<String, Notification>> process1() {->.........; //different additional processing to map to En