Spark如何读取一些大数据集到本地机器上

最近在使用spark处理分析一些公司的埋点数据,埋点数据是json格式,现在要解析json取特定字段的数据,做一些统计分析,所以有时候需要把数据从集群上拉到driver节点做处理,这里面经常出现的一个问题就是,拉取结果集过大,而驱动节点内存不足,经常导致OOM,也就是我们常见的异常:

这种写法的代码一般如下:

上面的这种写法,基本原理就是一次性把所有分区的数据,全部读取到driver节点上,然后开始做处理,所以数据量大的时候,经常会出现内存溢出情况。

(问题一)如何避免这种情况?

分而治之,每次只拉取一个分区的数据到驱动节点上,处理完之后,再处理下一个分数据的数据。

(问题二)如果单个分区的数据已经大到内存装不下怎么办?

给数据集增加更多的分区,让大分区变成多个小分区。

(问题三)如果结果集数据大于内存的大小怎么办?

要么增加驱动节点的内存,要么给每个分区的数据都持久化本地文件上,不再内存中维护

下面来看下关键问题,如何修改spark的rdd分区数量?

我们知道在spark里面RDD是数据源的抽象模型,RDD里面实际上是把一份大数据源切分成了多个分区数据,然后来并行处理这份大数据集。

默认情况下如果Spark从HDFS上加载数据,默认分区个数是按照HDFS的block size来切分的,当然我们在加载的时候可以指定的分区个数。

如果在加载时不指定分区个数,spark里面还提供了两个函数来进行重分区:

接着我们来看下coalesce函数和repartition函数的区别:

通过查看源码得知repartition函数内部实际上是调用了coalesce函数第二个参数等于true时的封装。所以我们重点来关注下coalesce函数即可:

coalesce的第一个参数是修改后的分区个数

coalesce的第二个参数是控制是否需要shuffle

举一个例子:

当前我们RDD的分区个数是100:

(1)如果要变成10,应该使用

(2)如果要变成300,应该使用

(3)如果要变成1,应该使用

这里解释一下:

分区数从多变少,一般是不需要开启shuffle的,这样性能最高,因为不需要跨网络混洗数据,当然你也可以开启shuffle在特定场景下,如分区数据极其不均衡。但建议一般不要使用。

分区数从少变多,必须开启shuffle,如果不开启那么分区数据是不会改变的,由少变多必须得重新混洗数据才能变多,这里需要注意一点,如果数据量特别少,那么会有一些分区的数据是空。

最后的例子是一种极端场景,如果从多变成1,不开启shuffle,那么可能就个别节点计算压力特别大,集群资源不能充分利用,所以有必要开启shuffle,加速合并计算的流程。

明白了如何改变rdd的分区个数之后,我们就可以文章开头遇到的问题结合起来,拉取大量数据到驱动节点上,如果整体数据集太大,我们就可以增加分区个数,循环拉取,但这里面需要根据具体的场景来设置分区个数,因为分区个数越多,在spark里面生成的task数目就越多,task数目太多也会影响实际的拉取效率,在本案例中,从hdfs上读取的数据默认是144个分区,大约1G多点数据,没有修改分区个数的情况下处理时间大约10分钟,在调整分区个数为10的情况下,拉取时间大约在1-2分钟之间,所以要根据实际情况进行调整。

文章开始前的代码优化后的如下:

最后在看下,spark任务的提交命令:

这里面主要关注参数:

单次拉取数据结果集的最大字节数,以及驱动节点的内存,如果在进行大结果集下拉时,需要特别注意下这两个参数的设置。

参考文档:

https://spark.apache.org/docs/2.2.0/api/scala/index.html#org.apache.spark.rdd.RDD

https://spark.apache.org/docs/latest/configuration.html

https://stackoverflow.com/questions/21698443/spark-best-practice-for-retrieving-big-data-from-rdd-to-local-machin

原文发布于微信公众号 - 我是攻城师(woshigcs)

原文发表时间:2018-01-04

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Java后端技术栈

2018整理最全的50道Redis面试题!

Redis本质上是一个Key-Value类型的内存数据库,很像memcached,整个数据库统统加载在内存当中进行操作,定期通过异步操作把数据库数据flush到...

34300
来自专栏沈唁志

ThinkPHP-PHP开发中的主流框架

89140
来自专栏Pythonista

saltstack高效运维

saltstack是由thomas Hatch于2011年创建的一个开源项目,设计初衷是为了实现一个快速的远程执行系统。

25650
来自专栏李蔚蓬的专栏

Android Device Monitor--File Explorer 中的/data/data/..无法访问的问题

最近在看《第一行代码》(第二版)中关于数据存储方案的介绍。数据的状态分为两种:瞬时状态和持久状态,一般保存在内存中的数据随着活动的关闭,数据也就销毁了,如果我们...

52020
来自专栏xingoo, 一个梦想做发明家的程序员

Volatile的作用

众所周知,volatile关键字可以让线程的修改立刻通知其他的线程,从而达到数据一致的作用。那么它具体涉及到哪些内容呢? 关于缓存 计算机最大的存储空间就...

22380
来自专栏北京马哥教育

操作系统能否支持百万连接?

下面来分别对这几个问题进行分析. 1. 操作系统能否支持百万连接? 对于绝大部分 Linux 操作系统, 默认情况下确实不支持 C1000K! 因为操作系统包含...

37950
来自专栏代码世界

Python之IO模型

IO模型介绍 为了更好地了解IO模型,我们需要事先回顾下:同步、异步、阻塞、非阻塞     同步(synchronous) IO和异步(asynchronous...

449110
来自专栏黑白安全

web应用渗透测试流程

在信息收集阶段,我们需要尽量多的收集关于目标web应用的各种信息,比如:脚本语言的类型、服务器的类型、目录的结构、使用的开源软件、数据库类型、所有链接页面,用到...

20430
来自专栏salesforce零基础学习

salesforce零基础学习(八十五)streaming api 简单使用(接近实时获取你需要跟踪的数据的更新消息状态)

Streaming API参考链接: https://trailhead.salesforce.com/en/modules/api_basics/units/...

34280
来自专栏文渊之博

预写式日志(Write-Ahead Logging (WAL))

     SQL Server中使用了WAL(Write-Ahead Logging)技术来保证事务日志的ACID特性。而且大大减少了IO操作。      WA...

32380

扫码关注云+社区

领取腾讯云代金券