我使用汇合REST代理调用Kafka。我正在读取一个CSV文件,从所有记录中创建一个对象(大约400万条记录),并向REST代理发送一个请求。我一直在获取一个OutOfMemory异常。
确切的例外消息是:
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "kafka-producer-network-thread | producer-81"
我只有一个REST代理服务器的实例,作为一个码头容器托管。环境变量设置为:
JAVA_OPTIONS=-X
需要一些建议,我已经使用scala创建了一个flink作业来消费来自Kafka的消息。但是消息是用base64编码的。我试过这段代码 val x_stream: DataStream[ObjectNode] = env
.addSource(
new FlinkKafkaConsumer010[ObjectNode](parameters.get("kafka.topic.source"),
new JsonNodeDeserializationSchema(),
kfk_props
我对Play framework是个新手。
我试图在build.sbt中添加Spark和Kafka的库,但我得到了这个错误:
of org.glassfish.hk2. and org.glassfish.hk2#hk2-locator;2.22.2: not
found and javax.validation#validation-
api;${javax.validation.version}: not found
每当我运行play run时,它都会下载jar文件,稍后会给出glassfish错误。我得到的错误是
Here are the codes:
我知道我们可以过滤我们的消费者/流媒体节目中的数据。但我在这里寻找的是一个解决方案,过滤来自Kafka broker本身的数据。
问题陈述:我有一个用例来获取卡夫卡中存在的全部数据内容以及该数据的一个子集。因此,当我创建一个使用者时,我需要传递一个过滤器查询,以便只接收过滤过的数据。
我已经尝试使用Python消费者和Py火花流程序来实现它。但我没能做到。我在下面分享了我尝试过的代码片段。
Python程序
from kafka import KafkaConsumer
from json import loads
from time import sleep
consumer = Kafk
我有一个火花结构化流scala作业,它从kafka读取json消息并将数据写入S3。我配置了一个合流模式注册中心,该模式使用type=object的json格式。现在,我可以从注册表中检索模式,但是我需要在包含kafka记录的dataframe上使用这个模式。
val restService = new RestService(schemaRegistryURL)
val valueRestResponseSchema = restService.getLatestVersion(schemaName) // return type is io.confluent.kafka.schemare
我试图使用JDBCSinkConnector将数据从Kafka主题传递到Postgres。在所有操作之后,例如创建主题、创建流、创建具有配置的接收器连接器以及通过python生成数据到主题,连接日志将返回以下结果:
Caused by: org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you