Producer的主要功能就是向broker中某个topic的某个分区发送消息,主要由分区器(partitioner)实现。如何选择分区,然后高吞吐的可靠发送到broker是producer的重点。
发送消息
首先,Kafka producer提供了一个默认的分区器。如果消息指定了key则分区器会根据key的哈希值来选择目标分区,若该消息未指定key,则分区器使用轮询的方式确认目标分区,也可自定义分区策略。确认分区后分区器需要寻找该分区对应的leader所在的broker,然后向leader写入数据。
当producer发送请求给broker后,broker响应结果的超时时间,默认30s。具体流程如图:
发送流程如下:
主要配置
bootstrap.servers
该属性指定 brokers 的地址清单。
buffer.memory
该参数用来设置producer端内存缓冲区的大小,默认值为32M。如果应用程序发送消息的速度超过发送到服务器的速度,那么会导致生产者内存不足,导致send()方法会被阻塞,如果阻塞的时间超过了max.block.ms配置的时长则会抛出异常
batch.size
该参数用于设置批次发送的阈值,默认16KB
etires
该参数用于当生产者发送消息到broker失败时的重试次数,默认每隔(retry.backoff.on)100ms重试
request.timeout.ms
该参数用于当producer发送请求给broker后,broker响应结果的超时时间,默认30s
发送方式
该方式是最简单的发送数据方式,不管消息是否可靠到达,本质上也是一种异步方式吞吐量最高,但无法保证消息的可靠性。
可以明确知道消息发送的结果,然后对结果进行处理,由于同步的方式会阻塞,只有当消息通过get返回future对象时,才会继续下一条消息的发送,导致吞吐量下降。
在调用send方法发送消息的同时指定一个回调函数,broker在返回响应时会调用该回调函数,通过回调函数能够对异常情况进行处理,比如记录异常日志再统一处理。
幂等性
Producer 的幂等性指的是当发送同一条消息时,数据在broker端只会被持久化一次,数据不丟不重,也就是exactly-once,但是在分布式系统中,出现网络分区是不可避免的,例如以下情况:
幂等性的目的就是为了解决重复的问题at least once + 幂等 = exactly once,幂等性是通过两个关键信息保证:
PID
每个producer初始化时会由broker分配一个唯一的PID来标识唯一的producer,producer每次启动都不一样。
sequence numbers
Broker端通过接收到message的PID和sequence numbers 信息进行校验。最新版本中可通过设置enable.idempotence = true开启幂等性,但是Kafka的幂等性是有条件的:
事务性
幂等性提供了单会话单partition的Exactly-Once语义的实现,正是因为幂等性不提供跨partition和跨会话场景下的保证,因此需要一种更强的事务保证,能够原子处理多个partition的写入操作,数据要么全部写入成功,要么全部失败,kafka是事务性类似于二阶段提交,总的来说就是能够实现【跨分区的原子写入】。
kafka的事务性可以保证:
事务属性实现前提是幂等性,即配置事务属性还必须还得配置幂等性,但是幂等性是可以独立使用的,不需要依赖事务属性。如果需要保证消息的事务性,需保证如下配置:
领取专属 10元无门槛券
私享最新 技术干货