前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >详述 Kafka 基本原理

详述 Kafka 基本原理

作者头像
CG国斌
发布2019-05-29 11:48:26
1.2K0
发布2019-05-29 11:48:26
举报
文章被收录于专栏:维C果糖维C果糖

文章目录

  • 1 简介
  • 2 Kafka 架构
  • 3 Kafka 存储策略
  • 4 Kafka 删除策略
  • 5 Kafka broker
  • 6 Kafka 官方文档
  • 7 代码示例
kafaka
kafaka

1 简介

Apache Kafka 是分布式发布-订阅消息系统。它最初由 LinkedIn 公司开发,之后成为 Apache 项目的一部分。Kafka 是一种快速、可扩展的、设计内在就是分布式的,分区的和可复制的提交日志服务。

2 Kafka 架构

它的架构包括以下组件:

  • 话题(Topic):是特定类型的消息流。消息是字节的有效负载(Payload),话题是消息的分类名或种子(Feed)名。
  • 生产者(Producer):是能够发布消息到话题的任何对象。
  • 服务代理(Broker):已发布的消息保存在一组服务器中,它们被称为代理(Broker)或 Kafka 集群。
  • 消费者(Consumer):可以订阅一个或多个话题,并从Broker拉数据,从而消费这些已发布的消息。
2
2

3 Kafka 存储策略

  • Kafka 以topic来进行消息管理,每个topic包含多个partition,每个partition对应一个逻辑log,有多个segment组成。
  • 每个segment中存储多条消息(见下图),消息id由其逻辑位置决定,即从消息id可直接定位到消息的存储位置,避免id到位置的额外映射。
  • 每个part在内存中对应一个index,记录每个segment中的第一条消息偏移。
  • 发布者发到某topic的消息会被均匀的分布到多个partition上(或根据用户指定的路由规则进行分布),broker收到发布消息往对应partition的最后一个segment上添加该消息,当某个segment上的消息条数达到配置值或消息发布时间超过阈值时,segment上的消息会被flush到磁盘,只有flush到磁盘上的消息订阅者才能订阅到,segment达到一定的大小后将不会再往该segment写数据,broker会创建新的segment
celue
celue

4 Kafka 删除策略

  • N天前的删除。
  • 保留最近的MGB数据。

5 Kafka broker

与其它消息系统不同,Kafka broker是无状态的。这意味着消费者必须维护已消费的状态信息。这些信息由消费者自己维护,broker完全不管(由offset managerbroker管理)。

从代理删除消息变得很棘手,因为代理并不知道消费者是否已经使用了该消息。Kafka 创新性地解决了这个问题,它将一个简单的基于时间的 SLA 应用于保留策略。当消息在代理中超过一定时间后,将会被自动删除。

这种创新设计有很大的好处,消费者可以故意倒回到老的偏移量再次消费数据。这违反了队列的常见约定,但被证明是许多消费者的基本特征。

6 Kafka 官方文档

Kafka Design

  • 目标
    • 高吞吐量来支持高容量的事件流处理
    • 支持从离线系统加载数据
    • 低延迟的消息系统
  • 持久化
    • 依赖文件系统,持久化到本地
    • 数据持久化到log
  • 效率
    • 解决small IO problem
      • 使用message set组合消息。
      • server使用chunks of messages写到log
      • consumer一次获取大的消息块。
    • 解决byte copying
      • producerbrokerconsumer之间使用统一的binary message format
      • 使用系统pagecache
      • 使用sendfile传输log,避免拷贝

端到端的批量压缩(End-to-end Batch Compression),Kafka 支持 GZIP 和 Snappy 压缩协议。

The Producer

  • 负载均衡
    • producer可以自定义发送到哪个partition的路由规则。默认路由规则:hash(key) % numPartitions,如果keynull则随机选择一个partition
    • 自定义路由:如果key是一个user id,可以把同一个user的消息发送到同一个partition,这时consume就可以从同一个partition读取同一个user的消息。
  • 异步批量发送
    • 批量发送:配置不多于固定消息数目一起发送并且等待时间小于一个固定延迟的数据。

The Consumer

consumer控制消息的读取。

Push vs Pull

  • producer推(push)数据到brokerconsumerbroker拉(pull)数据
  • consumer拉的优点:consumer自己控制消息的读取速度和数量
  • consumer拉的缺点:如果broker没有数据,则可能要pull多次忙等待,Kafka 可以配置consumer long pull一直等到有数据

Consumer Position

  • 大部分消息系统由broker记录哪些消息被消费了,但 Kafka 不是
  • Kafka 由consumer控制消息的消费,consumer甚至可以回到一个old offset的位置再次消费消息

Message Delivery Semantics

  • 至多一次(At most once ),消息可能丢失,但不会重复
  • 至少一次(At least once),消息不会丢失,但可能重复
  • 恰好一次(Exactly once),这正是我们想要的,消息仅被发送一次

Producer:有个acks配置可以控制接收的leader的在什么情况下就回应producer消息写入成功。

Consumer

  • 读取消息,写log,处理消息。如果处理消息失败,log已经写入,则无法再次处理失败的消息,对应At most once
  • 读取消息,处理消息,写log。如果消息处理成功,写log失败,则消息会被处理两次,对应At least once
  • 读取消息,同时处理消息并把resultlog同时写入,这样保证resultlog同时更新或同时失败,对应Exactly once

Kafka 默认保证at-least-once delivery,容许用户实现at-most-once语义,exactly-once的实现取决于目的存储系统,Kafka 提供了读取offset,实现也没有问题。

