前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布

Kafka

作者头像
matt
发布2022-10-25 15:59:55
4040
发布2022-10-25 15:59:55
举报
文章被收录于专栏:CSDN迁移CSDN迁移

Kafka

一、概述

  1. 消息队列 Kafka采用点对点模式,必须有监控队列轮询的进程在(耗资源),可以随时任意速度获取数据。 发布订阅模式:速度由消息队列推送决定,不用进程监控。
在这里插入图片描述
在这里插入图片描述

优点: (1)解耦 (2)冗余(备份) (3)扩展性 (4)灵活性、峰值处理能力 (5)可恢复性(冗余) (6)顺序保证(队列) (7)缓冲(冗余) (8)异步通信(宕机)

  1. Kafka 分布式消息队列,由LinkidIn公司开发,底层是Scala,先由Apache维护,kafka_
{scala-version}-

{kafka-version}.tgz。 Kafka对消息保存时根据Topic(保存)进行分类,发送消息者称为Producer(入口),消息接收者称为Consumer(出口),此外Kafka集群有多个Kafka实例组成,每个实例(Server)称为Broker。一个分区(Partition)维护一个偏移量 (offset)。 Kafka集群和Consumer都依赖zookeeper集群保存一些meta信息。

  1. 架构
在这里插入图片描述
在这里插入图片描述

Leader为主,Follower为备,Kafka中的Follower不处理任何请求。 消费者组的不同消费者不能同时消费同一个分区的数据。

二、集群部署

代码语言:javascript
复制
# 在解压的kafka目录下(推荐),创建一个存放日志(也存放数据)的信息
mkdir logs

# 修改server.properties
# 每一个实例的唯一辨识(int)
broker.id=0
# 是否可以删除topic
delete.topic.enable=true
# 设置日志打印的位置为创建的日志目录
log.dirs=/opt/kafka/logs
# 缓存数据的时间为7天、大小为1G
log.retention.hours=168
log.segment.bytes=1073741824
# 修改zk集群
zookeeper.connect=${id}:${port},${id}:${port},${id}:${port}

# 集群分发,修改broker.id
xsync kafka/
  1. 命令
代码语言:javascript
复制
# bin/下的命令
kafka-server-start.sh
kafka-server-stop.sh
# 对topic的操作
kafka-topics.sh
# 测试时,控制台的消费
kafka-console-consumer.sh
kafka-console-producer.sh
代码语言:javascript
复制
--topic  # 定义topic名
--replication-factor  # 定义副本数
--partitions  # 定义分区数
--daemon  # 后台启动
代码语言:javascript
复制
# kafka依赖zk,先启动zk,并确定zk状态为leader/foller
zkstart.sh
zk/bin/zkServer.sh status

# 使用kafka自带的zk,-daemon表示后台启动
./zookeeper-server-start.sh -daemon ../config/zookeeper.properties

# 指定配置文件启动kafka,阻塞进程
bin/kafka-server-start.sh config/server.properties

# 查看进程
jps -l

# 创建分区数为x(kafka/logs下的一个目录),副本数为x(不能超过集群的节点数),名称为x的topic
bin/kafka-topics.sh --create --zookeeper ${id}:${port} --partitions ${number} --replication-factor ${number} --topic ${name}
# 查看topic数量,显示为topic name
bin/kafka-topics.sh --list --zookeeper ${id}:${port}
# 删除topic,能创建同名的即删除成功
bin/kafka-topics.sh --zookeeper ${id}:${port} --delete --topic ${name}

# 发送消息,连接kafka集群(kafka默认9092),写进topic
bin/kafka-console-producer.sh --broker-list ${id}:${port} --topic ${name}
# 消费消息,连接zk集群,读取topic,不加--from-beginning表示只获取最新的
bin/kafka-console-consumer.sh --zookeeper ${id}:${port} --from-beginning --topic ${name}

# 新版本的kafka中consumer的offset存储在本地,提升效率,不交由zk保存,会报警告!bootstrap(附属于)本地kafka集群,名为__consumer_offset的topic中。
bin/kafka-console-consumer.sh --bootstrap-server ${id}:${port} --from-beginning --topic ${name}

# 查看topic的详情,Isr为选举(其中某个与宕机的Leader节点数据最相近,作为新的Leader),ReplicationFactor为副本,值为broker.id
bin/kafka-topics.sh --zookeeper ${id}:${port} --describe-topic ${name}

三、工作流程分析

三大流程:生产、存储、消费

在这里插入图片描述
在这里插入图片描述

Kafka Cluster:Broker1、Broker2、Broker3 Producer创建Topic,指定分区数和副本数

