前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >来看看一个大二学生的Spark练习题

来看看一个大二学生的Spark练习题

作者头像
王知无-import_bigdata
发布2020-05-07 10:31:05
2.3K0
发布2020-05-07 10:31:05
举报
今天查资料的时候看到一个朋友的博客写的很好,加了好友,对方表示大二的学生,写的Spark的练习题非常接地气并且很适合练手,大家可以看看。

一、基础练习题

首先让我们准备好该题所需的数据 test.txt

数据结构如下依次是:班级 姓名 年龄 性别 科目 成绩

代码语言:javascript
复制
12 宋江 25 男 chinese 50
12 宋江 25 男 math 60
12 宋江 25 男 english 70
12 吴用 20 男 chinese 50
12 吴用 20 男 math 50
12 吴用 20 男 english 50
12 杨春 19 女 chinese 70
12 杨春 19 女 math 70
12 杨春 19 女 english 70
13 李逵 25 男 chinese 60
13 李逵 25 男 math 60
13 李逵 25 男 english 70
13 林冲 20 男 chinese 50
13 林冲 20 男 math 60
13 林冲 20 男 english 50
13 王英 19 女 chinese 70
13 王英 19 女 math 80
13 王英 19 女 english 70

题目如下:

1. 读取文件的数据test.txt

2. 一共有多少个小于20岁的人参加考试?

3. 一共有多少个等于20岁的人参加考试?

4. 一共有多少个大于20岁的人参加考试?

5. 一共有多个男生参加考试?

6. 一共有多少个女生参加考试?

7. 12班有多少人参加考试?

8. 13班有多少人参加考试?

9. 语文科目的平均成绩是多少?

10. 数学科目的平均成绩是多少?

11. 英语科目的平均成绩是多少?

12. 每个人平均成绩是多少?

13. 12班平均成绩是多少?

14. 12班男生平均总成绩是多少?

15. 12班女生平均总成绩是多少?

16. 13班平均成绩是多少?

17. 13班男生平均总成绩是多少?

18. 13班女生平均总成绩是多少?

19. 全校语文成绩最高分是多少?

20. 12班语文成绩最低分是多少?

21. 13班数学最高成绩是多少?

22. 总成绩大于150分的12班的女生有几个?

23. 总成绩大于150分,且数学大于等于70,且年龄大于等于19岁的学生的平均成绩是多少?

答案在这里:

代码语言:javascript
复制
object test {

