https://zhuanlan.zhihu.com/p/76073581
答:Kafka是一个发布 - 订阅的消息队列中间件。这个消息传递应用程序是用“scala”编码的。 kafka 支持的协议是防AMQP协议,支持集群,负载均衡和动态扩容(zk), 不支持事务;
它有这么三个比较关键的能力:
发布订阅,可以当做消息队列用 记录的容错持久化 流处理
优点是: 高吞吐,低延时,可扩展
允许独立的修改或者扩展两边的处理过程,只要确保其遵循同样的约束接口。
系统的一部分组件失效时,不会影响整个系统。即使部分处理消息的线程挂掉,消息加入队列,也能在系统恢复后被处理。
用于解决生产者和消费者速度不一致的情况。
在流量激增的情况下不会导致系统奔溃
用户收到消息不想立即处理,需要的时候再进行处理。
只有一个消费者 flume
只要不删消息都在
队列主动推送:缺点推送的速度统一,但是每一个订阅者的处理速度不一
消费者主动拉取的模式:缺点需要消费者进行长轮询看有没有新消息,浪费资源
被动拉取的模式, 维护一个用户列表,消息来到,通知消费者,消费队列的两端是可以不同时在线,但是被动通知还需实时监测消费者是否在线
主题:Kafka主题是一堆或一组消息。
生产者:在Kafka,生产者发布通信以及向Kafka主题发布消息。
消费者:Kafka消费者订阅了一个主题,并且还从主题中读取和处理消息。
经纪人:在管理主题中的消息存储时,我们使用Kafka Brokers。
zookeeper :
image.png
消费者组,提高消费数据的能力。消费者组里的消费者个数和分区一致是最好。消费者数不要超过分区数。
消费者组分配的策略问题。
读取操作可以直接在Page Cache上进行,如果消费和生产速度相当,甚至不需要通过物理磁盘直接交换数据,这是Kafka高吞吐量的一个重要原因。
这么做还有一个优势,如果Kafka重启,JVM内的Cache会失效,Page Cache依然可用。
传统四次拷贝:首先通过系统调用将文件数据读入到内核态Buffer(DMA拷贝),然后应用程序将内存态Buffer数据读入到用户态Buffer(CPU拷贝),接着用户程序通过Socket发送数据时将用户态Buffer数据拷贝到内核态Buffer(CPU拷贝),最后通过DMA拷贝将数据拷贝到NIC Buffer。伴随四次切换。
sendfile和transferTo实现零拷贝 Linux 2.4+内核通过sendfile系统调用,提供了零拷贝。数据通过DMA拷贝到内核态Buffer后,直接通过DMA拷贝到NIC Buffer,无需CPU拷贝。这也是零拷贝这一说法的来源。除了减少数据拷贝外,因为整个读文件-网络发送由一个sendfile调用完成,整个过程只有两次上下文切换,因此大大提高了性能。
Java NIO的FileChannel的transferTo和transferFrom方法实现零拷贝。
即使是普通的机械磁盘,顺序访问速率也接近了内存的随机访问速率。
Kafka的每条消息都是append的,不会从中间写入和删除消息,保证了磁盘的顺序访问。
即使是顺序读写,过于频繁的大量小IO操作一样会造成磁盘的瓶颈,此时又变成了随机读写。Kafka的策略是把消息集合在一起,批量发送,尽可能减少对磁盘的访问。所以,Kafka的Topic和Partition数量不宜过多。
超过64个Topic/Partition以后,Kafka性能会急剧下降。
RocketMQ存储模型与Kafka有些差异,RocketMQ会把所有的数据存放在相同的日志文件,所以单机可以支持非常多的队列。
Kafka基本上是没有阻塞操作的,调用发送方法会立即返回,等待buffer满了以后交给轮询线程,发送和接收消息,复制数据也是都是通过NetworkClient封装的poll方式。
结合磁盘顺序写入,批量无疑是非常有必要(如果用的时候每发送一条消息都调用future.get等待,性能至少下降2个数量级)。写入的时候放到RecordAccumulator进行聚合,批量压缩,还有批量刷盘等...
1 写入方式 producer采用推(push)模式将消息发布到broker,每条消息都被追加(append)到分区(patition)中,属于顺序写磁盘(顺序写磁盘效率比随机写内存要高,保障kafka吞吐率)。
2 分区(Partition) Kafka集群有多个消息代理服务器(broker-server)组成,发布到Kafka集群的每条消息都有一个类别,用主题(topic)来表示。
每个Partition中的消息都是有序的,生产的消息被不断追加到Partition log上,其中的每一个消息都被赋予了一个唯一的offset值。
消费者 offset 是按照 组 + 分区 + topic 来进行维护
发布到Kafka主题的每条消息包括键值和时间戳。消息到达服务器端的指定分区后,都会分配到一个自增的偏移量。原始的消息内容和分配的偏移量以及其他一些元数据信息最后都会存储到分区日志文件中。
image.png
1)producer先从zookeeper的 “/brokers/…/state”节点找到该partition的leader 2)producer将消息发送给该leader 3)leader将消息写入本地log 4)followers从leader pull消息,写入本地log后向leader发送ACK 5)leader收到所有ISR中的replication的ACK后,增加HW(high watermark,最后commit 的offset)并向producer发送ACK
Kafka采用拉取模型,由消费者自己记录消费状态,每个消费者互相独立地顺序读取每个分区的消息。如下图所示,有两个消费者(不同消费者组)拉取同一个主题的消息,消费者A的消费进度是3,消费者B的消费进度是6。消费者拉取的最大上限通过最高水位(watermark)控制,生产者最新写入的消息如果还没有达到备份数量,对消费者是不可见的。这种由消费者控制偏移量的优点是:消费者可以按照任意的顺序消费消息。比如,消费者可以重置到旧的偏移量,重新处理之前已经消费过的消息;或者直接跳到最近的位置,从当前的时刻开始消费。
半数以上follower完成备份 发送 ACK, 问题是选举新的leader ,容忍 n 台故障,需要 2n + 1 个副本
全部完成, 问题是延迟长, Kafka 选择这种,但是问题是 存在一个慢 ,或者挂掉,
kafka 中维护 ISR 的队列
当leader 接受到消息后,通知 ISR 中的follow 完成备份
消费数据一致性:Leo: 每一个分区副本的最大 offset ,设置一个 HW 指的是高水位,所有分区leo的最小位置,HW之前的数据才对消费者可见
存储数据一致性: 重新选leader 给所有分区发生消息,直接截取数据到HW.
幂等性 + 至少一次 为精准一次
使用幂等性,在kafka 的 broker 消除数据的重复, kafka使用幂等性,默认 ack 为-1
首先给每一个生产者 添加一个 id , 给每一个消息 添加一个序列号, 如果同一个 生产者, 同一个消息序列号, 发往同一个分区,如果已经接受过,就进行去重。
但是生产者挂了重启,那么它的id 号也就变了,也就不能保证精准 一致性
保证消费者组里面消费的topic 是一样的。 Range 是按照单个主题进行划分,将不同的topic 不当做一个整体进行考虑。
触发时在消费者组里面消费者个数变化时会触发分区,重新设置分配分配策略。
Range 分区不会把主题看做一个整体进行划分
假设 有两个主题, T1(0,1,2), T2(0,1,2), 两个消费者组 (A,B) (C)
A 消费者 订阅 T1 , B 订阅 T1, T2 ,C 订阅了 T1
RR : 如果采用的RR 发现 A,B 消费者共用同一个组, 则会把 A,B 订阅的topic 当做一个整体进行考虑。
A,B 进行轮询的分区有: T1 0 T1 1 T1 2 T2 0 T2 1 T2 3
Range : 按主题划分,先考虑谁订阅了这个主题,然后再进行划分
消费者组 + 主题 + 分区 决定 offset, 消费者连接 Kafka 可以顺序写磁盘, 零拷贝技术
答:使用消费者策略和生产者策略保证负载均衡
答:基本上,复制日志的节点列表就是副本。特别是对于特定的分区。但是,无论他们是否扮演领导者的角色,他们都是如此。 此外,ISR指的是同步副本。在定义ISR时,它是一组与领导者同步的消息副本。
消费者消费后没有commit offset(程序崩溃/强行kill/消费耗时/自动提交偏移情况下unscrible)
https://www.cnblogs.com/wangzhuxing/p/10124308.html
(1)生产者发送重复解决方案
1、启动kafka的幂等性 要启动kafka的幂等性,无需修改代码,默认为关闭,需要修改配置文件:enable.idempotence=true 同时要求 ack=all 且 retries>1。
幂等原理:每个producer有一个producer id,服务端会通过这个id关联记录每个producer的状态,每个producer的每条消息会带上一个递增的sequence,服务端会记录每个producer对应的当前最大sequence,producerId + sequence ,如果新的消息带上的sequence不大于当前的最大sequence就拒绝这条消息,如果消息落盘会同时更新最大sequence,这个时候重发的消息会被服务端拒掉从而避免消息重复。该配置同样应用于kafka事务中。
2、ack=0,不重试。
可能会丢消息,适用于吞吐量指标重要性高于数据丢失,例如:日志收集。
(2)消费者重复消费解决方案
1、取消自动自动提交 每次消费完或者程序退出时手动提交。这可能也没法保证一条重复。
2、下游做幂等 一般的解决方案是让下游做幂等或者尽量每消费一条消息都记录offset,对于少数严格的场景可能需要把offset或唯一ID,例如订单ID和下游状态更新放在同一个数据库里面做事务来保证精确的一次更新或者在下游数据表里面同时记录消费offset,然后更新下游数据的时候用消费位点做乐观锁拒绝掉旧位点的数据更新。
flink 使用 upset 来保证数据的幂等性,每一个数据都有一个唯一的id, 不存在插入,存在更新
https://www.cnblogs.com/bigshark/p/11210876.html
https://www.cnblogs.com/kevingrace/p/9443270.html
https://blog.csdn.net/CoderBoom/article/details/84845197
at least once + 幂等性 = exactly once
答:每当Kafka生产者试图以代理的身份在当时无法处理的速度发送消息时,通常都会发生QueueFullException。但是,为了协作处理增加的负载,用户需要添加足够的代理,因为生产者不会阻止。
答:保留期限保留了Kafka群集中的所有已发布记录。它不会检查它们是否已被消耗。此外,可以通过使用保留期的配置设置来丢弃记录。而且,它可以释放一些空间。
答:Kafka可以接收的最大消息大小约为1000000字节。 问题24:传统的消息传递方法有哪些类型? 答:基本上,传统的消息传递方法有两种,如:
排队:这是一种消费者池可以从服务器读取消息并且每条消息转到其中一个消息的方法。 发布-订阅:在发布-订阅中,消息被广播给所有消费者。
答:ISR指的是同步副本。这些通常被分类为一组消息副本,它们被同步为领导者。
答:对于我们的集群,Kafka MirrorMaker提供地理复制。基本上,消息是通过MirrorMaker跨多个数据中心或云区域复制的。因此,它可以在主动/被动场景中用于备份和恢复;也可以将数据放在离用户更近的位置,或者支持数据位置要求。
答:我们可以轻松地将Kafka部署为多租户解决方案。但是,通过配置主题可以生成或使用数据,可以启用多租户。此外,它还为配额提供操作支持。
答:我们知道,在Kafka中,消息会保留相当长的时间。此外,消费者还可以根据自己的方便进行阅读。尽管如此,有一种可能的情况是,如果将Kafka配置为将消息保留24小时,并且消费者可能停机超过24小时,则消费者可能会丢失这些消息。但是,我们仍然可以从上次已知的偏移中读取这些消息,但仅限于消费者的部分停机时间仅为60分钟的情况。此外,关于消费者从一个话题中读到什么,Kafka不会保持状态。
答:Kafka的局限性是:
没有完整的监控工具集 消息调整的问题 不支持通配符主题选择 速度问题
重平衡本质上是一种协议,规定了 消费者组下的所有消费者,按照什么策略消费 Topic
就是 给消费组 中的每一个消费者分配消费 任务的过程。
kafka 采用的是同步和异步共同优点的备份策略,即将leader 的所有 follower 进行同步完毕才返回,ack. 只不过这个全部的副本是指的是 在 ISR 队列中的副本。
ISR 队列,是指 follower 副本存活且和 zookeeper 保持连接,同时其响应时间 较快。不满足条件的会被踢出去,满足的会被加入。
HW 高水位,表明 所有副本都同步到的 offset ,所有分区的最小offset ,那么 leader 也向 消费者提供的 HW.
LEO 每一个分区上的最新(大) offset
kafka采取同步和异步的共同优点,所以使用ISR的方法。把Follow中同步慢的节点从ISR中进行T除,从而保证了复制数据的速度。如果leader副本宕机,那么从ISR中选举出来新的leader副本。因为follow副本中都有记录HW。这样也会减少数据的丢失。Follow副本能够从leader中批量的读取数据并批量写入,从而减少了I/0的开销。
kafka 处理请求 类似于 Reactor 模式。
网络线程负责接受 请求, 然后将请求放入共享的请求队列中。
broker 有个 IO线程池, 负责从共享队列中取出请求, 执行真正的处理, 如果是 produce ,将消息写入底层磁盘的日志中, 如果是 fetch ,则从磁盘读取消息。
当 IO线程 处理完请求,将生成的响应 发送到 网络 线程池的响应队列中
请求队列是所有网络线程共享的,而响应队列是每个网络线程专属的
早期的版本并没有采用 kafka Controller 对分区和副本进行管理,而是依赖于 zookeeper, 每一个 broker 都会在 zookeeper 上为分区和副本注册大量的监听器。这种严重依赖于zookeeper 会有脑裂和 羊群效应。
只有kafka 的 controller 监控 zookeeper , 其他的 node 再和 controller 通信,减少 zookeeper的 压力。
什么是脑裂? kafka中只有一个控制器controller 负责分区的leader选举,同步broker的新增或删除消息,但有时由于网络问题,可能同时有两个broker认为自己是controller,这时候其他的broker就会发生脑裂,不知道该听从谁的。
如何解决?controller epoch 每当新的controller产生的时候就会在zk中生成一个全新的、数值更大的controller epoch的标识,并同步给其他的broker进行保存,这样当第二个controller发送指令时,其他的broker就会自动忽略。
在任意时刻,集群中有且仅有一个控制器。
每个broker启动的时候会去尝试去读取zookeeper 中/controller节点的brokerid的值,如果读取到brokerid的值不为-1,则表示已经有其它broker节点成功竞选为控制器,所以当前broker就会放弃竞选;如果Zookeeper中不存在/controller这个节点,或者这个节点中的数据异常,那么就会尝试去创建/controller这个节点,当前broker去创建节点的时候,也有可能其他broker同时去尝试创建这个节点,只有创建成功的那个broker才会成为控制器,而创建失败的broker则表示竞选失败。每个broker都会在内存中保存当前控制器的brokerid值,这个值可以标识为activeControllerId。
目的: 进行retry重试时,只会生成一个消息。
为了实现Producer的幂等性,Kafka引入了Producer ID(即PID)和Sequence Number。
PID。每个新的Producer在初始化的时候会被分配一个唯一的PID,这个PID对用户是不可见的。
Sequence Numbler。(对于每个PID,该Producer发送数据的每个<Topic, Partition>都对应一个从0开始单调递增的Sequence Number。不是offset
在事务属性之前先引入了生产者幂等性,它的作用为:
生产者多次发送消息可以封装成一个原子操作,要么都成功,要么失败 consumer-transform-producer模式下,因为消费者提交偏移量出现问题,导致在重复消费消息时,生产者重复生产消息。需要将这个模式下消费者提交偏移量操作和生成者一系列生成消息的操作封装成一个原子操作。
事务属性实现前提是幂等性,即在配置事务属性transaction id时,必须还得配置幂等性;但是幂等性是可以独立使用的,不需要依赖事务属性。
消费者组是 kafka 提供的可以扩展且具有容错性的消费者机制。 一个分区,只能被消费者组中的一个消费者进行消费。
当消费者数量多于分区数量时,多于的消费者空闲。
当消费者数量少于分区数量时,一个消费者可能订阅多个分区。
发挥 consumer 最大的效果就是,consumer 数和topic 下的 partitions 数相等。
kafka每个partition中的消息在写入时都是有序的,消费时,每个partition只能被每一个group中的一个消费者消费,保证了消费时也是有序的。 整个topic不保证有序。如果为了保证topic整个有序,那么将partition调整为1.
2个,主线程和Sender线程。主线程负责创建消息,然后通过分区器、序列化器、拦截器作用之后缓存到累加器RecordAccumulator中。Sender线程负责将RecordAccumulator中消息发送到kafka中.
offset+1
消费者消费后没有commit offset(程序崩溃/强行kill/消费耗时/自动提交偏移情况下unscrible)
1.在每个线程中新建一个KafkaConsumer
2.单线程创建KafkaConsumer,多个处理线程处理消息(难点在于是否要考虑消息顺序性,offset的提交方式)
kafka:一个topic,一个partition,一个consumer,内部多线程.
kafka写到一个partition中的数据一定是有数据的。
解决办法: 一个topic,一个partition,一个consumer,内部单线程消费,写N个内存queue,然后N个线程分别消费一个内存queue即可,既消费者不要使用多线程进行处理。或者你想多线程进行处理,你可以建立一个内存队列,通过hash进任务id相同分到一个内存队列,每一个内存队列对应一个线程。
ActiveMQ :最早大家都用ActiveMQ,但是现在确实大家用的不多了,没经过大规模吞吐量场景的验证,社区也不是很活跃,单机吞吐量,万级,吞吐量比RocketMQ和Kafka要低了一个数量级,响应为ms级别,有较低的概率丢失数据。
RabbitMQ :单机吞吐率万级,吞吐量比RocketMQ和Kafka要低了一个数量级,但是适合于中小型企业,因为自带了友好的监控和维护界面,社区相对比较活跃,几乎每个月都发布几个版本分,在国内一些互联网公司近几年用rabbitmq也比较多一些,但是问题也是显而易见的,RabbitMQ确实吞吐量会低一些,这是因为他做的实现机制比较重,同时语言在国内很少有人会。
RocketMQ :单机吞吐量10万级,RocketMQ也是可以支撑高吞吐的一种MQ,topic可以达到几百,几千个的级别,吞吐量会有较小幅度的下降,这是RocketMQ的一大优势,在同等机器下,可以支撑大量的topic,可用性非常高,分布式架构,在阿里大规模应用过,有阿里品牌保障,日处理消息上百亿之多,可以做到大规模吞吐,性能也非常好,分布式扩展也很方便,源码是JAVA.
Kafka : 单机吞吐量10万级别,这是kafka最大的优点,就是吞吐量高。一般配合大数据类的系统来进行实时数据计算、日志采集等场景.topic从几十个到几百个的时候,吞吐量会大幅度下降 所以在同等机器下,kafka尽量保证topic数量不要过多。如果要支撑大规模topic,需要增加更多的机器资源,可用性非常高,kafka是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用。