代码语言:javascript
复制
# 分区的好处:不同分区放在不同节点,实现了负载;消费者组只能消费不同分区数据,提高了并发度(topic的分区数应与一个消费者组中的消费者个数相同)
Broker1:Topic A-Partition 0-Leader、Topic A-Partition 1-Follower
                         Replication A/0                    Replication A/1
Broker2:Topic A-Partition 0-Follower、Topic A-Partition 1-Leader

Consumer Group:Consumer A、Consumer B

1. 生产过程

(1)写入方式(push)

Producer采用push模式将信息发布到Broker,每条消息都被append到patition中,属于顺序写磁盘(顺序写磁盘效率比随机写内存要高,保障kafka吞吐量)。

(2)Partition

消息都被发送到一个topic,本质就是一个目录,而topic又由一些partition logs(分区日志,offset都从0开始,有序唯一,并不断追加)组成。

代码语言:javascript
复制
分区的原因:
(1)方便在集群中扩展,每个partition可以通过调整以适应它所在的及其,而每个topic又可以由多个partition组成,因此整个集群就可以适应任意大小的数据;
(2)可以提高并发,同一个消费者组不能读取同一个分区的数据,因此可以以partition为单位读写。

分区的原则:
(1)指定了partition,则直接使用;
(2)未指定partition但指定key,通过对key的value进行hash出一个partition;
(3)partition和key都未指定,使用轮询选出一个partition。

(3)Replication(副本)

同一个partition可能有多个replication(server.properties配置中default.replication.factor=N),没有replication,一旦broker宕机,其上所有的数据都不可被消费,同时producer也不能将数据存于其上的partition。 引入replication,在需要时在其中选举出一个leader,producer和consumer只与这个leader交互,其他的作为follower从leader中复制数据。

(4)写入流程

在这里插入图片描述
在这里插入图片描述

ACK机制:0/1/all,1表示leader,all表示leader和follower均写入信息再继续接收。图为ACK为all的机制,防止数据丢失。

2. 存储过程

(1)存储方式 物理上把topic分为一个或多个partition,每个partition物理上对应一个文件夹,存储该partition的所有消息和索引文件。 实际存储数据的文件为logs/xxxx.log文件,存在序列化。

(2)存储策略 无论消息是否被消费,kafka都会保留所有的消息,删除方式有两种: 基于时间(log.retention.hours=168)和大小(log.retention.bytes=1073741824)。

代码语言:javascript
复制
kafka读取特定消息的时间复杂度为O(1),即与文件大小无关,删除过期文件与提高kafka性能无关。

(3)zk存储机制

在这里插入图片描述
在这里插入图片描述

Consumer存储的是偏移量(低版本kafka),Producer不在zk注册,Brokers也存储在zk。

## 3. 消费过程 Kafka提供了两套consumer API:高级Comsumer API和低级Consumer API。

(1)高级API 不能管理offset,书写简单,系统通过zk自行管理; 不能管理分区、副本等,系统自动管理(默认1分钟更新zk中保存的offset )。 可以使用group来区分对同一个topic的不同程序访问分离开俩。

(2)低级API 能够开发者控制offset,随机读取; 书写复杂,需要自行控制offset,连接分区,找到leader等。

(3)消费者组 consumer.properties中group.id=group0,设置消费者组名。启动消费者时,需要添加命令--consumer.config config/consumer.properties

四、API使用

1. Producer

Maven依赖:

