我用java产生数据,然后把它存入Kafka topic,然后我想把这些数据存入MongoDB。当我通过JAVA以JSON的形式发送数据时,由于这个错误,它不会存储到MongoDB中。
[2020-08-15 18:42:19,164] ERROR WorkerSinkTask{id=Kafka_ops-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: JSON reader was e
我是Kafka的新手,正在尝试建立一个管道,将我的apache httpd日志连接到mongodb。 我有用Kafka Output从Filebeat产生的数据。然后,我使用Kstreams从主题中读取数据,并将数据mapValues到不同的主题。然后使用Kafka Connect to a database (MongoDB)将数据下沉。不幸的是,我的Filebeat中的数据没有ID。 我如何为它们创建ID,因为我希望在将文档沉入mongodb之前创建一个唯一的ID并将其插入文档中?我希望这可以在mapValues转换中实现;
我试图使用Kafka连接MongoDB连接器向MongoDB编写一些文档。我成功地设置了所需的所有组件并启动了连接器,但是当我使用Kafka -avro-控制台-生产者向Kafka发送消息时,Kafka connector给了我以下错误:
org.apache.kafka.connect.errors.DataException: Error: `operationType` field is doc is missing.
我尝试将这个字段添加到消息中,但是kafka要求我包含一个documentKey字段。看起来,除了在我的模式中定义的有效负载之外,我还需要包括一些额外的字段,但是我找不到
我使用Apache Camel来使用kafka主题中的消息,然后处理消息,同时处理发生异常时,我将该消息重定向到另一个kafka主题,并在单独的路径中处理该消息。所以我有一条类似于下面的路线。
from ("kafka1").process("someProcessor").end();
onException(Throwable.class).process(exchange->{exchange.getIn().setBody("Message with error details")}).to("kafka2");
上
我试图运行一个管道,使用apache-束,源作为一个卡夫卡主题,目的地作为另一个卡夫卡主题。我已经编写了我的代码,并且运行良好(也就是说,在我认为的代码中没有错误)。但在输出主题中看不到数据--这是代码:
import apache_beam as beam
import apache_beam.transforms.window as window
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.external.kafka import ReadFromKafka, W
我正在我的Windows机器上配置一个Kafka Mongodb接收器连接器。
我的connect-standalone.properties文件有
plugin.path=E:/Tools/kafka_2.12-2.4.0/plugins
我的MongoSinkConnector.properties文件
name=mongo-sink
topics=first_topic
connector.class=com.mongodb.kafka.connect.MongoSinkConnector
tasks.max=1
key.ignore=true
# Specific global Mo