  def main(args: Array[String]): Unit = {

    val config = new SparkConf().setMaster("local[*]").setAppName("test")
    val sc = new SparkContext(config)

    // 1.读取文件的数据test.txt
    // 返回包含所有行数据的列表
    val data: RDD[String]  = sc.textFile("E:\\2020大数据新学年\\BigData\\05-Spark\\0403\\test.txt")

    //val value: RDD[Array[String]] = sc.makeRDD(List("12 宋江 25 男 chinese 50")).map(x=>x.split(" "))

    // 2. 一共有多少个小于20岁的人参加考试?2
    val count1: Long = data.map(x=>x.split(" ")).filter(x=>x(2).toInt<20).groupBy(_(1)).count()


    // 3. 一共有多少个等于20岁的人参加考试?2
    val count2: Long = data.map(x=>x.split(" ")).filter(x=>x(2).toInt==20).groupBy(_(1)).count()

    // 4. 一共有多少个大于20岁的人参加考试?2
    val count3: Long = data.map(x=>x.split(" ")).filter(x=>x(2).toInt>20).groupBy(_(1)).count()

    // 5. 一共有多个男生参加考试?4
    val count4: Long = data.map(x=>x.split(" ")).filter(x=>x(3).equals("男")).groupBy(_(1)).count()

    // 6.  一共有多少个女生参加考试?2
    val count5: Long = data.map(x=>x.split(" ")).filter(x=>x(3).equals("女")).groupBy(_(1)).count()

    // 7.  12班有多少人参加考试?3
    val count6: Long = data.map(x=>x.split(" ")).filter(x=>x(0).equals("12")).groupBy(_(1)).count()

    // 8.  13班有多少人参加考试?3
    val count7: Long = data.map(x=>x.split(" ")).filter(x=>x(0).equals("13")).groupBy(_(1)).count()

    // 9.  语文科目的平均成绩是多少?58.333333333333336
    val mean1 = data.map(x=>x.split(" ")).filter(x=>x(4).equals("chinese")).map(x=>x(5).toInt).mean()

    // 10.  数学科目的平均成绩是多少?63.333333333333336
    val mean2 = data.map(x=>x.split(" ")).filter(x=>x(4).equals("math")).map(x=>x(5).toInt).mean()

    // 11. 英语科目的平均成绩是多少?63.333333333333336
    val mean3 = data.map(x=>x.split(" ")).filter(x=>x(4).equals("english")).map(x=>x(5).toInt).mean()

    // 12. 每个人平均成绩是多少?
    //(王英,73)
    //(杨春,70)
    //(宋江,60)
    //(李逵,63)
    //(吴用,50)
    //(林冲,53)
    val every_socre: RDD[(String, Any)] = data.map(x=>x.split(" ")).map(x=>(x(1),x(5).toInt)).groupByKey().map(t=>(t._1,t._2.sum /t._2.size))

    // 13. 12班平均成绩是多少?60.0
    var mean5 = data.map(x => x.split(" ")).filter(x => x(0).equals("12")).map(x => x(5).toInt).mean()

    // 14. 12班男生平均总成绩是多少?165.0
    // (宋江,180)
    // (吴用,150)
    val boy12_avgsocre: Double = data.map(x=>x.split(" ")).filter(x=>x(0).equals("12") && x(3).equals("男")).map(x=>(x(1),x(5).toInt)).groupByKey().map(t=>(t._1,t._2.sum)).map(x=>x._2).mean()

    // 15. 12班女生平均总成绩是多少?210.0
    // (杨春,210)
    val girl12_avgsocre: Double = data.map(x=>x.split(" ")).filter(x=>x(0).equals("12") && x(3).equals("女")).map(x=>(x(1),x(5).toInt)).groupByKey().map(t=>(t._1,t._2.sum)).map(x=>x._2).mean()

    // 16. 13班平均成绩是多少?63.333333333333336
    var mean8 = data.map(x => x.split(" ")).filter(x => x(0).equals("13")).map(x => x(5).toInt).mean()

    // 17. 13班男生平均总成绩是多少?175.0
    //(李逵,190)
    //(林冲,160)
    val boy13_avgsocre: Double = data.map(x=>x.split(" ")).filter(x=>x(0).equals("13") && x(3).equals("男")).map(x=>(x(1),x(5).toInt)).groupByKey().map(t=>(t._1,t._2.sum)).map(x=>x._2).mean()

    // 18. 13班女生平均总成绩是多少?
    //(王英,220)
    val girl13_avgsocre: Double = data.map(x=>x.split(" ")).filter(x=>x(0).equals("13") && x(3).equals("女")).map(x=>(x(1),x(5).toInt)).groupByKey().map(t=>(t._1,t._2.sum)).map(x=>x._2).mean()

    // 19. 全校语文成绩最高分是多少?70
    var max1 = data.map(x => x.split(" ")).filter(x => x(4).equals("chinese")).map(x => x(5).toInt).max()

    // 20. 12班语文成绩最低分是多少?50
    var max2 = data.map(x => x.split(" ")).filter(x => x(4).equals("chinese") && x(0).equals("12")).map(x => x(5).toInt).min()

    // 21. 13班数学最高成绩是多少?80
    var max3 = data.map(x => x.split(" ")).filter(x => x(4).equals("math") && x(0).equals("13")).map(x => x(5).toInt).max()

    // 22. 总成绩大于150分的12班的女生有几个?1
    //(杨春,210)
    val count12_gt150girl: Long = data.map(x=>x.split(" ")).filter(x=>x(0).equals("12") && x(3).equals("女")).map(x=>(x(1),x(5).toInt)).groupByKey().map(t=>(t._1,t._2.sum)).filter(x=>x._2>150).count()

    // 23. 总成绩大于150分,且数学大于等于70,且年龄大于等于19岁的学生的平均成绩是多少?
    //val countall: Long = data.map(x=>x.split(" ")).filter(x=>x(2).toInt>=19 && x(3).equals("女")).map(x=>(x(1),x(5).toInt)).groupByKey().map(t=>(t._1,t._2.sum)).filter(x=>x._2>150).count()
    val complex1 = data.map(x => {val line = x.split(" "); (line(0)+","+line(1)+","+line(3),line(5).toInt)})
    //(13,李逵,男 , 60)
    val complex2 = data.map(x => {val line = x.split(" "); (line(0)+","+line(1)+","+line(2)+","+line(3)+","+line(4),line(5).toInt)})
    //(12,宋江,男,chinese , 50)

    // 过滤出总分大于150的,并求出平均成绩    (13,李逵,男,(60,1))               (13,李逵,男,(190,3))             总成绩大于150                (13,李逵,男,63)
    val com1: RDD[(String, Int)] = complex1.map(x=>(x._1,(x._2,1))).reduceByKey((a, b)=>(a._1+b._1,a._2+b._2)).filter(a=>(a._2._1>150)).map(t=>(t._1,t._2._1/t._2._2))
    // 注意:reduceByKey 自定义的函数 是对同一个key值的value做聚合操作
    //(12,杨春,女 , 70)
    //(13,王英,女 , 73)
    //(12,宋江,男 , 60)
    //(13,林冲,男 , 53)
    //(13,李逵,男 , 63)

    //过滤出 数学大于等于70,且年龄大于等于19岁的学生                filter方法返回一个boolean值 【数学成绩大于70并且年龄>=19】                                       为了将最后的数据集与com1做一个join,这里需要对返回值构造成com1格式的数据
    val com2: RDD[(String, Int)] = complex2.filter(a=>{val line = a._1.split(",");line(4).equals("math") && a._2>=70 && line(2).toInt>=19}).map(a=>{val line2 = a._1.split(",");(line2(0)+","+line2(1)+","+line2(3),a._2.toInt)})
    //(12,杨春,女 , 70)
    //(13,王英,女 , 80)

    // val common: RDD[(String, (Int, Int))] = com1.join(com2)
    // common.foreach(println)
    // (12,杨春,女 , (70,70))
    // (13,王英,女 , (73,80))

    // 使用join函数聚合相同key组成的value元组
    // 再使用map函数格式化元素
    val result = com1.join(com2).map(a =>(a._1,a._2._1))
    //(12,杨春,女,70)
    //(13,王英,女,73)
    //到这里就大功告成了!!!!!!!!!!

  }
}

