如何首次使用apache kafka集成部署storm-core拓扑?

内容来源于 Stack Overflow,并遵循CC BY-SA 3.0许可协议进行翻译与使用

  • 回答 (1)
  • 关注 (0)
  • 查看 (256)

想获得有关apache风暴和kafka设置初始设置的帮助。

我能够向风暴集群提交拓扑结构,但在风暴ui中获得低于错误的结果。

Unable to get offset lags for kafka. Reason: java.lang.IllegalArgumentException: zk-node '/kafka-cluster-1/brokers/topics/myfirsttopic/aadb3eb4-2224-4c18-b8ad-6959a1c9f607' dose not exists. at org.apache.storm.kafka.monitor.KafkaOffsetLagUtil.getOldConsumerOffsetsFromZk(KafkaOffsetLagUtil.java:387) at org.apache.storm.kafka.monitor.KafkaOffsetLagUtil.getOffsetLags(KafkaOffsetLagUtil.java:268) at org.apache.storm.kafka.monitor.KafkaOffsetLagUtil.main(KafkaOffsetLagUtil.java:124)

我的代码片段如下。

// Kafka consumer client depends on Zookeeper when finding kafka nodes.
// Zookeeper Host List
String zkConnString = "localhost:2181";
String brokerZkPath = "/kafka-cluster-1/brokers";
String zkRoot       = "/kafka-cluster-1/brokers/topics";
String topicName    = "myfirsttopic";

/* ****************************************************************** */
/* Topology configuration variable                                    */
/* ****************************************************************** */
/* the number of tasks that should be assigned to execute this bolt   */
Integer boltParalismHint  = 1;
Integer spoutParalismHint = 1;

/* ****************************************************************** */
/* Build kafka consumer spout                                         */
/* ****************************************************************** */
// Build zookeeper instance
BrokerHosts hosts = new ZkHosts( zkConnString, brokerZkPath );

// Build configuration instance for Spout
SpoutConfig spoutConfig = new SpoutConfig( hosts, topicName, zkRoot + "/" + topicName , UUID.randomUUID().toString() );

spoutConfig.ignoreZkOffsets = true;

// Build Multischeme instance
spoutConfig.scheme = new SchemeAsMultiScheme( new StringScheme() );

// Build Kafka spout
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

我将ignoreZkOffsets设置为true

如果想强制喷口忽略ZooKeeper中存储的任何状态信息,则应该将参数KafkaConfig.ignoreZkOffsets设置为true

由于它是初始设置,我怎样才能停止Zookeeper的读数偏移?

我使用以下版本。

  • apache storm 1.2.1
  • apache kafka kafka_2.12-1.1.0
提问于
用户回答回答于

但在下面的情况下,错误似乎并没有出现在storm

  1. 在Kafka中创建主题
  2. 确保brokerZkPath中存在zookeeper
  3. 确保Zookeeper中存在zkRootPath主题目录路径。在我的情况/ kafka-cluster-1 / brokers / topics中
  4. 将拓扑提交给storm

扫码关注云+社区

领取腾讯云代金券