
Kafka is a distributed stream-processing platform used mainly for building real-time data pipelines and streaming applications. Its core concepts are the producer, consumer, topic, partition, and broker.
Producer configuration: bootstrap.servers is the list of broker addresses in host:port form; acks sets the acknowledgment level, 0 (no acknowledgment), 1 (leader acknowledgment), or all (all in-sync replicas acknowledge); linger.ms works together with batch.size to control how long the producer waits to fill a batch. Notes: increasing batch.size and linger.ms improves throughput but introduces latency, and acks=all prevents data loss but reduces throughput, as the sketch below illustrates.
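A minimal sketch of these producer-side trade-offs, assuming a local broker at localhost:9092; the numeric values are illustrative, not recommendations:

import java.util.Properties;

Properties tuningProps = new Properties();
tuningProps.put("bootstrap.servers", "localhost:9092");  // assumed local broker
// Throughput-oriented: batch more records per request, accepting some extra latency
tuningProps.put("batch.size", 65536);   // larger batch buffer, in bytes (default is 16384)
tuningProps.put("linger.ms", 20);       // wait up to 20 ms for a batch to fill before sending
tuningProps.put("acks", "1");           // leader-only acknowledgment favors throughput
// Reliability-oriented alternative: all in-sync replicas must acknowledge, so no data loss
// tuningProps.put("acks", "all");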
Consumer configuration: auto.offset.reset decides where a consumer group with no committed offset starts reading, earliest (from the earliest available offset) or latest (only new messages). Notes: keep group.id unique to each consuming application so unrelated consumers do not share a group and confuse consumption.
Broker notes: make sure the directories in log.dirs have enough disk space.
Scenario: multiple services generate logs; Kafka collects them centrally and downstream analytics systems consume them.
Implementation steps:
1. Create a topic named logs with 3 partitions and a replication factor of 2 (a sketch using AdminClient follows this list).
2. The producer sends log messages to the logs topic.
3. The consumer subscribes to the logs topic and processes the logs.
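For step 1, here is a minimal sketch that creates the topic programmatically with Kafka's AdminClient (the same thing can be done with the kafka-topics.sh CLI). It assumes the cluster is reachable at localhost:9092, as in the code below, and has at least two brokers so that a replication factor of 2 can be satisfied:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Collections;
import java.util.Properties;

Properties adminProps = new Properties();
adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
try (AdminClient admin = AdminClient.create(adminProps)) {
    // logs topic: 3 partitions, replication factor 2
    NewTopic logsTopic = new NewTopic("logs", 3, (short) 2);
    // all().get() blocks until the broker confirms creation (or throws on failure)
    admin.createTopics(Collections.singletonList(logsTopic)).all().get();
}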
Producer code:
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 3);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
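// Build one record for the logs topic and send it asynchronously;
// the callback fires when the broker acknowledges it or an error occurs.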
ProducerRecord<String, String> record = new ProducerRecord<>("logs", "key", "message");
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        exception.printStackTrace();
    } else {
        System.out.println("Message sent to partition " + metadata.partition());
    }
});
producer.close();  // close() flushes any buffered records before shutting down

Consumer code:
import org.apache.kafka.clients.consumer.*;
import java.util.Collections;
import java.util.Properties;
import java.time.Duration;
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "log-consumers");
props.put("enable.auto.commit", "true");
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("logs"));
while (true) {
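    // Fetch any records that arrived since the last poll, waiting at most 100 ms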
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("Offset = %d, Key = %s, Value = %s%n",
                record.offset(), record.key(), record.value());
    }
}

Best practices: for reliability, set acks=all on the producer and have consumers commit offsets manually; for throughput, increase batch.size and linger.ms and add more partitions. With sensible configuration and these practices, Kafka's high-throughput, low-latency characteristics can be used to full effect.
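A minimal sketch of that manual-commit variant, reusing the consumer properties and imports from the example above and changing only the commit-related settings (the 1-second poll timeout is an illustrative choice):

props.put("enable.auto.commit", "false");  // turn off auto-commit so offsets are committed explicitly
KafkaConsumer<String, String> manualConsumer = new KafkaConsumer<>(props);
manualConsumer.subscribe(Collections.singletonList("logs"));
while (true) {
    ConsumerRecords<String, String> records = manualConsumer.poll(Duration.ofSeconds(1));
    for (ConsumerRecord<String, String> record : records) {
        // process each record before its offset is committed
        System.out.printf("Offset = %d, Value = %s%n", record.offset(), record.value());
    }
    if (!records.isEmpty()) {
        // commit after the whole batch is processed; on a crash, unprocessed records are re-read
        manualConsumer.commitSync();
    }
}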