Spark常用函数(源码阅读六)

  源码层面整理下我们常用的操作RDD数据处理与分析的函数,从而能更好的应用于工作中。

      连接Hbase,读取hbase的过程,首先代码如下:

def tableInitByTime(sc : SparkContext,tableName : String,columns : String,fromdate: Date,todate : Date) : RDD[(ImmutableBytesWritable,Result)] = {
      val configuration = HBaseConfiguration.create()
      configuration.addResource("hbase-site.xml ")
      configuration.set(TableInputFormat.INPUT_TABLE,tableName )
      val scan = new Scan
      //scan.setTimeRange(fromdate.getTime,todate.getTime)
      val column = columns.split(",")
      for(columnName <- column){
        scan.addColumn("f1".getBytes(),columnName.getBytes())
      }
      val hbaseRDD = sc.newAPIHadoopRDD(configuration,classOf[TableInputFormat],classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],classOf[org.apache.hadoop.hbase.client.Result])
      System.out.println(hbaseRDD.count())
      hbaseRDD
  }

  我们来一点一点解析整个过程。

  1、val configuration = HBaseConfiguration.create()

  这个用过hbase的伙伴们都知道,加载配置文件,其实调用的是HBase的API,返回的RDD是个Configuration。加载的配置文件信息包含core-default.xml,core-site.xml,mapred-default.xml等。加载源码如下:

  2、随之设置表名信息,并声明scan对象,并且set读取的列有哪些,随后调用newAPIHadoopRDD,加载指定hbase的数据,当然你可以加上各种filter。那么下来 我们看看newAPIHadoopRDD是干了什么呢?我们来阅读下里面的实现。

  可以看到我们调用API,其实就是一个input过程,创建了一个newHadoopRDD对象,那么后台是一个input数据随后转化为RDD的过程。节点之间的数据传输是通过序列化数据,通过broadCast传输的conf信息。

  3、随之进行count验证操作,查找数据的partition个数,hbase的数据当然是以block块的形式存储于HDFS。

  4、下来开始map遍历,取出之前我们设置的字段,存入新的transRDD中,那么这个map函数干了什么呢?它其实是将原RDD所做的操作组织成一个function,创建一个MapPartitionsRDD。

  5、下来我们看下filter函数干了什么呢?

 val calculateRDD = transRDD.filter(_._1 != null).filter(_._2 != null).filter(_._3 != null).filter(_._4 !=null)
      //map转换为字段((身份证号,经度(保留两位小数),纬度(保留两位小数),电话号码,时间段标志),1),最后的1代表出现一次,用于后边做累加
      .map(data => {
      val locsp = data._2.split(",").take(2)
      val df   = new DecimalFormat("######0.000")
      val hour = data._4.split(":")(0).toInt
      val datarange = if(hour >= 9 && hour <= 18) 1 else 0
      ((data._1,df.format(locsp(0).toDouble),df.format(locsp(1).toDouble),data._3,datarange),1)
    })

   这里的filter是进行为空判断,我们从源码中可以看到传入的是一个布尔类型的变量,与map相同通过MapPartitionsRDD进行function的条件过滤,那么也就是说,其实我们可以在map中直接提取我们需要的数据,或者用filter进行为空过滤,条件过滤。

  6、随后我们要进行相同key值的合并,那么,我们开始使用reduceByKey:

      //按key做reduce,value做累加
      .reduceByKey(_ + _)

  底层调用了combineByKeyWithClassTag,这里的Partitioner参数我们之所以没有传入,是因为在map的RDD中已包含该RDD的partitioner的信息。它内部的实现将map的结果调用了require先进行merge,随后创建shuffleRDD.shuffleRDD就是最终reduce后的RDD。然后看不懂了。。。因为需要与整个流程相结合。所以后续继续深入~

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏jeremy的技术点滴

Java开发小技巧_02

45140
来自专栏Spark生态圈

[spark] Shuffle Write解析 (Sort Based Shuffle)

从 Spark 2.0 开始移除了Hash Based Shuffle,想要了解可参考Shuffle 过程,本文将讲解 Sort Based Shuffle。

16720
来自专栏desperate633

设计模式之单件模式(Singleton Pattern)引出单例模式经典单例模式的实现定义单件模式经典单件模式存在的问题解决单例模式的多线程问题

单件模式,也叫单例模式,可以说是设计模式中最简单的一种。顾名思义,就是创造独一无二的唯一的一个实例化的对象。

12430
来自专栏扎心了老铁

java使用spark/spark-sql处理schema数据

1、spark是什么? Spark是基于内存计算的大数据并行计算框架。 1.1 Spark基于内存计算 相比于MapReduce基于IO计算,提高了在大数据环境...

37350
来自专栏岑玉海

Spark源码系列(九)Spark SQL初体验之解析过程详解

好久没更新博客了,之前学了一些R语言和机器学习的内容,做了一些笔记,之后也会放到博客上面来给大家共享。一个月前就打算更新Spark Sql的内容了,因为一些别的...

53250
来自专栏木木玲

ConcurrentHashMap (JDK7) 详解

50590
来自专栏牛肉圆粉不加葱

[Spark源码剖析] DAGScheduler划分stage划分stage源码剖析

在DAGScheduler内部通过post一个JobSubmitted事件来触发Job的提交

10730
来自专栏牛肉圆粉不加葱

[Spark源码剖析] DAGScheduler提交stage

DAGScheduler通过调用submitStage来提交stage,实现如下:

8320
来自专栏Spark生态圈

[spark] Task执行流程

在文章TaskScheduler 任务提交与调度源码解析 中介绍了Task在executor上的逻辑分配,调用TaskSchedulerImpl的resourc...

21610
来自专栏互联网开发者交流社区

使用Java实现面向对象编程

13920

扫码关注云+社区

领取腾讯云代金券