代码语言:javascript
复制
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>${kafka.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.12</artifactId>
  <version>${kafka.version}</version>
</dependency>

ProducerConfig类下包含所有的配置参数,以及doc参考文档。

代码语言:javascript
复制
1.高级API:带或不带回调函数的生产者
public class Producer {
    public static void main(String[] args) {
        // 设置配置文件
        Properties props = new Properties();
        // Kafka集群
        props.put("bootstrap.servers", "localhost:9092");
        // 应答级别,all可以写出-1
        props.put("acks", "all");
        // 重试次数
        props.put("retries", 0);
        // 批量大小和提交延迟,决定发送时刻
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        // 缓存
        props.put("buffer.memory", 33554432);
        // KV序列化类
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 创建生产者对象
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // 发送数据,第二个参数可以加上回调函数,重写onCompletion(RecordMetadata, exception)函数
        producer.send(new ProducerRecord<>("first", String.valueOf(123)), (metadata, exception) -> {
            if (exception == null) {
                System.out.println(metadata.partition() + "-" + metadata.offset());
            } else {
                System.out.println("ERROR:" + exception);
            }
        });

        producer.close();
    }
}


// 2.低级API:自定义分区的生产者
// 实现Partition类,重写方法partition、close、configure,配置文件需要匹配生产者
props.put("partitioner.class", "${全类名}");

注意:分区中所有偏移数据消费掉,再消费下一个分区,可能会出现消费数据的顺序和生产的顺序不同。

2. Consumer

代码语言:javascript
复制
// 1.高级API
public class Consumer {
    public static void main(String[] args) {
        // 配置信息
        Properties props = new Properties();
        // kafka集群
        props.put("bootstrap.servers", "localhost:9092");
        // 消费者组id
        props.put("group.id", "test");
        // 设置自动提交offset
        props.put("enable.auto.commit", "true");
        // 提交延迟
        props.put("auto.commit.interval.ms", "1000");
        // KV的反序列化
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // 创建消费者对象
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // 指定Topic
        consumer.subscribe(Arrays.asList("first", "second"));

        // 获取数据结束,JVM自动退出
        while (true) {
        
            // 获取数据,参数为获取延迟
            ConsumerRecords<String, String> consumerRecords = consumer.poll(100);

            // 打印数据
            for (ConsumerRecord<String, String> record : consumerRecords) {
                System.out.println(record.topic() + "-" + record.partition() + "-" + record.value());
            }
        }
    }
}


// 2.低级API:读取指定topic、partition(找leader)、offset的数据
/** 
 * 主要步骤:
 * (1*)findLeader(),根据指定的分区从主题元数据中找到主副本;
 * (2)getLastOffset(),获取分区最新的消费进度;
 * (3*)run(),从主副本拉取分区的消息;
 * (4)findNewLeader(),识别主副本的变化,重试。
 **/
 // 找分区leader(元数据信息)
private BrokerEndPoint findLeader(List<String> brokers, int port, String topic, int partition) {
    for (String broker : brokers) {
        // 创建获取分区Leader的消费者对象,链接到具体某一个节点
        SimpleConsumer getLeader = new SimpleConsumer(broker, port, 1000,
                1024*4, "getLeader");

        // 创建一个主题元数据的信息请求
        TopicMetadataRequest topicMetadataRequest =
                new TopicMetadataRequest(Collections.singletonList(topic));

        // 获取返回值
        TopicMetadataResponse topicMetadataResponse = getLeader.send(topicMetadataRequest);

        // 解析元数据返回值
        List<TopicMetadata> topicsMetadata = topicMetadataResponse.topicsMetadata();
        for (TopicMetadata topicMetadata : topicsMetadata) {
            List<PartitionMetadata> partitionsMetadata = topicMetadata.partitionsMetadata();
            for (PartitionMetadata partitionMetadata : partitionsMetadata) {
                if (partition == partitionMetadata.partitionId()) {
                    return partitionMetadata.leader();
                }
            }
        }
    }
    return null;
}

// 获取数据
private void run(List<String> brokers, int port, String topic, int partition, long offset) {
    // 获取分区Leader
    BrokerEndPoint leader = findLeader(brokers, port, topic, partition);
    if (leader == null) {
        return;
    }
    String leaderHost = leader.host();

    // 创建获取数据的消费者
    SimpleConsumer simpleConsumer = new SimpleConsumer(leaderHost, port, 1000,
            1024 * 4, "getData");

    // 创建获取数据的对象(可以获取多个数据.addFetch())
    FetchRequest fetchRequest =
            new FetchRequestBuilder().addFetch(topic, partition, offset, 1024 * 4).build();

    // 获取返回值
    FetchResponse fetchResponse = simpleConsumer.fetch(fetchRequest);

    // 解析返回值,创建获取数据的对象时可以多次.addFetch
    ByteBufferMessageSet messageAndOffsets = fetchResponse.messageSet(topic, partition);
    for (MessageAndOffset messageAndOffset : messageAndOffsets) {
        // offset可以自行保存
        long offset1 = messageAndOffset.offset();
        ByteBuffer payload = messageAndOffset.message().payload();

        // 自行反序列化
        byte[] bytes = new byte[payload.limit()];
        payload.get(bytes);

        System.out.println(offset1 + "-" + new String(bytes));
    }
}
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2020-02-17,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Kafka
  • 一、概述
  • 二、集群部署
  • 三、工作流程分析
    • 1. 生产过程
      • (1)写入方式(push)
      • (2)Partition
      • (3)Replication(副本)
      • (4)写入流程
    • 2. 存储过程
    • 四、API使用
      • 1. Producer
        • 2. Consumer
        相关产品与服务
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档