Clickhouse版本21.12.3.32。我m following this PR([https://github.com/ClickHouse/ClickHouse/pull/21850](https://github.com/ClickHouse/ClickHouse/pull/21850)) to handle incorrect messages from kafka topic, but after some investigation I发现,如果单个消息包含损坏的数据,则无法解析整批接收到的消息,并可能导致数据丢失。
卡夫卡发动机表:
CREATE TABLE default
我使用Apache Camel来使用kafka主题中的消息,然后处理消息,同时处理发生异常时,我将该消息重定向到另一个kafka主题,并在单独的路径中处理该消息。所以我有一条类似于下面的路线。
from ("kafka1").process("someProcessor").end();
onException(Throwable.class).process(exchange->{exchange.getIn().setBody("Message with error details")}).to("kafka2");
上
我们有kafka流应用程序。Producer在将kafka消息发送到Kafka流媒体应用之前,会在消息中添加头部。 在Kafka流媒体应用中,我们使用AbstractProcessor和context.forward(null, Optional.of(event));将消息转发到另一个主题。 但是报头正在丢失。我希望标题是从输入消息到输出主题。 ProcessorContext接口。headers()方法返回当前输入记录的标头,但在我的例子中它是空的,尽管我发送的是带有标头的消息。 * Returns the headers of the current input record