前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >sparkStreaming与Kafka整合

sparkStreaming与Kafka整合

作者头像
用户3003813
发布2018-09-06 13:59:02
3030
发布2018-09-06 13:59:02
举报
文章被收录于专栏:个人分享个人分享个人分享

createStream那几个参数折腾了我好久。。网上都是一带而过,最终才搞懂..关于sparkStreaming的还是太少,最终尝试成功。。。

首先启动zookeeper

./bin/zookeeper-server-start.sh config/zookeeper.properties &

启动kafka

bin/kafka-server-start.sh config/server.properties &

创建一个topic

./kafka-topics.sh  --create --zookeeper 192.168.77.133:2181 \ --replication-factor 1\ --partitions 1\ --topic yangsy

随后启动一个终端为9092的提供者

./kafka-console-producer.sh --broker-list 192.168.77.133:9092 --topic yangsy

代码如下:

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}



/**
 * Created by root on 11/28/15.
 */
object SparkStreaming {
  def main(args: Array[String]) {
/*    val sparkConf = new SparkConf().setMaster("local").setAppName("cocapp")
      .set("spark.executor.memory", "1g")
    val sc = new StreamingContext(sparkConf, Seconds(20))
    val lines = sc.textFileStream("/usr/local/spark-1.4.0-bin-2.5.0-cdh5.2.1/streaming")
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    sc.start()
    sc.awaitTermination()*/
  //zookeeper的地址
    val zkQuorum = "192.168.77.133:2181"
   //group_id可以通过kafka的conf下的consumer.properties中查找
    val group ="test-consumer-group"
  //创建的topic 可以是一个或多个
    val topics = "yangsy"
    val sparkConf = new SparkConf().setMaster("local").setAppName("cocapp").set("spark.executor.memory", "1g")
    val sc = new StreamingContext(sparkConf, Seconds(2))
    val numThreads = 2
    val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap
    //StorageLevel.MEMORY_AND_DISK_SER为存储的级别
    val lines  = KafkaUtils.createStream(sc, zkQuorum, group, topicpMap, StorageLevel.MEMORY_AND_DISK_SER).map(_._2)
    //对于收到的消息进行wordcount
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)
    wordCounts.print()
    sc.start()
    sc.awaitTermination()

  }
}

随后再你启动的kafka的生产者终端随便输入消息,我这里设置的参数是每2秒获取一次,统计一次单词个数~OK~

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2015-11-28 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档