二、基础练习题

题目如下:

以下是RNG S8 8强赛失败后,官微发表道歉微博下一级评论:

题目如下:

1. 在kafak中创建rng_comment主题,设置2个分区2个副本

2. 数据预处理,把空行和缺失字段的行过滤掉

3. 请把给出的文件写入到kafka中,根据数据id进行分区,id为奇数的发送到一个分区中,偶数的发送到另一个分区

4. 使用Spark Streaming对接kafka

5. 使用Spark Streaming对接kafka之后进行计算

在mysql中创建一个数据库rng_comment

在数据库rng_comment创建vip_rank表,字段为数据的所有字段

在数据库rng_comment创建like_status表,字段为数据的所有字段

在数据库rng_comment创建count_conmment表,字段为 时间,条数

6. 查询出微博会员等级为5的用户,并把这些数据写入到mysql数据库中的vip_rank表中

7. 查询出评论赞的个数在10个以上的数据,并写入到mysql数据库中的like_status表中

8. 分别计算出2018/10/20 ,2018/10/21,2018/10/22,2018/10/23这四天每一天的评论数是多少,并写入到mysql数据库中的count_conmment表中

答案在这里:

1. 创建Topic

在命令行窗口执行Kafka创建Topic的命令,并指定对应的分区数和副本数

代码语言:javascript
复制
/export/servers/kafka_2.11-1.0.0/bin/kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 2 --partitions 2 --topic rng_comment 

2. 读取文件,并对数据做过滤并输出到新文件

代码语言:javascript
复制
object test01_filter {
  def main(args: Array[String]): Unit = {

    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("demo01").getOrCreate()

    val sc: SparkContext = spark.sparkContext

    // 读取数据
    //testFile是多行数据
    val rddInfo: RDD[String] = sc.textFile("E:\\rng_comment.txt")

    // 对数据进行一个过滤
    val RNG_INFO: RDD[String] = rddInfo.filter(data => {

      // 判断长度:将每行的内容用tab键切割,判断最后的长度
      // 判读是否为空字符: trim之后不为empty
      data.split("\t").length == 11 && !data.trim.isEmpty

    })


//    // 如果想直接将数据写入到Kafka,而不通过输出文件的方式
//    val kafkaProducer: KafkaProducer[String, String] = new KafkaProducer[String, String](props)
//
//    def saveToKafka(INFO:RDD[String]): Unit ={
//
//      try {
//
//        INFO.foreach(x=>{
//          val record: ProducerRecord[String, String] = new ProducerRecord[String,String]("rng_test",x.split("\t")(0),x.toString)
//
//          kafkaProducer.send(record)
//        })
//
//      }catch {
//        case e:Exception => println("发送数据出错:"+e)
//      }
//
//    }

    // 导入隐式转换
    // 将RDD转换成DF
    import spark.implicits._
    val df: DataFrame = RNG_INFO.toDF()

    // 输出数据【默认分区数为2,这里我们指定分区数为1】
    df.repartition(1).write.text("E:\\outputtest")

    // 关闭资源
    sc.stop()
    spark.stop()

  }
}

