首页
学习
活动
专区
圈层
工具
发布
50 篇文章
1
2021年大数据Spark(一):框架概述
2
2021年大数据Spark(二):四大特点
3
2021年大数据Spark(三):框架模块初步了解
4
2021年大数据Spark(四):三种常见的运行模式
5
2021年大数据Spark(五):大环境搭建本地模式 Local
6
2021年大数据Spark(六):环境搭建集群模式 Standalone
7
2021年大数据Spark(七):应用架构基本了解
8
2021年大数据Spark(八):环境搭建集群模式 Standalone HA
9
2021年大数据Spark(九):Spark On Yarn两种模式总结
10
2021年大数据Spark(十):环境搭建集群模式 Spark on YARN
11
2021年大数据Spark(十一):应用开发基于IDEA集成环境
12
2021年大数据Spark(十二):Spark Core的RDD详解
13
2021年大数据Spark(十三):Spark Core的RDD创建
14
2021年大数据Spark(十四):Spark Core的RDD操作
15
2021年大数据Spark(十五):Spark Core的RDD常用算子
16
2021年大数据Spark(十六):Spark Core的RDD算子练习
17
2021年大数据Spark(十七):Spark Core的RDD持久化
18
2021年大数据Spark(十八):Spark Core的RDD Checkpoint
19
2021年大数据Spark(十九):Spark Core的​​​​​​​共享变量
20
2021年大数据Spark(二十):Spark Core外部数据源引入
21
2021年大数据Spark(二十一):Spark Core案例-SogouQ日志分析
22
2021年大数据Spark(二十二):内核原理
23
2021年大数据Spark(二十三):SparkSQL 概述
24
2021年大数据Spark(二十四):SparkSQL数据抽象
25
2021年大数据Spark(二十五):SparkSQL的RDD、DF、DS相关操作
26
2021年大数据Spark(二十六):SparkSQL数据处理分析
27
2021年大数据Spark(二十七):SparkSQL案例一花式查询和案例二WordCount
28
2021年大数据Spark(二十八):SparkSQL案例三电影评分数据分析
29
2021年大数据Spark(二十九):SparkSQL案例四开窗函数
30
2021年大数据Spark(三十):SparkSQL自定义UDF函数
31
2021年大数据Spark(三十一):Spark On Hive
32
2021年大数据Spark(三十二):SparkSQL的External DataSource
33
2021年大数据Spark(三十三):SparkSQL分布式SQL引擎
34
2021年大数据Spark(三十四):Spark Streaming概述
35
2021年大数据Spark(三十五):SparkStreaming数据抽象 DStream
36
2021年大数据Spark(三十六):SparkStreaming实战案例一 WordCount
37
2021年大数据Spark(三十七):SparkStreaming实战案例二 UpdateStateByKey
38
2021年大数据Spark(三十八):SparkStreaming实战案例三 状态恢复 扩展
39
2021年大数据Spark(三十九):SparkStreaming实战案例四 窗口函数
40
2021年大数据Spark(四十):SparkStreaming实战案例五 TopN-transform
41
2021年大数据Spark(四十一):SparkStreaming实战案例六 自定义输出 foreachRDD
42
2021年大数据Spark(四十二):SparkStreaming的Kafka快速回顾与整合说明
43
2021年大数据Spark(四十三):SparkStreaming整合Kafka 0.10 开发使用
44
2021年大数据Spark(四十四):Structured Streaming概述
45
2021年大数据Spark(四十五):Structured Streaming Sources 输入源
46
2021年大数据Spark(四十六):Structured Streaming Operations 操作
47
2021年大数据Spark(四十七):Structured Streaming Sink 输出
48
2021年大数据Spark(四十八):Structured Streaming 输出终端/位置
49
2021年大数据Spark(四十九):Structured Streaming 整合 Kafka
50
2021年大数据Spark(五十):Structured Streaming 案例一实时数据ETL架构

2021年大数据Spark(十三):Spark Core的RDD创建

RDD的创建

官方文档:http://spark.apache.org/docs/latest/rdd-programming-guide.html#resilient-distributed-datasets-rdds

如何将数据封装到RDD集合中,主要有两种方式:并行化本地集合(Driver Program中)和引用加载外部存储系统(如HDFS、Hive、HBase、Kafka、Elasticsearch等)数据集

并行化集合

由一个已经存在的 Scala 集合创建,集合并行化,集合必须时Seq本身或者子类对象。

演示范例代码,从List列表构建RDD集合:

代码语言:javascript
复制
package cn.itcast.core

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Spark 采用并行化的方式构建Scala集合Seq中的数据为RDD
 *  - 将Scala集合转换为RDD
 *      sc.parallelize(seq)
 *  - 将RDD转换为Scala中集合
 *      rdd.collect()
 *      rdd.collectAsMap()
 */
