首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >【Kafka 使用手册】

【Kafka 使用手册】

作者头像
贺公子之数据科学与艺术
发布2025-12-17 14:48:08
发布2025-12-17 14:48:08
2990
举报
Kafka 使用手册

Kafka 是一个分布式流处理平台,主要用于构建实时数据管道和流应用。核心概念包括生产者(Producer)、消费者(Consumer)、主题(Topic)、分区(Partition)和 Broker。

  • 生产者:将数据发布到 Kafka 主题。
  • 消费者:订阅主题并处理消息。
  • 主题:消息的逻辑分类,分为多个分区。
  • 分区:主题的物理分片,每个分区是一个有序、不可变的消息序列。
  • Broker:Kafka 服务器,负责存储和转发消息。
关键参数设置及注意事项
生产者参数
  • bootstrap.servers:Kafka 集群地址列表,格式为 host:port
  • acks:确认机制,可选 0(无需确认)、1(Leader 确认)或 all(所有副本确认)。
  • retries:发送失败后的重试次数。
  • batch.size:批量发送的字节大小,提高吞吐量。
  • linger.ms:发送延迟时间,与 batch.size 配合使用。

注意事项:

  • 高吞吐场景建议增大 batch.sizelinger.ms,但会引入延迟。
  • acks=all 保证数据不丢失,但降低吞吐量。
消费者参数
  • bootstrap.servers:同生产者。
  • group.id:消费者组 ID,相同组内的消费者共享分区。
  • auto.offset.reset:无偏移量时的策略,可选 earliest(从最早开始)或 latest(从最新开始)。
  • enable.auto.commit:是否自动提交偏移量。
  • max.poll.records:单次拉取的最大消息数。

注意事项:

  • 避免频繁提交偏移量,可能影响性能。
  • 确保 group.id 唯一性,避免消费混乱。
Broker 参数
  • log.dirs:日志存储目录。
  • num.partitions:默认主题分区数。
  • default.replication.factor:默认副本数。
  • zookeeper.connect:ZooKeeper 连接地址。

注意事项:

  • 分区数和副本数影响集群的扩展性和容错性。
  • 确保 log.dirs 有足够磁盘空间。
案例分析
案例:实时日志收集系统

场景:多个服务生成日志,通过 Kafka 统一收集并供下游分析系统消费。

实现步骤

  1. 创建主题 logs,设置分区数为 3,副本数为 2。
  2. 生产者将日志发送到 logs 主题。
  3. 消费者组订阅 logs 主题并处理日志。
代码实现
生产者示例(Java)
代码语言:javascript
复制
import org.apache.kafka.clients.producer.*;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 3);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>("logs", "key", "message");

producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        exception.printStackTrace();
    } else {
        System.out.println("Message sent to partition " + metadata.partition());
    }
});

producer.close();
消费者示例(Java)
代码语言:javascript
复制
import org.apache.kafka.clients.consumer.*;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "log-consumers");
props.put("enable.auto.commit", "true");
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("logs"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("Offset = %d, Key = %s, Value = %s%n", 
            record.offset(), record.key(), record.value());
    }
}
常见问题及解决方案
  • 消息丢失:确保生产者设置 acks=all,消费者手动提交偏移量。
  • 消息重复:消费者实现幂等处理逻辑。
  • 性能瓶颈:调整 batch.sizelinger.ms,增加分区数。

通过合理配置参数和遵循最佳实践,可以充分发挥 Kafka 的高吞吐、低延迟特性。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2025-09-25,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Kafka 使用手册
  • 关键参数设置及注意事项
    • 生产者参数
    • 消费者参数
    • Broker 参数
  • 案例分析
    • 案例:实时日志收集系统
  • 代码实现
    • 生产者示例(Java)
    • 消费者示例(Java)
  • 常见问题及解决方案
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档