3. 读取新文件,将数据按照题意发送到Kafka的不同分区

需要先写一个实现自定义分区逻辑的java类

代码语言:javascript
复制
/*
编写自定义分区逻辑
 */
public class ProducerPartition implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

   /*
   编写自定义分区代码
    */
        //System.out.println(value.toString());
        String[] str = value.toString().split("\t");

        // 由题意可得,id为奇数的发送到一个分区中,偶数的发送到另一个分区
        if (Integer.parseInt(str[0]) % 2 == 0){
            return 0;
        }else {
            return 1;
        }

    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

然后在下面的程序中引用分区类的类路径

代码语言:javascript
复制
public class test02_send {

    /*
   程序的入口
    */
    public static void main(String[] args) throws IOException {

        //编写生产数据的程序

        //1、配置kafka集群环境(设置)
        Properties props = new Properties();
        //kafka服务器地址
        props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
        //消息确认机制
        props.put("acks", "all");
        //重试机制
        props.put("retries", 0);
        //批量发送的大小
        props.put("batch.size", 16384);
        //消息延迟
        props.put("linger.ms", 1);
        //批量的缓冲区大小
        props.put("buffer.memory", 33554432);
        // kafka   key 和value的序列化
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 根据题意得,需要自定义分区
        props.put("partitioner.class", "com.czxy.scala.demo12_0415.han.ProducerPartition");

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);

        // 指定需要读取的文件
        File file = new File("E:\\outputtest\\part-00000-fe536dc7-523d-4fdd-b0b5-1a045b8cb1ab-c000.txt");

        // 创建对应的文件流,进行数据的读取
        FileInputStream fileInputStream = new FileInputStream(file);
        //   指定编码格式进行读取
        InputStreamReader inputStreamReader = new InputStreamReader(fileInputStream, "UTF-8");
        // 创建缓冲流
        BufferedReader bufferedReader = new BufferedReader(inputStreamReader);

        // 创建一个变量,用来保存每次读取的数据
        String tempString = null;

        // 循环遍历读取文件内容
        while ((tempString = bufferedReader.readLine()) != null) {

            // 利用kafka对象发送数据
            kafkaProducer.send(new ProducerRecord<>("rng_comment", tempString));

            // 发送完成之后打印数据
            System.out.println("已发送:" + tempString);
        }

        System.out.println("数据发送完毕!");

        // 关闭kafka数据生产者
        kafkaProducer.close();

    }
}

4. 先在数据库中创建好接收数据需要用到的表

代码语言:javascript
复制
create table vip_rank(  `index` varchar(100) null comment '数据id',  child_comment varchar(100) null comment '回复数量',  comment_time DATE null comment '评论时间',  content TEXT null comment '评论内容',  da_v varchar(100) null comment '微博个人认证',  like_status varchar(100) null comment '赞',  pic varchar(100) null comment '图片评论url',  user_id varchar(100) null comment '微博用户id',  user_name varchar(100) null comment '微博用户名',  vip_rank int null comment '微博会员等级',  stamp varchar(100) null comment '时间戳');create table like_status(  `index` varchar(100) null comment '数据id',  child_comment varchar(100) null comment '回复数量',  comment_time DATE null comment '评论时间',  content varchar(10000) null comment '评论内容',  da_v varchar(100) null comment '微博个人认证',  like_status varchar(100) null comment '赞',  pic varchar(100) null comment '图片评论url',  user_id varchar(100) null comment '微博用户id',  user_name varchar(100) null comment '微博用户名',  vip_rank int null comment '微博会员等级',  stamp varchar(100) null comment '时间戳');create table count_comment(  time DATE null comment '时间',  count int null comment '出现的次数',  constraint rng_comment_pk    primary key (time));

5. 使用Spark Streaming对接kafka之后进行计算

下面的代码完成了:

查询出微博会员等级为5的用户,并把这些数据写入到mysql数据库中的vip_rank表中

查询出评论赞的个数在10个以上的数据,并写入到mysql数据库中的like_status表中

代码语言:javascript
复制
object test03_calculate {