object SparkParallelizeTest {
    def main(args: Array[String]): Unit = {
        // 创建应用程序入口SparkContext实例对象
        val sparkConf: SparkConf = new SparkConf()
          .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
          .setMaster("local[*]")
        val sc: SparkContext = new SparkContext(sparkConf)
        sc.setLogLevel("WARN")
        
        // 1、Scala中集合Seq序列存储数据
        val linesSeq: Seq[String] = Seq(
            "hello me you her",
            "hello you her",
            "hello her",
            "hello"
        )
        
        // 2、并行化集合创建RDD数据集
        /*
          def parallelize[T: ClassTag](
              seq: Seq[T],
              numSlices: Int = defaultParallelism
          ): RDD[T]
         */
        val inputRDD: RDD[String] = sc.parallelize(linesSeq, numSlices = 2)
        //val inputRDD: RDD[String] = sc.makeRDD(linesSeq, numSlices = 2)
        
        // 3、调用集合RDD中函数处理分析数据
        val resultRDD: RDD[(String, Int)] = inputRDD
            .flatMap(_.split("\\s+"))
            .map((_, 1))
            .reduceByKey(_ + _)
        
        // 4、保存结果RDD到外部存储系统(HDFS、MySQL、HBase。。。。)
        resultRDD.foreach(println)
        
        // 应用程序运行结束,关闭资源
        sc.stop()
    }
}

外部存储系统

由外部存储系统的数据集创建,包括本地的文件系统,还有所有 Hadoop支持的数据集,比如 HDFS、Cassandra、HBase 等。实际使用最多的方法:textFile,读取HDFS或LocalFS上文本文件,指定文件路径和RDD分区数目。

范例演示:从文件系统读取数据,设置分区数目为2,代码如下。

代码语言:javascript
复制
package cn.itcast.core

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * 从HDFS/LocalFS文件系统加载文件数据,封装为RDD集合, 可以设置分区数目
 *  - 从文件系统加载
 *      sc.textFile("")
 *  - 保存文件系统
 *      rdd.saveAsTextFile("")
 */
object SparkFileSystemTest {
    def main(args: Array[String]): Unit = {
        // 创建应用程序入口SparkContext实例对象
        val sparkConf: SparkConf = new SparkConf()
          .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
          .setMaster("local[*]")
        val sc: SparkContext = new SparkContext(sparkConf)
        sc.setLogLevel("WARN")
        
        // 1、从文件系统加载数据,创建RDD数据集
        /*
          def textFile(
              path: String,
              minPartitions: Int = defaultMinPartitions
          ): RDD[String]
         */
        val inputRDD: RDD[String] = sc.textFile("data/input/words.txt",2)
        println(s"Partitions Number : ${inputRDD.getNumPartitions}")
        
        // 2、调用集合RDD中函数处理分析数据
        val resultRDD: RDD[(String, Int)] = inputRDD
            .flatMap(_.split("\\s+"))
            .map((_, 1))
            .reduceByKey(_ + _)
        
        // 3、保存结果RDD到外部存储系统(HDFS、MySQL、HBase。。。。)
        resultRDD.foreach(println)
        
        // 应用程序运行结束,关闭资源
        sc.stop()
    }
    
}

其中文件路径:可以指定文件名称,可以指定文件目录,可以使用通配符指定。

小文件读取

     在实际项目中,有时往往处理的数据文件属于小文件(每个文件数据数据量很小,比如KB,几十MB等),文件数量又很大,如果一个个文件读取为RDD的一个个分区,计算数据时很耗时性能低下,使用SparkContext中提供:wholeTextFiles类,专门读取小文件数据。

范例演示:读取10个小文件数据,每个文件大小小于1MB,设置RDD分区数目为2。

代码语言:javascript
复制
package cn.itcast.core

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * 采用SparkContext#wholeTextFiles()方法读取小文件
 */
object SparkWholeTextFileTest {
    def main(args: Array[String]): Unit = {
        // 创建应用程序入口SparkContext实例对象
        val sparkConf: SparkConf = new SparkConf()
          .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
          .setMaster("local[*]")
        val sc: SparkContext = new SparkContext(sparkConf)
        sc.setLogLevel("WARN")
        
        // wholeTextFiles()
        val filesRDD: RDD[(String, String)] = sc.wholeTextFiles("data/input/ratings10", minPartitions = 2)
        filesRDD.map(_._1).foreach(println)
        val inputRDD: RDD[String] = filesRDD.flatMap(_._2.split("\\n"))
        println(s"Partitions Number = ${inputRDD.getNumPartitions}")
        println(s"Count = ${inputRDD.count()}")
        
        // 应用程序运行结束,关闭资源
        sc.stop()
    }
}

实际项目中,可以先使用wholeTextFiles方法读取数据,设置适当RDD分区,再将数据保存到文件系统,以便后续应用读取处理,大大提升性能。

下一篇
举报
领券