复制(Replication)

  • 一个partition的复制个数(replication factor)包括这个partitionleader本身。
  • 所有对partition的读和写都通过leader
  • Followers通过pull获取leaderlogmessageoffset
  • 如果一个follower挂掉、卡住或者同步太慢,leader会把这个followerin sync replicas(ISR)列表中删除。
  • 当所有的in sync replicasfollower把一个消息写入到自己的log中时,这个消息才被认为是committed的。
  • 如果针对某个partition的所有复制节点都挂了,Kafka 选择最先复活的那个节点作为leader(这个节点不一定在ISR里)。

日志压缩(Log Compaction)

  • 针对一个topicpartition,压缩使得 Kafka 至少知道每个key对应的最后一个值。
  • 压缩不会重排序消息。
  • 消息的offset是不会变的。
  • 消息的offset是顺序的。

Distribution

  • Consumer Offset Tracking
  • High-level consumer 记录每个 partition 所消费的 maximum offset,并定期 commit 到 offset manager(broker)。
  • Simple consumer 需要手动管理 offset。现在的 Simple consumer Java API 只支持 commit offset 到 zookeeper。
  • Consumers and Consumer Groups
  • consumer 注册到 zookeeper
  • 属于同一个 group 的 consumer(group id 一样)平均分配 partition,每个 partition 只会被一个 consumer 消费。
  • 当 broker 或同一个 group 的其他 consumer 的状态发生变化的时候,consumer rebalance 就会发生。

Zookeeper 协调控制

  • 管理brokerconsumer的动态加入与离开。
  • 触发负载均衡,当brokerconsumer加入或离开时会触发负载均衡算法,使得一个 consumer组内的多个consumer的订阅负载平衡。
  • 维护消费关系及每个partition的消费信息。

7 代码示例

生产者代码示例:

import java.util.*;
 
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
 
public class TestProducer {
    public static void main(String[] args) {
        long events = Long.parseLong(args[0]);
        Random rnd = new Random();
 
        Properties props = new Properties();
        props.put("metadata.broker.list", "broker1:9092,broker2:9092 ");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("partitioner.class", "example.producer.SimplePartitioner");
        props.put("request.required.acks", "1");
 
        ProducerConfig config = new ProducerConfig(props);
 
        Producer<String, String> producer = new Producer<String, String>(config);
 
        for (long nEvents = 0; nEvents < events; nEvents++) { 
               long runtime = new Date().getTime();  
               String ip = “192.168.2.” + rnd.nextInt(255); 
               String msg = runtime + “,www.example.com,” + ip; 
               KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg);
               producer.send(data);
        }
        producer.close();
    }
}

Partitioning Code:

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
 
public class SimplePartitioner implements Partitioner {
    public SimplePartitioner (VerifiableProperties props) {
 
    }
 
    public int partition(Object key, int a_numPartitions) {
        int partition = 0;
        String stringKey = (String) key;
        int offset = stringKey.lastIndexOf('.');
        if (offset > 0) {
           partition = Integer.parseInt( stringKey.substring(offset+1)) % a_numPartitions;
        }
       return partition;
  } 
}

消费者代码示例:

import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
 
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
 
public class ConsumerGroupExample {
    private final ConsumerConnector consumer;
    private final String topic;
    private  ExecutorService executor;
 
    public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                createConsumerConfig(a_zookeeper, a_groupId));
        this.topic = a_topic;
    }
 
    public void shutdown() {
        if (consumer != null) consumer.shutdown();
        if (executor != null) executor.shutdown();
        try {
            if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly");
            }
        } catch (InterruptedException e) {
            System.out.println("Interrupted during shutdown, exiting uncleanly");
        }
   }
 
    public void run(int a_numThreads) {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(a_numThreads));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
 
        // now launch all the threads
        //
        executor = Executors.newFixedThreadPool(a_numThreads);
 
        // now create an object to consume the messages
        //
        int threadNumber = 0;
        for (final KafkaStream stream : streams) {
            executor.submit(new ConsumerTest(stream, threadNumber));
            threadNumber++;
        }
    }
 
    private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", a_zookeeper);
        props.put("group.id", a_groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
 
        return new ConsumerConfig(props);
    }
 
    public static void main(String[] args) {
        String zooKeeper = args[0];
        String groupId = args[1];
        String topic = args[2];
        int threads = Integer.parseInt(args[3]);
 
        ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic);
        example.run(threads);
 
        try {
            Thread.sleep(10000);
        } catch (InterruptedException ie) {
 
        }
        example.shutdown();
    }
}

ConsumerTest:

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
 
public class ConsumerTest implements Runnable {
    private KafkaStream m_stream;
    private int m_threadNumber;
 
    public ConsumerTest(KafkaStream a_stream, int a_threadNumber) {
        m_threadNumber = a_threadNumber;
        m_stream = a_stream;
    }
 
    public void run() {
        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
        while (it.hasNext())
            System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));
        System.out.println("Shutting down Thread: " + m_threadNumber);
    }
}

转载声明:本文转自博客园「阿凡卢」,Kafka基本原理

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2017年07月24日,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 文章目录
  • 1 简介
  • 2 Kafka 架构
  • 3 Kafka 存储策略
  • 4 Kafka 删除策略
  • 5 Kafka broker
  • 6 Kafka 官方文档
  • 7 代码示例
相关产品与服务
负载均衡
负载均衡(Cloud Load Balancer,CLB)提供安全快捷的流量分发服务,访问流量经由 CLB 可以自动分配到云中的多台后端服务器上,扩展系统的服务能力并消除单点故障。负载均衡支持亿级连接和千万级并发,可轻松应对大流量访问,满足业务需求。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档