卡夫卡消费者继续打印错误信息。
我使用5台机器(kafka版本2.3.0)构建了一个集群( kafka version 2.3.0),它的分区为0,数据副本为3。当我使用kafka-客户端api时,我继续输出异常:在分区测试中,偏移提交失败-0,偏移量1:请求超时。但是这个主题读和写消息是可以的。
消费者配置:
Auto.commit.interval.ms = 5000
Auto.offset.reset = latest
Bootstrap.servers = [qs-kfk-01:9092, qs-kfk-02:9092, qs-kfk-03:9092, qs-kfk-04:9092, qs-kfk-05:9092]
Check.crcs = true
Client.id =
Connections.max.idle.ms = 540000
Default.api.timeout.ms = 60000
Enable.auto.commit = true
Exclude.internal.topics = true
Fetch.max.bytes = 52428800
Fetch.max.wait.ms = 500
Fetch.min.bytes = 1
Group.id = erp-sales
Heartbeat.interval.ms = 3000
Interceptor.classes = []
Internal.leave.group.on.close = true
Isolation.level = read_uncommitted
Key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
Max.partition.fetch.bytes = 1048576
Max.poll.interval.ms = 300000
Max.poll.records = 500
Metadata.max.age.ms = 300000
Metric.reporters = []
Metrics.num.samples = 2
Metrics.recording.level = INFO
Metrics.sample.window.ms = 30000
Partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
Receive.buffer.bytes = 65536
Reconnect.backoff.max.ms = 1000
Reconnect.backoff.ms = 50
Request.timeout.ms = 30000
Retry.backoff.ms = 100
Sasl.client.callback.handler.class = null
Sasl.jaas.config = null
Sasl.kerberos.kinit.cmd = /usr/bin/kinit
Sasl.kerberos.min.time.before.relogin = 60000
Sasl.kerberos.service.name = null
Sasl.kerberos.ticket.renew.jitter = 0.05
Sasl.kerberos.ticket.renew.window.factor = 0.8
Sasl.login.callback.handler.class = null
Sasl.login.class = null
Sasl.login.refresh.buffer.seconds = 300
Sasl.login.refresh.min.period.seconds = 60
Sasl.login.refresh.window.factor = 0.8
Sasl.login.refresh.window.jitter = 0.05
Sasl.mechanism = GSSAPI
Security.protocol = PLAINTEXT
Send.buffer.bytes = 131072
Session.timeout.ms = 10000
Ssl.cipher.suites = null
Ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
Ssl.endpoint.identification.algorithm = https
Ssl.key.password = null
Ssl.keymanager.algorithm = SunX509
Ssl.keystore.location = null
Ssl.keystore.password = null
Ssl.keystore.type = JKS
Ssl.protocol = TLS
Ssl.provider = null
Ssl.secure.random.implementation = null
Ssl.trustmanager.algorithm = PKIX
Ssl.truststore.location = null
Ssl.truststore.password = null
Ssl.truststore.type = JKS
Value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
java代码:
ConsumerRecords<K, V> consumerRecords = _kafkaConsumer.poll(50L);
for (ConsumerRecord<K, V> record : consumerRecords.records(topic)) {
kafkaConsumer.receive(topic, record.key(), record.value(), record.partition(), record.offset());
}
我尝试了以下几点:
我在控制台中得到连续的错误消息输出。
2019-11-03 16:21:11.687 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=erp-sales] Sending asynchronous auto-commit of offsets {test-0=OffsetAndMetadata{offset=1, metadata=''}}
2019-11-03 16:21:11.704 ERROR org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=erp-sales] Offset commit failed on partition test-0 at offset 1: The request timed out.
2019-11-03 16:21:11.704 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=erp-sales] Group coordinator qs-kfk-04:9092 (id: 2147483643 rack: null) is unavailable or invalid, will attempt rediscovery
2019-11-03 16:21:11.705 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=erp-sales] Asynchronous auto-commit of offsets {test-0=OffsetAndMetadata{offset=1, metadata=''}} failed due to retriable error: {}
org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.
Caused by: org.apache.kafka.common.errors.TimeoutException: The request timed out.
2019-11-03 16:21:11.708 DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, groupId=erp-sales] Manually disconnected from 2147483643. Removed requests: OFFSET_COMMIT.
2019-11-03 16:21:11.708 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient - [Consumer clientId=consumer-1, groupId=erp-sales] Cancelled request with header RequestHeader(apiKey=OFFSET_COMMIT, apiVersion=4, clientId=consumer-1, correlationId=42) due to node 2147483643 being disconnected
2019-11-03 16:21:11.708 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=erp-sales] Asynchronous auto-commit of offsets {test-0=OffsetAndMetadata{offset=1, metadata=''}} failed due to retriable error: {}
org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.
Caused by: org.apache.kafka.common.errors.DisconnectException: null
谢谢你的帮助。
发布于 2019-11-04 06:19:12
发布于 2019-11-05 07:03:02
@Rohit,谢谢你的回答,我现在已经取代了一个消费群体,我现在工作得很好。但是现在客户端有另一个问题,持续输出:
2019-11-05 14:11:14.892 INFO org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-1, groupId=fs-sales-be] Error sending fetch request (sessionId=2035993355, epoch=1) to node 4: org.apache.kafka.common.errors.DisconnectException.
2019-11-05 14:11:14.892 INFO org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-1, groupId=fs-sales-be] Error sending fetch request (sessionId=181175071, epoch=INITIAL) to node 5: org.apache.kafka.common.errors.DisconnectException.
这是什么原因造成的? 4,5节点状态是可以的。
https://stackoverflow.com/questions/58678704
复制相似问题