前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark入门_2_LoadSaveData

Spark入门_2_LoadSaveData

作者头像
用户1147754
发布2018-01-02 17:24:55
8570
发布2018-01-02 17:24:55
举报
文章被收录于专栏:YoungGyYoungGy
  • motivation
  • file format
    • text files
    • json
    • csv tsv
    • sequence files
    • object files
    • hadoop input and output values
    • file compression
  • file system
    • localregular FS
    • amazon S3
    • hdfs
  • structured data with Spark SQL
    • hive
    • json
  • databases
    • java database connectivity
    • cassandra
    • hbase
    • elasticsearch
  • conclusion

motivation

  1. File formats and filesystems: 存储在NFS、HDFS上面的text、json、sequential file等。
  2. Structured data sources through Spark SQL:提供结构化数据的API,比如JSON和HIVE。
  3. Databases and key-value stores: 将会用内建和第三方的库去连接Cassandra, HBase, Elasticsearch, and JDBC databases.

file format

这里写图片描述
这里写图片描述
代码语言:javascript
复制
hdfs://namenodehost/parent/child
hdfs://parent/child
file://parent/child
sc.textFile("hdfs://host:port_no/data/searches")

text files

代码语言:javascript
复制
#读单个数据
input = sc.textFile("file:///home/holden/repos/spark/README.md")
input = sc.textFile("README.md")
input3 = sc.textFile("hdfs://Master:50070/test/sample.txt")
#主机名和端口号在hadoop的core-site.xml中查看

#读目录数据
input = sc.wholeTextFile("file:///home/holden/repos/spark/")

#写数据
result.saveAsTextFile(outputFile)
这里写图片描述
这里写图片描述

json

代码语言:javascript
复制
import json
data = input.map(lambda x: json.loads(x))

(data.filter(lambda x: x['lovesPandas']).map(lambda x: json.dumps(x))
.saveAsTextFile(outputFile))

csv tsv

代码语言:javascript
复制
import csv
import StringIO
...
def loadRecord(line):
    """Parse a CSV line"""
    input = StringIO.StringIO(line)
    reader = csv.DictReader(input, fieldnames=["name", "favouriteAnimal"])
    return reader.next()
input = sc.textFile(inputFile).map(loadRecord)

def loadRecords(fileNameContents):
    """Load all the records in a given file"""
    input = StringIO.StringIO(fileNameContents[1])
    reader = csv.DictReader(input, fieldnames=["name", "favoriteAnimal"])
    return reader
fullFileData = sc.wholeTextFiles(inputFile).flatMap(loadRecords)

def writeRecords(records):
    """Write out CSV lines"""
    output = StringIO.StringIO()
    writer = csv.DictWriter(output, fieldnames=["name", "favoriteAnimal"])
    for record in records:
        writer.writerow(record)
    return [output.getvalue()]
pandaLovers.mapPartitions(writeRecords).saveAsTextFile(outputFile)

sequence files

object files

hadoop input and output values

file compression

file system

local/regular FS

需要注意的是,访问本地的文件地址必须确保路径以及文件在所有节点下面都是存在的。 如果条件不满足,可以先在drive上访问文件,然后利用parallelize将文件分发到worker上。

但是,分发到worker的过程是很慢的,所以我们推荐将你的文件放在shared filesystem,比如HDFS, NFS或者S3中。

代码语言:javascript
复制
val rdd = sc.textFile("file:///home/holden/happypandas.gz")

amazon S3

hdfs

代码语言:javascript
复制
 hdfs://master:port/path

structured data with Spark SQL

这里写图片描述
这里写图片描述

hive

json

databases

java database connectivity

cassandra

hbase

elasticsearch

conclusion

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • motivation
  • file format
    • text files
      • json
        • csv tsv
          • sequence files
            • object files
              • hadoop input and output values
                • file compression
                • file system
                  • local/regular FS
                    • amazon S3
                      • hdfs
                      • structured data with Spark SQL
                        • hive
                          • json
                          • databases
                            • java database connectivity
                              • cassandra
                                • hbase
                                  • elasticsearch
                                  • conclusion
                                  相关产品与服务
                                  Elasticsearch Service
                                  腾讯云 Elasticsearch Service(ES)是云端全托管海量数据检索分析服务,拥有高性能自研内核,集成X-Pack。ES 支持通过自治索引、存算分离、集群巡检等特性轻松管理集群,也支持免运维、自动弹性、按需使用的 Serverless 模式。使用 ES 您可以高效构建信息检索、日志分析、运维监控等服务,它独特的向量检索还可助您构建基于语义、图像的AI深度应用。
                                  领券
                                  问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档