前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >分布式专题|想进入大厂,你得会点kafka

分布式专题|想进入大厂,你得会点kafka

作者头像
AI码师
发布2020-12-11 14:28:45
5950
发布2020-12-11 14:28:45
举报

点击上方蓝字关注我们 文末有惊喜

介绍

什么是kafka?

Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、Storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源 项目

kafka能用在哪里?

  • 日志收集:一个公司可以用Kafka收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。
  • 消息系统:解耦和生产者和消费者、缓存消息等。
  • 用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘。
  • 运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。

kafka基本组件

  • Broker 消息中间件处理节点,一个Kafka节点就是一个broker,一个或者多个Broker可以组成一个Kafka集群
  • Topic Kafka根据topic对消息进行归类,发布到Kafka集群的每条消息都需要指定一个topic
  • Producer 消息生产者,向Broker发送消息的客户端
  • Consumer 消息消费者,从Broker读取消息的客户端
  • ConsumerGroup 每个Consumer属于一个特定的Consumer Group,一条消息可以被多个不同的Consumer Group消费,但是一个Consumer Group中只能有一个Consumer能够消费该消息
  • Partition 物理上的概念,一个topic可以分为多个partition,每个partition内部消息是有序的,每个partition又能支持分配多个副本,在多个副本所在的broker中,会选举出一个leader,leader分区负责处理读写请求,并且把数据同步给其他的follower中保存的副本。

kafka 难点理解

  • 消息消费之后不会删除 消息被消费者消费之后,还会保存在分区里面,什么时候会被删除呢?是通过 配置参数 「log.retention.hours」决定的,如果这里设置成10,那么10小时之后,这个消息就会被删除
  • topic、partition和Broker之间的关系 一个topic代表一个业务数据集,例如如果需要处理订单数据,可以为订单消息创建一个topic,所有的订单消息都会发到这个topic中; 如果订单消息越来越多,那么就会造成这个topic变得越来越大,有可能会达到TB,肯定不能放在单机上面保存,我们可以对这个topic进行分区(partition),这些分区会分散在不同的机器上面,划分多个分区,也是为了提高消息的并发消费,因为前面说过,一个分区只能被每个消费组中的一个消费者进行消费,如果拆分成多个分区,就可以同时被多个消费者进行消费; broker最容易理解了:运行kafka进程的机器就是一个broker;
  • kafka如何支持传统消息的两种模式:队列和订阅 这两种模式都是基于kafka的消费机制决定的:生产者发送的消息会发到所有订阅了该topic的消费组(consumer grop)中,但是每个消费组中只有一个消费者能够消费到这条消息。
  • 队列模式:所有消费者位于同一个消费组,保证消息只会被一个消费者进行消费
  • 发布\订阅模式:将消费者放在不同消费组中,这样每个消费者都能收到同一个消息
  • kafka如何保证消息顺序消费 kafka通过保证一个分区的消息只能被消费组中的一个消费者进行消费,所以生产者发送消息必须将消息发送到同一个分区中,才能保证消息顺序消费;

如何在docker上安装kafka

安装kafka的前提是你要安装zookeeper

  • 安装zookeeper
代码语言:javascript
复制
# 创建文件夹
mkdir -p ~/docker/zookeeper/conf
mkdir -p ~/docker/zookeeper/data
mkdir -p ~/docker/zookeeper/datalog
docker run -d --name zookeeper \
--restart always \                        # docker服务启动时, 默认启动zookeeper容器
-p 2181:2181 -p 2888:2888 -p 3888:3888 \
-v ~/docker/zookeeper/conf:/conf \
-v ~/docker/zookeeper/data:/data \
-v ~/docker/zookeeper/datalog:/datalog \
zookeeper:3.4.14
  • 安装kafka
代码语言:javascript
复制
 docker run  -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -v ~/docker/kafka/logs:/kafka -e KAFKA_ZOOKEEPER_CONNECT=zookeeper --link zookeeper:zookeeper -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://${IP}:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -t wurstmeister/kafka
  • 使用kafka自带的控制台生产者和消费者 进行测试
