Spark入门_2_LoadSaveData

motivation

  1. File formats and filesystems: 存储在NFS、HDFS上面的text、json、sequential file等。
  2. Structured data sources through Spark SQL:提供结构化数据的API,比如JSON和HIVE。
  3. Databases and key-value stores: 将会用内建和第三方的库去连接Cassandra, HBase, Elasticsearch, and JDBC databases.

file format

hdfs://namenodehost/parent/child
hdfs://parent/child
file://parent/child
sc.textFile("hdfs://host:port_no/data/searches")

text files

#读单个数据
input = sc.textFile("file:///home/holden/repos/spark/README.md")
input = sc.textFile("README.md")
input3 = sc.textFile("hdfs://Master:50070/test/sample.txt")
#主机名和端口号在hadoop的core-site.xml中查看

#读目录数据
input = sc.wholeTextFile("file:///home/holden/repos/spark/")

#写数据
result.saveAsTextFile(outputFile)

json

import json
data = input.map(lambda x: json.loads(x))

(data.filter(lambda x: x['lovesPandas']).map(lambda x: json.dumps(x))
.saveAsTextFile(outputFile))

csv tsv

import csv
import StringIO
...
def loadRecord(line):
    """Parse a CSV line"""
    input = StringIO.StringIO(line)
    reader = csv.DictReader(input, fieldnames=["name", "favouriteAnimal"])
    return reader.next()
input = sc.textFile(inputFile).map(loadRecord)

def loadRecords(fileNameContents):
    """Load all the records in a given file"""
    input = StringIO.StringIO(fileNameContents[1])
    reader = csv.DictReader(input, fieldnames=["name", "favoriteAnimal"])
    return reader
fullFileData = sc.wholeTextFiles(inputFile).flatMap(loadRecords)

def writeRecords(records):
    """Write out CSV lines"""
    output = StringIO.StringIO()
    writer = csv.DictWriter(output, fieldnames=["name", "favoriteAnimal"])
    for record in records:
        writer.writerow(record)
    return [output.getvalue()]
pandaLovers.mapPartitions(writeRecords).saveAsTextFile(outputFile)

sequence files

object files

hadoop input and output values

file compression

file system

local/regular FS

需要注意的是,访问本地的文件地址必须确保路径以及文件在所有节点下面都是存在的。 如果条件不满足,可以先在drive上访问文件,然后利用parallelize将文件分发到worker上。

但是,分发到worker的过程是很慢的,所以我们推荐将你的文件放在shared filesystem,比如HDFS, NFS或者S3中。

val rdd = sc.textFile("file:///home/holden/happypandas.gz")

amazon S3

hdfs

 hdfs://master:port/path

structured data with Spark SQL

hive

json

databases

java database connectivity

cassandra

hbase

elasticsearch

conclusion

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏FreeBuf

收集各类安全设备、Nginx日志实现日志统一管理及告警

近来安全测试项目较少,想着把安全设备、nginx日志收集起来并告警, 话不多说,直接说重点,搭建背景:

1247
来自专栏陈树义

Dubbo配置方式详解

Dubbo 是一个分布式服务框架,致力于提供高性能和透明化的 RPC 远程服务调用方案,是阿里巴巴 SOA 服务化治理方案的核心框架,每天为 2,000+ 个服...

3676
来自专栏乐沙弥的世界

Linux/Unix shell 脚本监控磁盘可用空间

    Linux下监控磁盘的空闲空间的shell脚本,对于系统管理员或DBA来说,必不可少。下面是给出的一个监控磁盘空间空间shell脚本的样本,供大家参考。

652
来自专栏Kotlin入门系列

win7基础 cmd 查看当前已经启动的服务列表

1816
来自专栏GreenLeaves

Oracle 客户端安装

Oracle 客户端的安装方式一种有两种: 1、Oracle标准客户端   点击下载 ? 这是Oracle提供的标准版11r2的客户端 2、Oracle Dat...

2758
来自专栏菩提树下的杨过

分布式服务框架 dubbo/dubbox 入门示例

dubbo是一个分布式的服务架构,可直接用于生产环境作为SOA服务框架。 官网首页:http://dubbo.io/ ,官方用户指南 http://dubbo....

39410
来自专栏Netkiller

怎样制作RPM包

怎样制作RPM包 摘要 我在网上找RPM包的制作例子几乎都是C源码编译安装然后生成RPM包, 而我的程序不是C写的很多时候是脚本语言如Python, PHP 甚...

3346
来自专栏猿天地

spring boot集成druid连接池

源码下载:http://cxytiandi.com/code/detail/13 Druid是一个JDBC组件,它包括三部分: DruidDriver 代理Dr...

32510
来自专栏乐沙弥的世界

Oracle cloud control 12c 如何修改sysman密码

    前阵子在虚拟机部署了Oracle Cloud Control 12c,事别几日,竟然忘记了登陆密码。主要是因为现在的Oracle有关的Software...

601
来自专栏Oracle

Oracle内存数据库使用

t.inmemory, t.inmemory_priority, t.inmemory_distribute, t.inmemory_compression, ...

982

扫码关注云+社区