  /*
     将数据从kafka集群中读取,并将数据做进一步的处理过后,写入到mysql数据库中
   */
  def ConnectToMysql() ={

    // 连接驱动,设置需要连接的MySQL的位置以及数据库名 + 用户名 + 密码
    DriverManager.getConnection("jdbc:mysql://localhost:3306/rng_comment?characterEncoding=UTF-8", "root", "root")
  }

  /**
    * 将数据写入到MySQL的方法
    * @param tableName 表名
    * @param data List类型的数据
    */
  def saveDataToMysql(tableName:String,data:List[String]): Unit ={

    // 获取连接
    val connection: Connection = ConnectToMysql()
    // 创建一个变量用来保存sql语句
    val sql = s"insert into ${tableName} (`index`, child_comment, comment_time, content, da_v,like_status,pic,user_id,user_name,vip_rank,stamp) values (?,?,?,?,?,?,?,?,?,?,?)"
    // 将数据存入到mysql中
    val ps: PreparedStatement = connection.prepareStatement(sql)
    ps.setString(1,data.head)
    ps.setString(2,data(1))
    ps.setString(3,data(2))
    ps.setString(4,data(3))
    ps.setString(5,data(4))
    ps.setString(6,data(5))
    ps.setString(7,data(6))
    ps.setString(8,data(7))
    ps.setString(9,data(8))
    ps.setString(10,data(9))
    ps.setString(11,data(10))

    // 提交[因为是插入数据,所以这里需要更新]
    ps.executeUpdate()
    // 关闭连接
    connection.close()

  }


  def main(args: Array[String]): Unit = {

    //1 创建sparkConf
    var conf = new SparkConf().setMaster("local[*]").setAppName("SparkStremingDemo1")
    //2 创建一个sparkcontext
    var sc = new SparkContext(conf)
    sc.setLogLevel("WARN")
    //3 创建streamingcontext
    var ssc = new StreamingContext(sc,Seconds(3))

    //设置kafka对接参数
    var  kafkaParams= Map[String, Object](
      "bootstrap.servers" -> "node01:9092,node02:9092,node03:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "SparkKafkaDemo",
      //earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
      //latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
      //none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
      //这里配置latest自动重置偏移量为最新的偏移量,即如果有偏移量从偏移量位置开始消费,没有偏移量从新来的数据开始消费
      "auto.offset.reset" -> "earliest",
      //false表示关闭自动提交.由spark帮你提交到Checkpoint或程序员手动维护
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    // 设置检查点的位置
    ssc.checkpoint("sparkstreaming/")

    //kafkaDatas  含有key和value
    //key是kafka成产数据时指定的key(可能为空)
    //value是真实的数据(100%有数据)
    val kafkaDatas: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      //设置位置策略   均衡
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Array("rng_comment"), kafkaParams))

    kafkaDatas.foreachRDD(rdd=>rdd.foreachPartition(line=>{

      // 遍历每一个分区的数据
      for (row <- line){

        // 获取到行数据组成的array数组
        val str: Array[String] = row.value().split("\t")

        // 将数据转成List集合
        val list: List[String] = str.toList

        /* 查询出微博会员等级为5的用户,并把这些数据写入到mysql数据库中的vip_rank表中 */
        if (list(9).equals("5")){
          // 调用方法,将集合数据写入到指定的表中
          saveDataToMysql("vip_rank",list)
        }

        /* 查询出评论赞的个数在10个以上的数据,并写入到mysql数据库中的like_status表中 */
        if (Integer.parseInt(list(5))>10){
          saveDataToMysql("like_status",list)
        }

      }

    }))


    //5 开启计算任务
    ssc.start()
    //6 等待关闭
    ssc.awaitTermination()
  }
  }

运行成功后的效果

vip_rank

like_status

下面的代码完成了: 分别计算出2018/10/20 ,2018/10/21,2018/10/22,2018/10/23这四天每一天的评论数是多少,并写入到mysql数据库中的count_conmment表中

代码语言:javascript
复制
object test04_count {


  def ConnectToMysql() ={

    // 连接驱动,设置需要连接的MySQL的位置以及数据库名 + 用户名 + 密码
    DriverManager.getConnection("jdbc:mysql://localhost:3306/rng_test?characterEncoding=UTF-8", "root", "root")

  }

