前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >数据读取与保存

数据读取与保存

作者头像
用户1483438
发布2022-05-09 15:41:00
1K0
发布2022-05-09 15:41:00
举报
文章被收录于专栏:大数据共享

摘要

Spark的数据读取及数据保存可以从两个维度来作区分:文件格式以及文件系统。

文件格式分为:

  • Text文件
  • Json文件
  • Csv文件
  • Sequence文件
  • Object文件;

文件系统分为:

  • 本地文件系统
  • HDFS以及数据库。

文件类数据读取与保存

Text文件 基本语法:

  • 数据读取:textFile(String)
  • 数据保存:saveAsTextFile(String)

案例演示:经典的worldCount程序,并将程序计算结果写入到本地文件中

代码语言:javascript
复制
  @Test
  def textTest(): Unit ={
    // 创建sc
    val conf =new SparkConf().setMaster("local[4]").setAppName("test")
    val sc=new SparkContext(conf)

    // 读取文件
    val rdd1: RDD[String] =sc.textFile(path = "file:///C:/Users/123456/Desktop/worldCount.txt",minPartitions = 4)

    // 数据扁平化,
    val rdd2: RDD[String] =rdd1.flatMap(e=>e.split(" "))

    // 映射
    val rdd3: RDD[(String, Int)] =rdd2.map(e=>(e,1))

    // 计算单词个数
    val rdd4: RDD[(String, Int)] = rdd3.reduceByKey((v1, v2) => v1 + v2)


    //将数据写入目录中,该目录不能存在
    rdd4.saveAsTextFile("file:///C:/Users/123456/Desktop/worldCount_0001")

    // 关闭资源;养成良好编码习惯
    sc.stop()

  }

worldCount_0001 是一个目录,并且不能存在

程序结果
程序结果

就像跑了一个MR,将数据按照分区存入不同的目录中。

Sequence文件 SequenceFile文件是Hadoop用来存储二进制形式的key-value对而设计的一种平面文件(Flat File)。在SparkContext中,可以调用sequenceFile[keyClass, valueClass](path)。

案例演示: 保存Sequence文件

代码语言:javascript
复制
 @Test
  def sequenceWriteTest(): Unit ={
    // 创建sc
    val conf =new SparkConf().setMaster("local[4]").setAppName("test")
    val sc=new SparkContext(conf)

    // 读取文件
    val rdd1: RDD[String] =sc.textFile(path = "file:///C:/Users/123456/Desktop/worldCount.txt",minPartitions = 4)

    // 数据扁平化,
    val rdd2: RDD[String] =rdd1.flatMap(e=>e.split(" "))

    // 映射
    val rdd3: RDD[(String, Int)] =rdd2.map(e=>(e,1))

    // 计算单词个数
    val rdd4: RDD[(String, Int)] = rdd3.reduceByKey((v1, v2) => v1 + v2)


    //将数据写入目录中,该目录不能存在
    rdd4.saveAsSequenceFile("file:///C:/Users/123456/Desktop/worldCount_0003")

    // 关闭资源;养成良好编码习惯
    sc.stop()

  }

读取Sequence文件

代码语言:javascript
复制
  @Test
  def sequenceReadTest(): Unit ={
    // 创建sc
    val conf =new SparkConf().setMaster("local[4]").setAppName("test")
    val sc=new SparkContext(conf)

    // 读取文件
    val rdd1: RDD[(String, Int)] =sc.sequenceFile[String,Int](path = "file:///C:/Users/123456/Desktop/worldCount_0003",minPartitions = 4)
    //打印
    rdd1.foreach(e=>{
      println(e)
    })

    // 关闭资源;养成良好编码习惯
    sc.stop()
  }

打印结果

代码语言:javascript
复制
(python,1)
(shell,4)
(wahaha,1)
(java,5)
(hello,2)

注意: sc.sequenceFile[String,Int] 需要指定返回参数类型 。

Object对象文件 对象文件是将对象序列化后保存的文件,采用Java的序列化机制。可以通过objectFile[k,v](path)函数接收一个路径,读取对象文件,返回对应的RDD,也可以通过调用saveAsObjectFile()实现对对象文件的输出。因为是序列化所以要指定类型。

案例演示 将数据保存成Object文件

代码语言:javascript
复制
  @Test
  def ObjectWriteTest(): Unit ={
    // 创建sc
    val conf =new SparkConf().setMaster("local[4]").setAppName("test")
    val sc=new SparkContext(conf)

    // 读取文件
    val rdd1: RDD[String] =sc.textFile(path = "file:///C:/Users/123456/Desktop/worldCount.txt",minPartitions = 4)

    // 数据扁平化,
    val rdd2: RDD[String] =rdd1.flatMap(e=>e.split(" "))

    // 映射
    val rdd3: RDD[(String, Int)] =rdd2.map(e=>(e,1))

    // 计算单词个数
    val rdd4: RDD[(String, Int)] = rdd3.reduceByKey((v1, v2) => v1 + v2)


    //将数据写入目录中,该目录不能存在
    rdd4.saveAsObjectFile("file:///C:/Users/123456/Desktop/worldCount_0002")

    // 关闭资源;养成良好编码习惯
    sc.stop()

  }

读取 Object 文件

代码语言:javascript
复制
  @Test
  def ObjectReadTest(): Unit ={
    // 创建sc
    val conf =new SparkConf().setMaster("local[4]").setAppName("test")
    val sc=new SparkContext(conf)

    // 读取对象文件
    // sc.objectFile[(String,Int)] 需要指定数据类型,写入进去的是一个元组,读取的时候应该也元组的形式返回
    val rdd1=sc.objectFile[(String,Int)](path = "file:///C:/Users/123456/Desktop/worldCount_0002",minPartitions = 4)

    //打印
    rdd1.foreach(e=>{
      println(e)
    })

    // 关闭资源;养成良好编码习惯
    sc.stop()
  }

结果

代码语言:javascript
复制
(python,1)
(wahaha,1)
(shell,4)
(hello,2)
(java,5)

注意: sc.objectFile[(String,Int)] 必须指定数据类型

文件系统类数据读取与保存 Spark的整个生态系统与Hadoop是完全兼容的,所以对于Hadoop所支持的文件类型或者数据库类型,Spark也同样支持。另外,由于Hadoop的API有新旧两个版本,所以Spark为了能够兼容Hadoop所有的版本,也提供了两套创建操作接口。如TextInputFormat,新旧两个版本所引用分别是org.apache.hadoop.mapred.InputFormat、org.apache.hadoop.mapreduce.InputFormat(NewInputFormat)

本文系转载,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文系转载前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 摘要
  • 文件类数据读取与保存
相关产品与服务
文件存储
文件存储(Cloud File Storage,CFS)为您提供安全可靠、可扩展的共享文件存储服务。文件存储可与腾讯云服务器、容器服务、批量计算等服务搭配使用,为多个计算节点提供容量和性能可弹性扩展的高性能共享存储。腾讯云文件存储的管理界面简单、易使用,可实现对现有应用的无缝集成;按实际用量付费,为您节约成本,简化 IT 运维工作。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档