前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布

Kafka

作者头像
挽风
发布2022-11-28 19:09:46
4450
发布2022-11-28 19:09:46
举报
文章被收录于专栏:小道小道

1 Kafka架构

  生产者、Broker、消费者、Zookeeper;

注意:Zookeeper中保存Broker id和消费者offsets等信息,但是没有生产者信息。

1.1 Producer发送数据流程

在这里插入图片描述
在这里插入图片描述

1.2 Kafka Broker总体工作流程

在这里插入图片描述
在这里插入图片描述

1.3 消费者组初始化流程

在这里插入图片描述
在这里插入图片描述

2 Kafka的机器数量

   Kafka机器数量 = 2 *(峰值生产速度 * 副本数 / 100)+ 1

3 副本数设定

  一般设置成2个或3个,很多企业设置为2个。

  副本的优势:提高可靠性;副本劣势:增加了网络IO传输

4 Kafka压测

  Kafka官方自带压力测试脚本(kafka-consumer-perf-test.sh、kafka-producer-perf-test.sh)。Kafka压测时,可以查看到哪个地方出现了瓶颈(CPU,内存,网络IO)。一般都是网络IO达到瓶颈。

5 Kafka日志保存时间

  默认保存7天;生产环境建议3天

6 Kafka中数据量计算

  每天总数据量100g,每天产生1亿条日志,10000万/24/60/60=1150条/每秒钟   平均每秒钟:1150条   低谷每秒钟:50条   高峰每秒钟:1150条 *(2-20倍)= 2300条 - 23000条   每条日志大小:0.5k - 2k(取1k)   每秒多少数据量:2.0M - 20MB

7 Kafka数据存储需要多少硬盘空间

  每天的数据量(100g) * 副本数(2个副本) * 日志保存时长(3天) / 70%

8 Kafka监控

  自己开发监控器;

  开源的监控器:KafkaManager、KafkaMonitor、KafkaEagle

9 Kakfa分区数计算

分区数一般设置为:3-10个

  1)创建一个只有1个分区的topic

  2)测试这个topic的producer吞吐量和consumer吞吐量。

  3)假设他们的值分别是Tp和Tc,单位可以是MB/s。

  4)然后假设总的目标吞吐量是Tt,那么分区数=Tt / min(Tp,Tc)

    例如:producer吞吐量 = 20m/s;consumer吞吐量 = 50m/s,期望吞吐量100m/s;则分区数 = 100 / 20 = 5个分区

10 多少个Topic

  通常情况:多少个日志类型就多少个Topic。也有对日志类型进行合并的。

11 Kafka的ISR副本同步队列

  ISR(In-Sync Replicas),副本同步队列。ISR中包括Leader和Follower。如果Leader进程挂掉,会在ISR队列中选择一个服务作为新的Leader。有replica.lag.max.messages(延迟条数)和replica.lag.time.max.ms(延迟时间)两个参数决定一台服务是否可以加入ISR副本队列,在0.10版本移除了replica.lag.max.messages参数,防止服务频繁的进去队列

  任意一个维度超过阈值都会把Follower剔除出ISR,存入OSR(Outof-Sync Replicas)列表,新加入的Follower也会先存放在OSR中。

12 Kafka分区分配策略

  在 Kafka内部存在两种默认的分区分配策略:Range和 RoundRobin。

  Range是默认策略。Range是对每个Topic而言的(即一个Topic一个Topic分),首先对同一个Topic里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。然后用Partitions分区的个数除以消费者线程的总数来决定每个消费者线程消费几个分区。如果除不尽,那么前面几个消费者线程将会多消费一个分区。

  例如:我们有10个分区,两个消费者(C1,C2),3个消费者线程,10 / 3 = 3而且除不尽。

    C1-0 将消费 0, 1, 2, 3 分区     C2-0 将消费 4, 5, 6 分区     C2-1 将消费 7, 8, 9 分区

  RoundRobin方式第一步将所有主题分区组成TopicAndPartition列表,然后对TopicAndPartition列表按照hashCode进行排序,最后按照轮询的方式发给每一个消费线程。

13 Kafka挂掉

  1)Flume记录

  2)日志有记录

  3)短期没事

14 Kafka数据丢失问题

14.1 producer角度

  Ack = 0,相当于异步发送,消息发送完毕即offset增加,继续生产。

  Ack = 1,leader收到leader replica 对一个消息的接受ack才增加offset,然后继续生产。

  Ack = -1,leader收到所有replica 对一个消息的接受ack才增加offset,然后继续生产。

  ack在生产者指定,不同生产者可以不同。

ack设为-1,需要ISR里的所有follower应答,想要真正不丢数据,需要配合参数:

