sparkStreaming与Kafka整合

createStream那几个参数折腾了我好久。。网上都是一带而过,最终才搞懂..关于sparkStreaming的还是太少,最终尝试成功。。。

首先启动zookeeper

./bin/zookeeper-server-start.sh config/zookeeper.properties &

启动kafka

bin/kafka-server-start.sh config/server.properties &

创建一个topic

./kafka-topics.sh  --create --zookeeper 192.168.77.133:2181 \ --replication-factor 1\ --partitions 1\ --topic yangsy

随后启动一个终端为9092的提供者

./kafka-console-producer.sh --broker-list 192.168.77.133:9092 --topic yangsy

代码如下:

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}



/**
 * Created by root on 11/28/15.
 */
object SparkStreaming {
  def main(args: Array[String]) {
/*    val sparkConf = new SparkConf().setMaster("local").setAppName("cocapp")
      .set("spark.executor.memory", "1g")
    val sc = new StreamingContext(sparkConf, Seconds(20))
    val lines = sc.textFileStream("/usr/local/spark-1.4.0-bin-2.5.0-cdh5.2.1/streaming")
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    sc.start()
    sc.awaitTermination()*/
  //zookeeper的地址
    val zkQuorum = "192.168.77.133:2181"
   //group_id可以通过kafka的conf下的consumer.properties中查找
    val group ="test-consumer-group"
  //创建的topic 可以是一个或多个
    val topics = "yangsy"
    val sparkConf = new SparkConf().setMaster("local").setAppName("cocapp").set("spark.executor.memory", "1g")
    val sc = new StreamingContext(sparkConf, Seconds(2))
    val numThreads = 2
    val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap
    //StorageLevel.MEMORY_AND_DISK_SER为存储的级别
    val lines  = KafkaUtils.createStream(sc, zkQuorum, group, topicpMap, StorageLevel.MEMORY_AND_DISK_SER).map(_._2)
    //对于收到的消息进行wordcount
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)
    wordCounts.print()
    sc.start()
    sc.awaitTermination()

  }
}

随后再你启动的kafka的生产者终端随便输入消息,我这里设置的参数是每2秒获取一次,统计一次单词个数~OK~

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏YoungGy

Spark入门_1_RddTransAction

ipython spark core concept RDD creating rdds rdd operations transformation actio...

2885
来自专栏about云

让你真正明白spark streaming

spark streaming介绍 Spark streaming是Spark核心API的一个扩展,它对实时流式数据的处理具有可扩展性、高吞吐量、可容错性等...

3517
来自专栏大数据学习笔记

Spark2.x学习笔记:11、RDD依赖关系与stage划分

11、 RDD依赖关系与stage划分 Spark中RDD的高效与DAG图有着莫大的关系,在DAG调度中需要对计算过程划分stage,而划分依据就是RDD之间的...

2785
来自专栏涂小刚的专栏

【Spark教程】核心概念RDD

RDD全称叫做弹性分布式数据集(Resilient Distributed Datasets),它是一种分布式的内存抽象,表示一个只读的记录分区的集合,它只能通...

2K0
来自专栏Albert陈凯

Spark系列课程-00xxSpark任务调度疑问,生成有向无环图的这个东西叫什么名字?

下面我们一起来看一下Spark的任务调度 ? Spark任务调度.png 首先最左边的叫做RDD Object就是一个一个的RDD对象 一个一个的RDD对象,...

44714
来自专栏Albert陈凯

Spark概要掌握情况自我核查

1、Spark目前只持哪哪种语言的API? Java, Scala, Python, R. Ref: http://spark.apache.org/ 2、R...

2603
来自专栏伦少的博客

Spark 持久化(cache和persist的区别)

4554
来自专栏xingoo, 一个梦想做发明家的程序员

[Hadoop大数据]——Hive数据的导入导出

Hive作为大数据环境下的数据仓库工具,支持基于hadoop以sql的方式执行mapreduce的任务,非常适合对大量的数据进行全量的查询分析。 本文主要...

8237
来自专栏Java技术栈

Tomcat集群session复制与Oracle的坑。。

问题描述 公司某个系统使用了tomcat自带的集群session复制功能,然后后报了一个oracle驱动包里面的连接不能被序列化的异常。 01-Nov-2017...

3779
来自专栏一名叫大蕉的程序员

Spark你一定学得会(二)No.8

第一次分享的妥妥就是入门的干货,小伙伴们最好可以自己敲一敲代码,不然只看我的分享一点用都木有。但还是有很多小伙伴表示看不懂,没关系,慢慢来自己操作一遍有什么问题...

19210

扫码关注云+社区

领取腾讯云代金券