首页
学习
活动
专区
工具
TVP
发布
社区首页 >问答首页 >如何首次部署集成apache kafka的storm-core拓扑?

如何首次部署集成apache kafka的storm-core拓扑?
EN

Stack Overflow用户
提问于 2018-04-21 17:40:43
回答 1查看 435关注 0票数 0

我想获得帮助的初始设置与apache storm和kafka设置。

我可以向storm集群提交拓扑,但在storm ui中出现以下错误。

代码语言:javascript
复制
Unable to get offset lags for kafka. Reason: java.lang.IllegalArgumentException: zk-node '/kafka-cluster-1/brokers/topics/myfirsttopic/aadb3eb4-2224-4c18-b8ad-6959a1c9f607' dose not exists. at org.apache.storm.kafka.monitor.KafkaOffsetLagUtil.getOldConsumerOffsetsFromZk(KafkaOffsetLagUtil.java:387) at org.apache.storm.kafka.monitor.KafkaOffsetLagUtil.getOffsetLags(KafkaOffsetLagUtil.java:268) at org.apache.storm.kafka.monitor.KafkaOffsetLagUtil.main(KafkaOffsetLagUtil.java:124)

下面是我的代码片段。

代码语言:javascript
复制
// Kafka consumer client depends on Zookeeper when finding kafka nodes.
// Zookeeper Host List
String zkConnString = "localhost:2181";
String brokerZkPath = "/kafka-cluster-1/brokers";
String zkRoot       = "/kafka-cluster-1/brokers/topics";
String topicName    = "myfirsttopic";

/* ****************************************************************** */
/* Topology configuration variable                                    */
/* ****************************************************************** */
/* the number of tasks that should be assigned to execute this bolt   */
Integer boltParalismHint  = 1;
Integer spoutParalismHint = 1;

/* ****************************************************************** */
/* Build kafka consumer spout                                         */
/* ****************************************************************** */
// Build zookeeper instance
BrokerHosts hosts = new ZkHosts( zkConnString, brokerZkPath );

// Build configuration instance for Spout
SpoutConfig spoutConfig = new SpoutConfig( hosts, topicName, zkRoot + "/" + topicName , UUID.randomUUID().toString() );

spoutConfig.ignoreZkOffsets = true;

// Build Multischeme instance
spoutConfig.scheme = new SchemeAsMultiScheme( new StringScheme() );

// Build Kafka spout
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

我引用了document并将ignoreZkOffsets设置为true

如果要强制spout忽略存储在ZooKeeper中的任何使用者状态信息,则应将参数KafkaConfig.ignoreZkOffsets设置为true

然而,从日志中看,似乎正在读取Zookeeper的偏移量。

由于这是一个初始设置,我如何才能停止storm读取Zookeeper的偏移量?

我使用以下版本。

  • apache storm 1.2.1
  • apache kafka kafka_2.12-1.1.0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/49954396

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档