首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >Kafka elasticsearch连接器--“使用未刷新记录的刷新超时过期:”

Kafka elasticsearch连接器--“使用未刷新记录的刷新超时过期:”
EN

Stack Overflow用户
提问于 2018-02-04 22:01:41
回答 2查看 3.2K关注 0票数 4

我有一个奇怪的问题卡夫卡-> elasticsearch连接器。当我第一次开始这一切的时候,我收到了elasticsearch中的一个新数据,并通过kibana仪表板检查了它,但是当我使用同一个生产者应用程序向kafka生成了新的数据,并试图再次启动连接器时,我没有在elasticsearch中获得任何新数据。现在我收到了这样的错误:

代码语言:javascript
运行
复制
[2018-02-04 21:38:04,987] ERROR WorkerSinkTask{id=log-platform-elastic-0} Commit of offsets threw an unexpected exception for sequence number 14: null (org.apache.kafka.connect.runtime.WorkerSinkTask:233)
org.apache.kafka.connect.errors.ConnectException: Flush timeout expired with unflushed records: 15805

我使用next命令运行连接器:

代码语言:javascript
运行
复制
/usr/bin/connect-standalone /etc/schema-registry/connect-avro-standalone.properties log-platform-elastic.properties

connect-avro-standalone.properties

代码语言:javascript
运行
复制
bootstrap.servers=kafka-0.kafka-hs:9093,kafka-1.kafka-hs:9093,kafka-2.kafka-hs:9093
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
# producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
# consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
#rest.host.name=
rest.port=8084
#rest.advertised.host.name=
#rest.advertised.port=
plugin.path=/usr/share/java

log-platform-elastic.properties

代码语言:javascript
运行
复制
name=log-platform-elastic
key.converter=org.apache.kafka.connect.storage.StringConverter
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=member_sync_log, order_history_sync_log # ... and many others
key.ignore=true
connection.url=http://elasticsearch:9200
type.name=log

我检查了到kafka代理、elasticsearch和schema-注册表(模式-注册表和连接器在同一主机上)的连接,一切都很好。Kafka经纪人在端口9093上运行,我可以使用kafka-avro-控制台-使用者从主题中读取数据。我很感激你在这方面的任何帮助!

EN

回答 2

Stack Overflow用户

发布于 2019-07-22 16:29:40

只需更新flush.timeout.ms到大于10000 (10秒,这是默认的)

根据文件:

flush.timeout.ms用于定期刷新的超时(毫秒),以及在等待已完成的请求在添加记录时可用的缓冲区空间时使用的超时。如果超过此超时,则任务将失败。 类型:长默认值: 10000重要性:低

见文件

票数 2
EN

Stack Overflow用户

发布于 2020-06-26 19:39:58

我们可以优化弹性搜索配置来解决问题。有关配置参数,请参阅下面的链接

options.html

以下是能够控制消息速率流以最终帮助解决问题的关键参数:

flush.timeout.ms:增加可能有助于在同花顺时间给更多的呼吸

用于定期刷新的超时(毫秒),以及等待已完成的请求在添加记录时可用的缓冲区空间时使用的超时。如果超过此超时,则任务将失败。

max.buffered.records:尝试降低缓冲区记录限制

在阻止接受更多记录之前,每个任务将缓冲的最大记录数。此配置可用于限制每个任务的内存使用量。

batch.size:试着减少批次大小

写入Elasticsearch时要作为批处理处理的记录数。

tasks.max:减少或增加并行线程(使用者实例)的数量。这将控制弹性搜索,如果带宽不能处理减少任务可能有帮助。

它通过调整上述参数来解决我的问题。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/48613433

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档