  /**
    * 将数据存入到mysql中
    *
    * @param time  时间
    * @param count 数量
    */
  def saveDataToMysql(time: String, count: Int): Unit = {
    println(s"$time\t $count")
    if (time.contains("2018/10/20") || time.contains("2018/10/21") || time.contains("2018/10/22") || time.contains("2018/10/23")) {
      //获取连接
      val connection: Connection = ConnectToMysql()
      //创建一个变量用来保存sql语句
      val sql: String = "INSERT INTO count_comment (time,count) VALUES (?,?) ON DUPLICATE KEY UPDATE count = ?"
      //将一条数据存入到mysql
      val ps: PreparedStatement = connection.prepareStatement(sql)
      ps.setString(1, time)
      ps.setInt(2, count)
      ps.setInt(3, count)

      //提交
      ps.executeUpdate()
      //关闭连接
      connection.close()
    }
  }


  def main(args: Array[String]): Unit = {


    //1 创建sparkConf
    var conf: SparkConf =new SparkConf().setMaster("local[*]").setAppName("SparkStremingDemo1")
    //2 创建一个sparkcontext
    var sc: SparkContext =new SparkContext(conf)
    sc.setLogLevel("WARN")
    //3 创建StreamingContext
    var ssc: StreamingContext =new   StreamingContext(sc,Seconds(5))
    //设置缓存数据的位置
    ssc.checkpoint("./TmpCount")

    // 设置kafka的参数
    var  kafkaParams: Map[String, Object] = Map[String, Object](
      "bootstrap.servers" -> "node01:9092,node02:9092,node03:9092",  // 集群位置
      "key.deserializer" -> classOf[StringDeserializer],  // key序列化标准
      "value.deserializer" -> classOf[StringDeserializer],  // value序列化标准
      "group.id" -> "SparkKafkaDemo",  // 分组id
      //earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
      //latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
      //none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
      //这里配置latest自动重置偏移量为最新的偏移量,即如果有偏移量从偏移量位置开始消费,没有偏移量从新来的数据开始消费
      "auto.offset.reset" -> "earliest",
      //false表示关闭自动提交.由spark帮你提交到Checkpoint或程序员手动维护
      "enable.auto.commit" -> (false: java.lang.Boolean)

    )

      // 接收Kafka的数据并根据业务逻辑进行计算
      val kafkaDatas: InputDStream[ConsumerRecord[String, String]] =
        KafkaUtils.createDirectStream[String,String](
          ssc,   // StreamingContext对象
          LocationStrategies.PreferConsistent,  // 位置策略
          ConsumerStrategies.Subscribe[String,String](Array("rng_comment"),kafkaParams)  // 设置需要消费的topic和kafka参数

        )

    // 2018/10/23 16:09  需要先获取到下标为2的数据,再按照空格进行切分,获取到年月日即可
    val kafkaWordOne: DStream[(String, Int)] = kafkaDatas.map(z=>z.value().split("\t")(2).split(" ")(0)).map((_,1))

    // 更新数据
    val wordCounts: DStream[(String, Int)] = kafkaWordOne.updateStateByKey(updateFunc)

    // 遍历RDD
    wordCounts.foreachRDD(rdd=>rdd.foreachPartition(line=>{

      for(row <- line){

        saveDataToMysql(row._1,row._2)
        //println("保存成功!")
      }

    }))

    println("完毕!")

    // 开启计算任务
    ssc.start()

    // 等待关闭
    ssc.awaitTermination()

  }

  //currentValues:当前批次的value值,如:1,1,1 (以测试数据中的hadoop为例)
  //historyValue:之前累计的历史值,第一次没有值是0,第二次是3
  //目标是把当前数据+历史数据返回作为新的结果(下次的历史数据)
  def updateFunc(currentValues:Seq[Int], historyValue:Option[Int] ):Option[Int] ={
    // currentValues当前值
    // historyValue历史值
    val result: Int = currentValues.sum + historyValue.getOrElse(0)

    Some(result)

  }
}

运行成功后的效果

count_comment

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-04-28,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据技术与架构 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 创建Topic
  • 2. 读取文件,并对数据做过滤并输出到新文件
  • 3. 读取新文件,将数据按照题意发送到Kafka的不同分区
  • 4. 先在数据库中创建好接收数据需要用到的表
  • 5. 使用Spark Streaming对接kafka之后进行计算
相关产品与服务
云数据库 SQL Server
腾讯云数据库 SQL Server (TencentDB for SQL Server)是业界最常用的商用数据库之一,对基于 Windows 架构的应用程序具有完美的支持。TencentDB for SQL Server 拥有微软正版授权,可持续为用户提供最新的功能,避免未授权使用软件的风险。具有即开即用、稳定可靠、安全运行、弹性扩缩等特点。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档