将tidb cdc流量打到kafka中,本地起consumer消费,但是每当生产者流量突然增大时,消费程序就停止消费,拉取不到topic数据,也不报错。
springboot+kafka+批处理+自动提交offset
消费程序如下:
@KafkaListener(topicPartitions =
{@TopicPartition(topic = "${spring.kafka.topic}",
partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "${offset}"))
})
public void consume(List<ConsumerRecord<String, String>> consumerRecords) {
.......
}
消费者配置如下:
spring:
kafka:
bootstrap-servers: 10.0.20.128:9092
topic: group_chat_member
consumer:
enable-auto-commit: true
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
auto-offset-reset: latest
group-id: group_consumer
max-poll-records: 100
listener:
ack-mode: batch
type: batch
properties:
max.poll.interval.ms: 1200000
receive.buffer.bytes: 1073741824
ckafka监控如下:
当生产流量激增时,消费程序会出现消费为0的情况,后续会出现波动消费,或者几个小时不消费,有什么办法可以解决这个问题?