前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >初识kafka中的生产者与消费者

初识kafka中的生产者与消费者

作者头像
爬蜥
发布2019-07-09 10:21:16
1.6K0
发布2019-07-09 10:21:16
举报

发送生产消息的大致流程:

1. 创建生产者对象,生产者发送包装消息的ProducerRecord

2. 生产者通过send方法发送消息

3. 消息被序列化

4. 消息计算出分区

5. 根据分区消息被分配到指定主题和分区的批次中

6. 批量发送到broker

7. broker判断是否消息失败,成功则直接返回元数据【可选】,失败判断是否重试,对应做相应处理

如何创建生产者对象?

必须指定3个属性:broker地址,key的序列化方式,value的序列化方式。其它可选参数,包括重试次数,内存缓冲大小,每次发送消息的批次大小,是否压缩等等

Avro序列化简介

它是一种与语言无关的序列化格式。数据通过schema来定义,如果出现读的schema与写的shema不一致的时候,不会抛出遗产,而选择返回默认值。使用的时候,在注册表中注册一个schema,消息字段schema的标识,然后存放到broker中,消费者使用标识符从注册表中拉取schema进行解析得到结果

如何发送消息?

1. 同步方式:构建消息的封装ProducerRecord,通过生产者的send方法发送即可,可以用Future的方式接收返回的RecordMetadata 2. 异步方式:同步发消息如果服务器之间通信的时间是10ms,那么1s只能发100个消息,因此不等待的方式(异步)可以节省时间,增加吞吐 3. 发送并忘记:只管发送,不处理任何返回值

发送消息的过程中出了异常怎么办?

kafka异常基本有两类,一是能够重试的方式,比如网络连接段了,一是不会重连,比如消息太大,会直接抛异常,对于异步来讲,可以通过使用回调函数来处理期间出现的异常

代码上如何创建消费者并订阅主题?

使用Propertites[包含 server,key.deserializer和value.deserializer]初始化 KafkaConsumer,通过consumer.subscribe即可订阅主题,主题可以是一个列表或者是一表达式

代码上消费者是如何获取数据的?

轮询。消费者订阅了主题后,轮询中处理所有细节,包括群组协调、分区再平衡、发送心跳和获取数据

如何优雅退出轮询?添加shutdownhook,在钩子里头调用消费者的wakeup方法,这样如果读取poll,会抛出wakeup异常,然后调用close方法,保证最后的提交都已经完成,并且告知群组协调器,自己要离开群组,然后就触发了再均衡

消费者和线程之间的关系是什么?

一个群组里面有多个消费者,一个消费者只有一个线程

为什么kafka能够从上次断开的地方再开始读取消息?

kafka对每个分区都有一个偏移量,来跟踪当前消息消费到哪儿去了,如果配置自动提交(更新分区当前位置),默认每5s就上报一次从poll中获取的收到的最大偏移量。但是这种自动方式如果在小于默认的时间之内发生了再均衡,会照成消息重复消费

想自己提交偏移量,避免自动提交存在的问题怎么办?1. 同步提交 [commitSync()],提交最后一次的偏移量。只要不是不可恢复的问题,就会一直重试,但是在broker对提交做出反应前,会一直阻塞,有可能成为吞吐量的瓶颈 ;2. 异步提交[commitAsync()],提交最后一次的偏移量。不重试,如果异步提交出现问题,可以通过回调来观察

某些操作我一定要成功,但是又不想每次阻塞,怎么办?混用同步提交和异步提交。在消息处理的时候异步提交,如果出了问题就catch住,然后同步提交

同步提交和异步提交都只能对最后一次进行提交,我想更频繁的,更自助的控制好提交的频率,怎么做?用map存储每个分区的偏移量,然后根据自己的需求,在读取消息后,异步提交整个map

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018年04月30日,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 如何创建生产者对象?
  • Avro序列化简介
  • 如何发送消息?
    • 发送消息的过程中出了异常怎么办?
      • 代码上如何创建消费者并订阅主题?
        • 代码上消费者是如何获取数据的?
          • 消费者和线程之间的关系是什么?
            • 为什么kafka能够从上次断开的地方再开始读取消息?
            相关产品与服务
            文件存储
            文件存储(Cloud File Storage,CFS)为您提供安全可靠、可扩展的共享文件存储服务。文件存储可与腾讯云服务器、容器服务、批量计算等服务搭配使用,为多个计算节点提供容量和性能可弹性扩展的高性能共享存储。腾讯云文件存储的管理界面简单、易使用,可实现对现有应用的无缝集成;按实际用量付费,为您节约成本,简化 IT 运维工作。
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档