首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >Sparkstreaming + Kafka到hdfs

Sparkstreaming + Kafka到hdfs
EN

Stack Overflow用户
提问于 2018-07-05 21:58:52
回答 1查看 193关注 0票数 1

当我尝试使用spark streaming使用来自kafka主题的消息时,得到以下错误

代码语言:javascript
运行
复制
scala> val kafkaStream = KafkaUtils.createStream(ssc, "<ipaddress>:2181","spark-streaming-consumer-group", Map("test1" -> 5))

错误:

代码语言:javascript
运行
复制
`missing or invalid dependency detected while loading class file 'KafkaUtils.class'.
Could not access term kafka in package <root>,
because it (or its dependencies) are missing. Check your build definition for
missing or conflicting dependencies. (Re-run with `-Ylog-classpath` to see the problematic classpath.)
A full rebuild may help if 'KafkaUtils.class' was compiled against an incompatible version of <root>.`

Scala版本: 2.11.8 spark版本: 2.1.0.2.6.0.3-8

我使用了spark-streaming-kafka的所有类库,但都不起作用:

我正在执行spark shell中的代码:

代码语言:javascript
运行
复制
./spark-shell --jars /data/home/local/504/spark-streaming-kafka_2.10-1.5.1.jar, /data/home/local/504/spark-streaming_2.10-1.5.1.jar

代码

代码语言:javascript
运行
复制
import org.apache.spark.SparkConf
val conf = new SparkConf().setMaster("local[*]").setAppName("KafkaReceiver")
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
val ssc = new StreamingContext(conf, Seconds(10))
import org.apache.spark.streaming.kafka.KafkaUtils
val kafkaStream = KafkaUtils.createStream(ssc, "<ipaddress>:2181","spark-streaming-consumer-group", Map("test1" -> 5))

对这个问题的任何建议。

EN

回答 1

Stack Overflow用户

发布于 2018-07-05 23:42:58

由于您使用的是Scala2.11和Spark2.1.0,因此应该使用这些jars

  • spark-streaming-kafka-0-10_2.11-2.1.0.jar
  • spark-streaming_2.11-2.1.0.jar

如果您使用的是Kafka 0.10+,请进行相应的更改。

这个简单的程序看起来像这样

代码语言:javascript
运行
复制
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.kafka.common.serialization.StringDeserializer

val streamingContext = new StreamingContext(sc, Seconds(5))

//Parameters for kafka
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "servers,
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "test-consumer-group",
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = "topics,seperated,by,comma".split(",")

// crate dstreams
val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

//stream.print()
stream.map(_.value().toString).print()

希望这篇文章能有所帮助!

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/51193249

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档