前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Storm读取Kafka数据是如何实现的

Storm读取Kafka数据是如何实现的

作者头像
用户1410343
发布2018-03-27 12:08:09
1.8K0
发布2018-03-27 12:08:09
举报
文章被收录于专栏:about云about云

问题导读 1.本文基于什么版本? 2.Storm读取Kafka数据是如何实现的? 3.实现一个Kafka Spout有哪两种方式? Strom整合Kafka版本信息

Storm与Kafka的版本信息:

  • Storm:apache-storm-0.9.2-incubating
  • Kafka:kafka_2.9.2-0.8.1.1.tgz

Strom从Kafka中读取数据本质

实现Storm读取Kafka中的数据,参考官网介绍, 本部分主要参考自storm-kafka的README。

Strom从Kafka中读取数据,本质:实现一个Storm中的Spout,来读取Kafka中的数据;这个Spout,可以称为Kafka Spout。实现一个Kafka Spout有两条路:

  • core storm spout;
  • Trident spout;

无论用哪种方式实现Kafka Spout,都分为两步走:

  • 实现BrokerHost接口:用于记录Kafka broker host与partition之间的映射关系;具体两种实现方式:
    • ZkHosts类:从zookeeper中动态的获取kafka broker与partition之间的映射关系;初始化时,需要配置zookeeper的ip:port;默认,每60s从zookeeper中请求一次映射关系;
    • StaticHosts类:当broker–partition之间的映射关系是静态时,常使用此方法;
  • 继承KafkaConfig类:用于存储Kafka相关的参数;将上面实例的BrokerHost对象,作为参数传入KafkaConfig,例,Kafka的一个构造方法为KafkaConfig(BrokerHosts hosts, String topic);当前其实现方式有两个:
    • SpoutConfig:Core KafkaSpout只接受此配置方式;
    • TridentKafkaConfig:TridentKafkaEmitter只接受此配置方式;

KafkaConfig类中涉及到的配置参数默认值如下:

[Bash shell] 纯文本查看 复制代码

?

代码语言:javascript
复制
public int fetchSizeBytes = 1024 * 1024;
public int socketTimeoutMs = 10000;
public int fetchMaxWait = 10000;
public int bufferSizeBytes = 1024 * 1024;
public MultiScheme scheme = new RawMultiScheme();
public boolean forceFromStart = false;
public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
public long maxOffsetBehind = Long.MAX_VALUE;
public boolean useStartOffsetTimeIfOffsetOutOfRange = true;
public int metricsTimeBucketSizeInSecs = 60;

上面的MultiScheme类型的参数shceme,其负责:将Kafka中取出的byte[]转换为storm所需的tuple,这是一个扩展点,默认是原文输出。两种实现:SchemeAsMultiScheme和KeyValueSchemeAsMultiScheme可将读取的byte[]转换为String。

notes(ningg):几个疑问,列在下面了

  • ZkHosts类的一个构造方法ZkHosts(String brokerZkStr, String brokerZkPath),其中brokerZkPath的含义,原始给出的说法是:”rokerZkPath is the root directory under which all the topics and partition information is stored. by Default this is /brokers which is what default kafka implementation uses.”
  • SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id),其中,zkRoot是一个root目录,用于存储consumer的offset;那这个zkRoot对应的目录物理上在哪台机器?

配置实例Core Kafka Spout

本质是设置一个读取Kafka中数据的Kafka Spout,然后,将从替换原始local mode下,topology中的Spout即可。下面是一个已经验证过的实例

[Bash shell] 纯文本查看 复制代码

?

代码语言:javascript
复制
TopologyBuilder builder = new TopologyBuilder();
BrokerHosts hosts = new ZkHosts("121.7.2.12:2181");
SpoutConfig spoutConfig = new SpoutConfig(hosts, "ningg", "/" + "ningg", UUID.randomUUID().toString());
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
// set Spout.
builder.setSpout("word", kafkaSpout, 3);
builder.setBolt("result", new ExclamationBolt(), 3).shuffleGrouping("word");
Config conf = new Config();
conf.setDebug(true);
// submit topology in local mode
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());

Trident Kafka Spout(todo) todo 下面的样例并还没验证:

[Bash shell] 纯文本查看 复制代码

?

代码语言:javascript
复制
TridentTopology topology = new TridentTopology();
BrokerHosts zk = new ZkHosts("localhost");
TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "test-topic");
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2016-08-07,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 about云 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档