在创建KafkaProducer
时需要传入Properties
指定一些参数
bootstrap.servers
: kafka
集群各个服务器地址key.serializer
: 用来序列化key
的实现org.apache.kafka.common.serialization.Serializer
接口的类(如果没有指定partition
,相同key
会发送到同一分区)value.serializer
: 用来序列化value
的实现org.apache.kafka.common.serialization.Serializer
接口的类buffer.memory
: 整个Producer
使用的内存,默认大小为32M
,这些内存可以用于RecordAccumulator
,NetWorkClient
中未确认的请求等。batch.size
: 当多条记录发送到同一partition
时,producer
将会尝试将这些记录批处理到一起,及当记录累计达到batch.size
后再一起发送,默认大小为16K
。可与linger.ms
配合使用。linger.ms
: 当partition
中的记录迟迟达不到batch.size
的大小时,如果不设置超时时间则这些记录可能一直阻塞,设置linger.ms
可以让记录在超时后发送而不会堆积,默认为0ms
即立即发送,通常可设置范围到`5~100ms``acks
: NetworkClient
发送请求消息到kafka cluster
后cluster
的应答机制,取值可以为0
, 1
, -1
或all
,其中-1
和all
是一样的。retries
: NetworkClient
发送请求消息的失败重试次数。replica.lag.time.max.ms
: ISR
队列中节点的同步时间,超出这个时间不同步则会被移除ISR
队列。min.insync.replicas
: ISR
队列的最小应答数enable.idempotence
: 开启幂等性max.in.flight.requests.per.connection
: 在阻塞前,每个broker最多缓存5个未确认的请求,第6个请求进来时不发送,直到有未确认的请求得到确认。acks
取值可以为0
, 1
, -1
或all
,其中-1
和all
是一样的。
0
: producer
发送过来的数据,不需要等待数据持久化就立马应答
1
: producer
发送过来的数据,Leader
收到数据后持久化成功后就应答
-1
或all
: producer
发送过来的数据,Leader
和ISR
队列满足最小应答数后才应答,其中ISR
队列满足最小应答数可以通过min.insync.replicas
参数设置。
ISR
队列指的是和Leader
保持同步的Leader
和Follower
的集合,注意这里也包括Leader
自己。eg:(leader:0, isr:0, 1, 2)
acks=0
,可靠性低一般不使用
acks=1
: Leader
会应答,可靠性高一点点,但如果在Leader
收到数据后节点就挂了则也会丢失数据,一般用于日志传输
的场景,允许丢一点数据
acks=-1
: 可靠性很强,但效率非常低。并且在没开启幂等性的情况下会产生重复数据。如Leader
将数据同步到isr
后,返回应答的时候挂了,这时候新的Leader
出现,producer
重试又发送了数据,导致数据重复。
acks=-1
+ 分区副本(leader
+follower
)数大于等于2 + ISR
应答的最小副本数大于等于2。数据不丢失,但不开启幂等性的情况下数据可能会重复。acks=1
。数据不会重复,但可能会丢失。幂等性的判断标准: 具有<PID
,Partition
,SeqNumber
>相同三元组的消息,broker
只持久化一次
PID
: 每个新的Producer
在初始化的时候会被分配一个唯一的PID
Partition
: 消息要发往的分区SeqNumber
: 对每个PID
,Producer
发送时会让<topic
,partition
>对应一个单调递增的SeqNumber
,并且broker
也会缓存Producer
的SeqNumber
,在broker
收到消息时,若SeqNumber
比当前缓存的值小,则把消息丢弃,否则接受判重。幂等性只能保证单会话,单分区消息不重复
// 初始化事务
void initTransactions();
// 开启事务
void beginTransaction() throws ProducerFencedException;
// 在事务内提交已经消费的偏移量(消费者使用)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
String consumerGroupId) throws ProducerFencedException;
// 提交事务
void commitTransaction() throws ProducerFencedException;
// 放弃事务
void abortTransaction() throws ProducerFencedException;
Consumer
消费多分区数据整体不能是有序的,可以做到有序消费单分区数据,但有前置条件。
1.x
版本之前,需要保证max.in.flight.requests.per.connection=1
1.x
版本之后
max.in.flight.requests.per.connection=1
max.in.flight.requests.per.connection<=5
在1.x
版本后,kafka
会缓存producer
发送过来在的5个请求的数据,并对其进行排序。从而保证5个请求内的消息在kafka
内是有序的。
bootstrap.servers
: kafka
集群各个服务器地址key.deserializer
: 用来反序列化key
的实现org.apache.kafka.common.serialization.Deserializer
接口的类value.deserializer
: 用来反序列化value
的实现org.apache.kafka.common.serialization.Deserializer
接口的类partition.assignment.strategy
: 消费者组的分区策略,有RangeAssignor
,RoundRobinAssignor
,StickyAssignor
, CooperativeStickyAssignor
。也可以通过自己实现ConsumerPartitionAssignor
接口,多个分区策略可以混合使用。默认RangeAssignor
+ CooperativeStickyAssigno
group.id
: 消费者组id
enable.auto.commit
: 消费者是否自动提交offset
,默认是true
auto.commit.interval.ms
: 消费者自动提交offset
的时间间隔,默认为5000 (5 seconds)。max.poll.records
: 对poll()
的单个调用中返回的最大记录数Reference:
先对单个topic
的各个分区partition
以及各个消费者组里的各个consumer
按序号排序。然后用partition数/consumer数
得到每个消费者需要消费的分区数量,然后每次将连续的partition
按相除的数量分配给每个consumer
。这里可能会出现不能整除的情况,多出来的余数个分区则按顺序分给每个消费者,也就意味着有一些消费者会多消费一个分区。示意图:
缺点:
这种分配算法会造成数据倾斜,即会造成某个consumer
压力过大。对于少个topic
来说,consumer
对于每个topic
多消费一个分区问题不大,如果kafka
里有很多的topic
,而这些topic
多出来的分区都要由排序靠前的consumer
来承当,则会造成这些consumer
的负载压力要比其他的大的多。
RangeAssignor
是针对单个topic
的,而RoundRobinAssignor
是针对所有topic
的,RoundRobinAssignor
会将所有topic
的分区partition
和消费者排序,然后轮询所有的partition
分配给每个消费者。如果订阅的topic
列表是不同的,那么分配结果是不保证“尽量均衡”的,因为某些消费者不参与一些topic
的分配。
在RangeAssignor
和RoundRobinAssignor
中,当有consumer
挂掉时都会做重分配rebalance
,即重新分配每个消费者对应消费哪个分区,重分配后他们消费的分区可能会和rebalance
前的差别较大。StickyAssignor
就是尽量做到rebalance
前后消费者负责的分区不发生较大变化,即每次重分配的分配结果尽量和上一次的保持一致。并且相比前面两种策略其在分区的时候尽可能做到随机分配而不是顺序分配。
总之,StickyAssignor
的目标有两点:
当这两个目标发生冲突时,优先保证第一个目标。第一个目标是每个分配算法都尽量尝试去完成的,而第二个目标才真正体现出StickyAssignor
特性的。
上述三种分区分配策略均是基于eager
协议,Kafka2.4.0
开始引入CooperativeStickyAssignor
策略。CooperativeStickyAssignor
与之前的StickyAssignor
虽然都是维持原来的分区分配方案,最大的区别是:StickyAssignor
仍然是基于eager
协议,分区重分配时候,都需要consumers
先放弃当前持有的分区,重新加入consumer group
;而CooperativeStickyAssignor
基于cooperative
协议,该协议将原来的一次全局分区重平衡,改成多次小规模分区重平衡。
具体解析见Reference2
为了能够记录consumer group
消费某topic
的进度,kafka
采用了offset
来记录消费进度。
在Kafka 0.9
之前,这些offset
信息是保存在zookeeper
中的,在0.9
后则保存到kafka
的一个内置的topic
,__consumer_offsets
中。该topic
有50个分区。默认是无法读取的,可以通过设置exclude.internal.topics=false
来读取。
默认情况下消费者的offset
是自动提交的,可通过enable.auto.commit
来设计是否开启以及设置auto.commit.interval.ms
设置自动提交的时间间隔。
消费者可以手动提交offset
,方式可以是异步和同步,同时也可以指定offset
的位置开始消费(可通过时间来找到指定offset
然后开始消费,如消费从一天前的现在对应的offset
,对应api
为offsetsForTimes
开始消费)。
partition
: 消息的分区
log
: 逻辑概念,指的是保存分区信息的文件综合
segement
: 每个log
由多个segement
组成,每个segement
最大为1G
,其由xxx.index
,xxx.log
,xxx.timeindx
组成,log
文件保存的是消息,其中包括消息的大小,offset
等。而index
消息保存的其log
的稀疏索引,而timeindex
时间戳文件保存的是当前日志段的最大的时间戳以及该时间戳对应的偏移量。
segement
的大小可以通过log.segment.bytes
设置,默认1G
创建副本存储计划
{
"version":1,
"partitions":[
"topic":"test", // 修改哪个主题
"partition":0, // 修改主题里哪个分区
"replicas":[0,1,2] // 副本要放在哪些broker
},
{
"topic":"test",
"partition":1,
"replicas":[0,1,2]
},
{
"topic":"test",
"partition":2,
"replicas":[0,1,2]
},
{
"topic":"test",
"partition":3,
"replicas":[0,1,2]
},
{
"topic":"test",
"partition":4,
"replicas":[0,1,2]
},
{
"topic":"test",
"partition":5,
"replicas":[0,1,2]
},
{
"topic":"test",
"partition":6,
"replicas":[0,1,2]
},
]
}
执行计划
bash ./kafka-reassign-partitions.sh --bootstrap-server kafka1:9091,kafka2:9092,kafka3:9093 --reassignment-json-file define-create-partition.json --execute
验证
bash ./kafka-reassign-partitions.sh --bootstrap-server kafka1:9091,kafka2:9092,kafka3:9093 --reassignment-json-file define-create-partition.json --verify
kafka
数据默认保存7天,可以调整以下参数修改:
log.retention.hours
: 最低优先级,按小时数来清理,默认168(7天)log.retenion.minutes
: 分钟log.retenion.ms
: 最高优先级,毫秒级别log.retenion.check.interval.ms
: 负责设置的检查文件是否清理的周期,默认5分钟当超时后,kafka
有两种策略要处理消息,分别是delete
和compact
,可以通过修改以下参数来设定超时删除策略
log.cleanup.policy
: 当segement
超出大小之后的默认清理策略,其中有效的策略选项可以使用逗号分开。有效的策略选项有delete
和compact
。默认delete
delete
默认开启了按照时间删除过期时间。其是按照xxx.timeindex
记录的时间来标志的,因此基于时间的删除是以该segement
的所有记录的最大时间戳来作为该文件的时间戳来删除的。
同时delete
也提供了基于大小的删除配置,其配置参数如下:
log.retention.bytes
: 即日志文件达到多大则删除,默认为-1
即不限制,这个选项的值如果小于segment文件大小的话是不起作用的。segment文件的大小取决于log.segment.bytes配置项,默认为1G。另外,Kafka的日志删除策略并不是非常严格的(比如如果log.retention.bytes设置了10G的话,并不是超过10G的部分就会立刻删除,只是被标记为待删除,Kafka会在恰当的时候再真正删除),所以请预留足够的磁盘空间。当磁盘空间剩余量为0时,Kafka服务会被kill掉。
Reference: https://wiki.eryajf.net/pages/9fc4fa/#%E6%8C%89%E6%97%B6%E9%97%B4%E5%88%A0%E9%99%A4
compact
在compact
中,若数据过期,则采用压缩形式。具体是保留每一个key
的最后一个版本的数据。(在Producer
发送时可以指定key
,相同的key
会被发到同一个partition
)。如下图:
具有相同key
的数据只保留最新的那个版本。压缩后的offset
不连续,当consumer
消费的offset
找不到后会向上找到最近的一个offset
开始消费。如当尝试从offset=5
开始消费时,其会找到offset=6
。
Reference: https://juejin.cn/post/6863050320646406158
kafka
会把每个topic
分为多个partition
,并行处理加快速度。
kafka
写log
日志时采用的是文件追加的形式,顺序读写速度快。并且由于log
分成了多个segement
,segement
删除时都是整块删除的,这也避免了对文件的随机读写操作。并且在定位数据log
时也可以采用xxx.index
提供的稀疏索引来快速定位。
broker
网络IO数据持久化到磁盘(Producer
->broker
)中以及broker
消息发送到网络IO中(broker
->Consumer
)
而采用零拷贝后,如mmap()
,则可以减少两次CPU拷贝
Kafka
在这里采用的方案是通过 NIO
的 transferTo/transferFrom
调用操作系统的 sendfile
实现零拷贝。总共发生 2 次内核数据拷贝、2 次上下文切换和一次系统调用,消除了 CPU 数据拷贝。sendfile
仅将内核空间缓冲区中对应的数据描述信息(文件描述符、地址偏移量等信息)记录到socket
缓冲区中。
Producer
数据批处理
设定batch.size
来让发往同一分区的数据批处理发出。
Producer
对消息的压缩
Producer
可以采用各种算法对消息进行压缩,有Snappy
、Gzip
、LZ4
等