前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Storm0.9 kafkaSpou 源码分析

Storm0.9 kafkaSpou 源码分析

作者头像
Flink实战剖析
发布2022-04-18 11:13:04
3140
发布2022-04-18 11:13:04
举报
文章被收录于专栏:Flink实战剖析Flink实战剖析

先看一下storm 与 kafka 的集成方式:

代码语言:javascript
复制
 BrokerHosts hosts=new ZkHosts(Zk);   // zk 的地址
  String ZkRoot="/brokers/Topic";   // storm 的元数据信息在zk上的存储位置
  SpoutConfig kafkaSpoutConfig=
          newSpoutConfig(hosts,topic,ZkRoot,groupid);
  kafkaSpoutConfig.scheme=
          new SchemeAsMultiScheme(new StringScheme()); // 对消息的解析方式
  kafkaSpoutConfig.forceFromStart=false;   // 是否从最开始的位置或者                   最开始的提交位置开始消费
 KafkaSpout KafkaSpot=new KafkaSpout(kafkaSpoutConfig); 

以kafkaSpout 为入口分析:KafkaSpout 继承 BaseRichSpout类, 并且重写了其open 、nextTuple、ack、fail、declareOutputFields重要方法。 open 初始化方法:

重点分析一下nextTuple方法:

PartitionManager中next方法:

通过上述代码可以看出初始的offset 是_emittedToOffset值,那么_emittedToOffset是如何初始化的,查看PartitionManager的构造方法:

由此得出初始的消费位置:如果zk上有元数据信息则从zk上的位置开始,否则就根据forceFromstart 的位置开始。以后每次消费根据失败的最小位置或者成功的最大位置。

Zk上元数据的更新:当一次获取的消息都被发送成功,就会根据_spoutConfig. stateUpdateIntervalMs的值判断是否需要更新元数据,将最近一次处理完成的offset提交给 zk.这里的处理完成是指处理一条消息之后spout收到ack请求。

partitionManager在调用ack方法会将_pending移除该条记录,_pending记录了正在处理的消息。如果没有_pending的数据,那么就直接提交最近一次读取kafka的数据的最大值到zk上, 如果有就将正在处理的最小值提交给zk。

相应的KafkaSpou 调用fail方法会调用PartitionManager的fail方法,将处理失败的消息offset加入失败队列中即failed

由于kafkaSpout使用的simple api 的consumer , 没有主动向kafka 提交offset,那么就无法在kafka的管理平台观察到其消费速度。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2018-11-16,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Flink实战剖析 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档