前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >深入理解kafka: 核心设计与实践原理

深入理解kafka: 核心设计与实践原理

作者头像
shengjk1
发布2019-06-16 13:11:13
2.4K0
发布2019-06-16 13:11:13
举报
文章被收录于专栏:码字搬砖码字搬砖码字搬砖

bootstrap.servers ,分割,这里并非需要所有的broker地址清单,因为生产者会从给定的broker里查找到其他broker信息,不过建议 至少要设置两个以上broker地址信息

broker端接收的消息必须以字节数组( byte[] )形式存在,在发往broker之前需要将消息中对应的 key 和 value 做相应的序列化操作 来转化成字节数据 生产者需要用序列化器把对象转换成字节数组才能通过网络发送给kafka,而在对侧,消费者需要用反序列化器把kafka中收到的字节数据转化 成相应的对象 生产者和消费者的序列化器需要一一对应的 不建议自定义序列化器或反序列化器,会增加生产者和消费者之间的耦合度。 在实际应用中,在kafka提供序列化器和反序列化器满足不了应用需求的前提下,推荐使用 avro json thrift、protobuf等通过的序列化包

在不改变主题分区数量的情况下,key与分区之间的映射可以保持不变。不过,一旦主题中增加了分区,那么就难以保证key与分区之间的映射关系了

默认分区,当key不为null时,那么默认的分区器会对key进行哈希,如果key为null,轮询

kafka producer 线程安全的

回调函数也可以保证分区有序

发送消息三种方式:1.发后即忘 send() 2.同步(sync) send().get() 3.异步(async) send(,callback)

生产者客户端的整体架构:

在这里插入图片描述
在这里插入图片描述

RecordAccumulator主要用来缓存消息以便Sender线程可以批量发送,进而减少网络传输的资源消耗以提升性能,其大小可以通过生产者客户端参数 buffer.memory配置,默认是32M

batch.size

max.blocks.ms

max.in.flight.requests.pre.connection 默认 5,每个连接(客户端与Node之间的连接)最多只能缓存5个未响应的请求,超过该数值之后就不能再向这个连接发送更多的请求了,除非有缓存的请求收到了响应

元数据的更新:

metadata.max.age.ms 默认 5min

acks=all 并不意味着消息就一定可靠,因为 ISR 中可能只有 leader 副本,这样就退化成了 acks=1 的情况。要获得更高的消息可靠性需要配合 min.insysc.replicas 等参数的联动.

max.request.size 限制生产者客户端能发送的消息的最大值 默认 1M message.max.bytes

retries 默认值 0 和retry.backoff.ms 默认值 100

compression.type 默认 none 对消息进行压缩可以极大的减少网络传输量、降低网络I/O,从而提高整体性能。消息压缩是一种使用时间换空间的优化方式,如果对时延有一定的要求,则不推荐对消息进行压缩

connections.max.idle.ms 多久之后关闭空闲连接 默认 9min

linger.ms 生产者发送 ProducerBatch 之前等到更多消息(ProducerRecord)加入 ProducerBatch 的时间。 默认 生产者客户端会在 ProducerBatch 被填满或者等待时间超过 linger.ms 值时发送出去。增大这个参数的值会增加消息的延迟,但同时能提升一定的吞吐量( 当不为0的时候,适当调大batch.size 才有价值跟意义)

receive.buffer.bytes 设置 socket接收消息缓冲区(SO_RECBUF)的大小,默认 32K,如果设置 -1,则使用操作系统默认值。如果 producer 与kafka 处于不同机房,则可以适当调大这个参数

send.buffer.bytes 同上 默认128K

request.timeout.ms producer 等待请求响应的最长时间 默认 30s 注意这个参数要比 replica.lag.time.max.ms 的值要大,这样可以减少因客户端重试而引起消息重复的概率.

enble.idempotence 默认 false 是否开启幂等性

消费者

partition.assignment.strategy来设置消费者与订阅消费者与订阅主题之间的分区分配策略

Replicas AR集合

inSynReplicas ISR集合

offlineReplicas OSR集合

通过 subscribe()方法订阅主题具有消费者自动再均衡的功能,在多个消费者的情况下可以根据分区分配策略来自动分配各个消费者与分区的关系。而通过 assign() 方法订阅分区时,是不具备消费者自动均衡的功能的

Kafka 消费者是基于拉模式的( 消息的消费一般有两种模式:推模式和拉模式。 推模式是服务端主动将消息推送给消费者,而拉模式是消费者主动向服务端发起请求来拉消息 )

对于 poll()方法而言,如果某些分区中没有可供消费的消息,那么此分区对应的消息拉取的结果就为空。

/*
					可以获取消息集中的指定分区的消息
					可以获取消息集中的指定topic的消息
					可以获取消息集中所有的分区
					 */
//					records.records()
					
					/*
					count() 方法计算出消息集中的消息个数 int
					isEmpty() 判断消息集是否为空   boolean
					empty()   获取一个空消息集
					 */

kafka中它的每天消息都有唯一的offset,用来表示消息在分区中对应的位置 偏移量。对于消费者而言,它也有一个offset,消费者使用 offset 来表示消费到分区中某个消息所在的位置 位移 消费者消费到的位置 lastConsumerOffset

在这里插入图片描述
在这里插入图片描述

position=commited offset =lastConsumedOffset+1 position, commited offset 也不会一直相同

enable.auto.commit 默认 true enable.auto.interval.ms默认 5s

待续,未完

生产者、消费者 拦截器

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019年06月15日,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 消费者
相关产品与服务
文件存储
文件存储(Cloud File Storage,CFS)为您提供安全可靠、可扩展的共享文件存储服务。文件存储可与腾讯云服务器、容器服务、批量计算等服务搭配使用,为多个计算节点提供容量和性能可弹性扩展的高性能共享存储。腾讯云文件存储的管理界面简单、易使用,可实现对现有应用的无缝集成;按实际用量付费,为您节约成本,简化 IT 运维工作。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档