Apache Kafka 是分布式发布-订阅消息系统。它最初由 LinkedIn 公司开发,之后成为 Apache 项目的一部分。Kafka 是一种快速、可扩展的、设计内在就是分布式的,分区的和可复制的提交日志服务。
它的架构包括以下组件:
Topic
):是特定类型的消息流。消息是字节的有效负载(Payload
),话题是消息的分类名或种子(Feed
)名。Producer
):是能够发布消息到话题的任何对象。Broker
):已发布的消息保存在一组服务器中,它们被称为代理(Broker
)或 Kafka 集群。Consumer
):可以订阅一个或多个话题,并从Broker
拉数据,从而消费这些已发布的消息。topic
来进行消息管理,每个topic
包含多个partition
,每个partition
对应一个逻辑log
,有多个segment
组成。segment
中存储多条消息(见下图),消息id
由其逻辑位置决定,即从消息id
可直接定位到消息的存储位置,避免id
到位置的额外映射。part
在内存中对应一个index
,记录每个segment
中的第一条消息偏移。topic
的消息会被均匀的分布到多个partition
上(或根据用户指定的路由规则进行分布),broker
收到发布消息往对应partition
的最后一个segment
上添加该消息,当某个segment
上的消息条数达到配置值或消息发布时间超过阈值时,segment
上的消息会被flush
到磁盘,只有flush
到磁盘上的消息订阅者才能订阅到,segment
达到一定的大小后将不会再往该segment
写数据,broker
会创建新的segment
。N
天前的删除。MGB
数据。与其它消息系统不同,Kafka broker
是无状态的。这意味着消费者必须维护已消费的状态信息。这些信息由消费者自己维护,broker
完全不管(由offset managerbroker
管理)。
从代理删除消息变得很棘手,因为代理并不知道消费者是否已经使用了该消息。Kafka 创新性地解决了这个问题,它将一个简单的基于时间的 SLA 应用于保留策略。当消息在代理中超过一定时间后,将会被自动删除。
这种创新设计有很大的好处,消费者可以故意倒回到老的偏移量再次消费数据。这违反了队列的常见约定,但被证明是许多消费者的基本特征。
Kafka Design
log
small IO problem
: message set
组合消息。server
使用chunks of messages
写到log
consumer
一次获取大的消息块。byte copying
: producer
、broker
和consumer
之间使用统一的binary message format
pagecache
sendfile
传输log
,避免拷贝端到端的批量压缩(End-to-end Batch Compression
),Kafka 支持 GZIP 和 Snappy 压缩协议。
The Producer
producer
可以自定义发送到哪个partition
的路由规则。默认路由规则:hash(key) % numPartitions
,如果key
为null
则随机选择一个partition
。key
是一个user id
,可以把同一个user
的消息发送到同一个partition
,这时consume
就可以从同一个partition
读取同一个user
的消息。The Consumer
consumer
控制消息的读取。
Push vs Pull
:
producer
推(push
)数据到broker
,consumer
从broker
拉(pull
)数据consumer
拉的优点:consumer
自己控制消息的读取速度和数量consumer
拉的缺点:如果broker
没有数据,则可能要pull
多次忙等待,Kafka 可以配置consumer long pull
一直等到有数据Consumer Position
:
broker
记录哪些消息被消费了,但 Kafka 不是consumer
控制消息的消费,consumer
甚至可以回到一个old offset
的位置再次消费消息Message Delivery Semantics
:
At most once
),消息可能丢失,但不会重复At least once
),消息不会丢失,但可能重复Exactly once
),这正是我们想要的,消息仅被发送一次Producer
:有个acks
配置可以控制接收的leader
的在什么情况下就回应producer
消息写入成功。
Consumer
:
log
,处理消息。如果处理消息失败,log
已经写入,则无法再次处理失败的消息,对应At most once
。log
。如果消息处理成功,写log
失败,则消息会被处理两次,对应At least once
。result
和log
同时写入,这样保证result
和log
同时更新或同时失败,对应Exactly once
。Kafka 默认保证at-least-once delivery
,容许用户实现at-most-once
语义,exactly-once
的实现取决于目的存储系统,Kafka 提供了读取offset
,实现也没有问题。
复制(Replication)
partition
的复制个数(replication factor
)包括这个partition
的leader
本身。partition
的读和写都通过leader
。Followers
通过pull
获取leader
上log
(message
和offset
)follower
挂掉、卡住或者同步太慢,leader
会把这个follower
从in sync replicas(ISR)
列表中删除。in sync replicas
的follower
把一个消息写入到自己的log
中时,这个消息才被认为是committed
的。partition
的所有复制节点都挂了,Kafka 选择最先复活的那个节点作为leader
(这个节点不一定在ISR
里)。日志压缩(Log Compaction)
topic
的partition
,压缩使得 Kafka 至少知道每个key
对应的最后一个值。offset
是不会变的。offset
是顺序的。Distribution
Zookeeper 协调控制
broker
与consumer
的动态加入与离开。broker
或consumer
加入或离开时会触发负载均衡算法,使得一个 consumer
组内的多个consumer
的订阅负载平衡。partition
的消费信息。生产者代码示例:
import java.util.*;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
public class TestProducer {
public static void main(String[] args) {
long events = Long.parseLong(args[0]);
Random rnd = new Random();
Properties props = new Properties();
props.put("metadata.broker.list", "broker1:9092,broker2:9092 ");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("partitioner.class", "example.producer.SimplePartitioner");
props.put("request.required.acks", "1");
ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String, String>(config);
for (long nEvents = 0; nEvents < events; nEvents++) {
long runtime = new Date().getTime();
String ip = “192.168.2.” + rnd.nextInt(255);
String msg = runtime + “,www.example.com,” + ip;
KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg);
producer.send(data);
}
producer.close();
}
}
Partitioning Code:
import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
public class SimplePartitioner implements Partitioner {
public SimplePartitioner (VerifiableProperties props) {
}
public int partition(Object key, int a_numPartitions) {
int partition = 0;
String stringKey = (String) key;
int offset = stringKey.lastIndexOf('.');
if (offset > 0) {
partition = Integer.parseInt( stringKey.substring(offset+1)) % a_numPartitions;
}
return partition;
}
}
消费者代码示例:
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ConsumerGroupExample {
private final ConsumerConnector consumer;
private final String topic;
private ExecutorService executor;
public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) {
consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
createConsumerConfig(a_zookeeper, a_groupId));
this.topic = a_topic;
}
public void shutdown() {
if (consumer != null) consumer.shutdown();
if (executor != null) executor.shutdown();
try {
if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly");
}
} catch (InterruptedException e) {
System.out.println("Interrupted during shutdown, exiting uncleanly");
}
}
public void run(int a_numThreads) {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(a_numThreads));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
// now launch all the threads
//
executor = Executors.newFixedThreadPool(a_numThreads);
// now create an object to consume the messages
//
int threadNumber = 0;
for (final KafkaStream stream : streams) {
executor.submit(new ConsumerTest(stream, threadNumber));
threadNumber++;
}
}
private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
Properties props = new Properties();
props.put("zookeeper.connect", a_zookeeper);
props.put("group.id", a_groupId);
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
return new ConsumerConfig(props);
}
public static void main(String[] args) {
String zooKeeper = args[0];
String groupId = args[1];
String topic = args[2];
int threads = Integer.parseInt(args[3]);
ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic);
example.run(threads);
try {
Thread.sleep(10000);
} catch (InterruptedException ie) {
}
example.shutdown();
}
}
ConsumerTest:
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
public class ConsumerTest implements Runnable {
private KafkaStream m_stream;
private int m_threadNumber;
public ConsumerTest(KafkaStream a_stream, int a_threadNumber) {
m_threadNumber = a_threadNumber;
m_stream = a_stream;
}
public void run() {
ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
while (it.hasNext())
System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));
System.out.println("Shutting down Thread: " + m_threadNumber);
}
}
转载声明:本文转自博客园「阿凡卢」,Kafka基本原理。