代码语言:javascript
复制

# 开启生产者
docker exec -it kafka bash
# 创建主题
kafka-topics.sh --create -zookeeper zookeeper --topic lezai --partitions 3 -replication-factor 1
# 生产者连接kafka
kafka-console-producer.sh --topic lezai -bootstrap-server 127.0.0.1:9092

# 开启消费者
docker exec -it kafka bash
# 消费者连接kafka
kafka-console-consumer.sh --topic lezai -bootstrap-server 127.0.0.1:9092 --from-beginning

# 现在在生产者窗口输入内容,看看消费者窗口是否能收到
  • kafka命令行常规操作
代码语言:javascript
复制
1.查看topic的详细信息 
./kafka-topics.sh -bootstrap-server 127.0.0.1:2181 -describe -topic testKJ1 
  
2、为topic增加副本 
./kafka-reassign-partitions.sh -zookeeper 127.0.0.1:2181 -reassignment-json-file json/partitions-to-move.json -execute 
  
3、创建topic 
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testKJ1 
  
4、为topic增加partition 
./bin/kafka-topics.sh –zookeeper 127.0.0.1:2181 –alter –partitions 20 –topic testKJ1 
  
5、kafka生产者客户端命令 
./kafka-console-producer.sh --broker-list localhost:9092 --topic testKJ1 
  
6、kafka消费者客户端命令 
./kafka-console-consumer.sh -zookeeper localhost:2181 --from-beginning --topic testKJ1 
  
7、kafka服务启动 
./kafka-server-start.sh -daemon ../config/server.properties  
  
8、下线broker 
./kafka-run-class.sh kafka.admin.ShutdownBroker --zookeeper 127.0.0.1:2181 --broker #brokerId# --num.retries 3 --retry.interval.ms 60 
shutdown broker 
  
9、删除topic 
./kafka-run-class.sh kafka.admin.DeleteTopicCommand --topic testKJ1 --zookeeper 127.0.0.1:2181 
./kafka-topics.sh --zookeeper localhost:2181 --delete --topic testKJ1 
  
10、查看consumer组内消费的offset 
./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group test --topic testKJ1

springboot 集成kafka

  • 依赖导入
代码语言:javascript
复制
 <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
 </dependency>
  • 配置文件
代码语言:javascript
复制
server:
  port: 8080
spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    producer: # 生产者
      retries: 3 # 设置大于0的值,则客户端会将发送失败的记录重新发送
      batch-size: 16384
      buffer-memory: 33554432
      # 指定消息key和消息体的编解码方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: mygroup
      enable-auto-commit: true
  • 添加消息生产者
代码语言:javascript
复制
    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;
    @Test
    public  void send(){
        kafkaTemplate.send("lezai",0,"key","kafkasss 发送数据");
        try {
            Thread.sleep(1000000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
  • 添加两个消费者,存在不同消费组
代码语言:javascript
复制
    @KafkaListener(topics = "lezai",groupId = "testGroup")
    public void listen(ConsumerRecord<String, String> record) {
        String value = record.value();
        System.out.println("testGroup"+value);
        System.out.println(record);
    }


    @KafkaListener(topics = "lezai",groupId = "testGroup2")
    public void listen2(ConsumerRecord<String, String> record) {
        String value = record.value();
        System.out.println("testGroup2"+value);
        System.out.println(record);
    }
    
    // 可以切换为相同的groupId,来验证消息是否会被同一个消费组中的消费者消费

扫码关注我们,了解最新内容

成长心路 | 优质书单 | 面试资料

牛人故事 | 前沿技术 | 视频教程

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-12-02,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 乐哉开讲 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 介绍
    • 什么是kafka?
      • kafka能用在哪里?
        • kafka基本组件
          • kafka 难点理解
          • 如何在docker上安装kafka
          • springboot 集成kafka
          相关产品与服务
          容器服务
          腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档