min.insync.replicas: n (ack为-1时生效,ISR里应答的最小follower数量)

  默认为1(leader本身也算一个!),所以当ISR里除了leader本身,没有其他的follower,即使ack设为-1,相当于1的效果,不能保证不丢数据。

  需要将min.insync.replicas设置大于等于2,才能保证有其他副本同步到数据。

retries = Integer.MAX_VALUE,无限重试。

  如果上述两个条件不满足,写入一直失败,就会无限次重试,保证数据必须成功的发送给两个副本,如果做不到,就不停的重试,除非是面向金融级的场景,面向企业大客户,或者是广告计费,跟钱的计算相关的场景下,才会通过严格配置保证数据绝对不丢失

代码语言:javascript
复制
kafka-topics.sh --bootstrap-server hadoop1:9092 --create --topic testisr2 
				--replication-factor 3 --partitions 4 --config min.insync.replicas=2

完全不丢结论:ack=-1 + min.insync.replicas>=2 +无限重试

14.2 broker角度

  副本数大于1

  min.insync.replicas大于1

14.3 consumer角度

  手动提交offset

  flink结合checkpoint

15 Kafka数据重复

  重复指的是发生重试造成的重复。

幂等性 + ack-1 + 事务

15.1 Kafka数据重复

  可以在下一级:SparkStreaming、redis、Flink或者Hive中dwd层去重,去重的手段:分组、按照id开窗只取第一个值;

  了解:

  Kafka幂等性原理(单分区单会话):producer重试引起的乱序和重复

15.2 重复问题的解决:

  1)Kafka增加了pid和seq。Producer中每个RecordBatch都有一个单调递增的seq; Broker上每个topic的partition也会维护pid-seq的映射,并且每Commit都会更新lastSeq。

  2)recordBatch到来时,broker会先检查RecordBatch再保存数据:

    如果batch中 baseSeq(第一条消息的seq)比Broker维护的序号(lastSeq)大1,则保存数据,否则不保存。

15.3 乱序问题的解决

  假设我们有5个请求,batch1、batch2、batch3、batch4、batch5;

  如果只有batch2 ack failed,3、4、5都保存了,那2将会随下次batch重发而造成乱序。

  可以设置max.in.flight.requests.per.connection=1(客户端在单个连接上能够发送的未响应请求的个数)来解决乱序,但降低了系统吞吐。

  新版本kafka设置enable.idempotence=true后能够动态调整max-in-flight-request。

  正常情况下max.in.flight.requests.per.connection大于1。当重试请求到来时,batch 会根据 seq重新添加到队列的合适位置,并把max.in.flight.requests.per.connection设为1,这样它前面的 batch序号都比它小,只有前面的都发完了,它才能发。

16 Kafka消息数据积压,Kafka消费能力不足怎么处理?

  1 、如果是Kafka消费能力不足,则可以考虑增加Topic的分区数,并且同时提升消费组的消费者数量,消费者数 = 分区数。(两者缺一不可)

  2 、如果是下游的数据处理不及时:提高每批次拉取的数量。批次拉取数据过少(拉取数据/处理时间 < 生产速度),使处理的数据小于生产的数据,也会造成数据积压。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2022-11-23,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1 Kafka架构
    • 1.1 Producer发送数据流程
      • 1.2 Kafka Broker总体工作流程
        • 1.3 消费者组初始化流程
        • 2 Kafka的机器数量
        • 3 副本数设定
        • 4 Kafka压测
        • 5 Kafka日志保存时间
        • 6 Kafka中数据量计算
        • 7 Kafka数据存储需要多少硬盘空间
        • 8 Kafka监控
        • 9 Kakfa分区数计算
        • 10 多少个Topic
        • 11 Kafka的ISR副本同步队列
        • 12 Kafka分区分配策略
        • 13 Kafka挂掉
        • 14 Kafka数据丢失问题
          • 14.1 producer角度
            • 14.2 broker角度
              • 14.3 consumer角度
              • 15 Kafka数据重复
                • 15.1 Kafka数据重复
                  • 15.2 重复问题的解决:
                    • 15.3 乱序问题的解决
                    • 16 Kafka消息数据积压,Kafka消费能力不足怎么处理?
                    相关产品与服务
                    数据保险箱
                    数据保险箱(Cloud Data Coffer Service,CDCS)为您提供更高安全系数的企业核心数据存储服务。您可以通过自定义过期天数的方法删除数据,避免误删带来的损害,还可以将数据跨地域存储,防止一些不可抗因素导致的数据丢失。数据保险箱支持通过控制台、API 等多样化方式快速简单接入,实现海量数据的存储管理。您可以使用数据保险箱对文件数据进行上传、下载,最终实现数据的安全存储和提取。
                    领券
                    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档