Spark读取压缩文件

前言

本文讲如何用spark读取gz类型的压缩文件,以及如何解决我遇到的各种问题。

1、文件压缩

下面这一部分摘自Spark快速大数据分析:   在大数据工作中,我们经常需要对数据进行压缩以节省存储空间和网络传输开销。对于大多数Hadoop输出格式来说,我们可以指定一种压缩编解码器来压缩数据。   选择一个输出压缩编解码器可能会对这些数据以后的用户产生巨大影响。对于像Spark 这样的分布式系统,我们通常会尝试从多个不同机器上一起读入数据。要实现这种情况,每个工作节点都必须能够找到一条新记录的开端。有些压缩格式会使这变得不可能,而必须要单个节点来读入所有数据,这就很容易产生性能瓶颈。可以很容易地从多个节点上并行读取的格式被称为“可分割”的格式。下表列出了可用的压缩选项。

格式

可分割

平均压缩速度

文本文件压缩效率

Hadoop压缩编解码器

纯Java实现

原生

备注

gzip

org.apache.hadoop.io.compress.GzipCodec

lzo

是(取决于所使用的库)

非常快

中等

com.hadoop.compression.lzo.LzoCodec

需要在每个节点上安装LZO

bzip2

非常高

org.apache.hadoop.io.compress.Bzip2Codec

为可分割版本使用纯Java

zlib

中等

org.apache.hadoop.io.compress.DefaultCodec

Hadoop 的默认压缩编解码器

Snappy

非常快

org.apache.hadoop.io.compress.SnappyCodec

Snappy 有纯Java的移植版,但是在Spark/Hadoop中不能用

  尽管Spark 的textFile() 方法可以处理压缩过的输入,但即使输入数据被以可分割读取的方式压缩,Spark 也不会打开splittable。因此,如果你要读取单个压缩过的输入,最好不要考虑使用Spark 的封装,而是使用newAPIHadoopFile 或者hadoopFile,并指定正确的压缩编解码器。

2、代码

代码很简单,用textFile()即可,假设,我的数据名为data.txt.gz,我把它放在hdfs上的/tmp/dkl路径下那么代码为:

12

val path = "hdfs://ambari.master.com:8020/tmp/dkl/data.txt.gz"val data = sc.textFile(path)

注:把数据放在hdfs的命令为

1

hadoop fs -put data.tar.gz /tml/dkl

3、一些小问题

3.1 数据

首先造几个数据吧,先创建一个txt,名字为data.txt,内容如下

12345

1 张三 上海 2018-05-252 张三 上海 2018-05-253 张三 上海 2018-05-254 张三 上海 2018-05-255 张三 上海 2018-05-25

3.2 如何压缩

那么如如何打包为gz格式的压缩文件呢,分两种 一、 在windows上打包,如果不想在Linux服务器上用命令打包,那么可以直接用windows上的软件打包(win上常见的zip,rar格式,spark是不支持的),我用7-zip软件压缩,大家可百度7-zip或直接在https://www.7-zip.org/下载安装,压缩格式选gzip即可。 二、 在Linux上压缩,可通过下面的命令 1、保留原文件

1

gzip –c data.txt > data.txt.gz

2、不保留原文件,默认生成的文件名为原文件名.gz,即data.txt.gz

1

gzip data.txt

压缩完了之后,跑一下程序测试一下

1

data.take(3).foreach(println)

123

1 张三 上海 2018-05-252 张三 上海 2018-05-253 张三 上海 2018-05-25

根据结果看没问题。 三、 说明 在Linux上用tar命令压缩,spark虽然可以读,但是第一行会有文件信息

1

tar -zcvf data.tar.gz data.txt

3.3 文件编码问题

别人给我的原文件是.rar,那我需要将其解压之后得到txt,然后按照上述方式压缩为.gz,然后上传到hdfs,进行代码测试,打印前几条发现乱码,查了一下发现原文件是gbk编码的,且sc.textFile()不能指定编码,只能读取utf8格式,其他格式就会乱码。

注意:因为实际情况下解压后的txt文件很大,windows是直接打不开的,所以不能通过打开文件修改编码的方法去解决。

3.3.1 构建测试gbk格式的文件

1、windows上可以用记事本打开,另存为,编码选择ANSI即可

2、Linux可以通过下面的命令修改

1

iconv -f utf8 -t gbk data.txt > data_gbk.txt

测试一下输出,发现确实乱码了(直接测试txt即可)

123

1 ���� �Ϻ� 2018-05-252 ���� �Ϻ� 2018-05-253 ���� �Ϻ� 2018-05-25

3.3.2 代码解决

通过如下代码测试即可 定义方法

123456789

import org.apache.spark.rdd.RDDimport org.apache.spark.SparkContextimport org.apache.hadoop.io.LongWritableimport org.apache.hadoop.mapred.TextInputFormatimport org.apache.hadoop.io.Textdef transfer(sc: SparkContext, path: String): RDD[String] = { sc.hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], 1) .map(p => new String(p._2.getBytes, 0, p._2.getLength, "GBK"))}

测试方法

1

transfer(sc, path3).take(3).foreach(println)

参考:Spark Scala 读取GBK文件的方法

3.3.3 Linux命令

可直接通过Linux命令转换txt的编码格式,再压缩,这样代码就不用修改 其实在3.2.1中已经涉及到了 1、通过Linux自带的命令iconv iconv不能覆盖原来的文件,只能生成新的文件之后,再通过mv命令去覆盖

1

iconv -f gbk -t utf8 data_gbk.txt > data_new.txt

2、通过enca enca可以直接覆盖原来的文件,这样如果不想改变来的文件名,就少一步mv操作了,enca不是子系统自带的,需要自己下载安装,可在http://dl.cihar.com/enca/下载最新版本。

12345678

#下载&解压wget http://dl.cihar.com/enca/enca-1.19.tar.gztar -zxvf enca-1.19.tar.gzcd enca-1.19#编译安装./configuremakemake install

安装好了之后通过下面的命令转换即可

1

enca -L zh_CN -x UTF-8 data_gbk.txt

转换编码格式之后,在通过程序测试即可。

参考:linux 下的文件编码格式转换

3.4 rdd换df

由于文件过大,不能直接打开看也没用垃圾数据,造成格式问题,如果有垃圾数据,在rdd转df的过程中会产生异常,这里记录一下我碰见的问题。

1、首先可以先打印出前几行数据查看一下该文件的大体格式

2、碰到的一个一个异常 代码用的旧版spark(1.6版本) 将rdd动态转为dataframe里面的方法。

1

if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true])....

原因是因为文件里有一行数据为垃圾数据,这行数据的列数和列名的个数不一样导致的,可以在代码中过滤掉这样数据即可。

1

.filter(_.length == colName.length)

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏信安之路

HCTF2017的三个WriteUp

解决方法就是先 undefine 掉函数,再右键选择 Code,最后 Create function 就可以正常反编译了。

1170
来自专栏Fish

利用IDEA查看和修改spark源码

经过了两天的摸索,算是初步学会了如何查看和修改spark源码。 大坑 对,这个要写在最前面,那就是注意你的scalaSDK版本!!!!不同的Spark版本支持的...

4229
来自专栏腾讯云API

腾讯云API:无服务器函数

无服务器函数是一个很好玩的东西,可以通过这个程序跑一些脚本,在一定程度上,是很方便的。但是作为新鲜事物,一般很难被大家接受,所以,我今天在这里,就做一个小例子,...

9185
来自专栏Linux驱动

第1阶段——uboot分析之启动函数bootm命令 (9)

本节主要学习: 详细分析UBOOT中"bootcmd=nand read.jffs2 0x30007FC0 kernel;bootm 0x30007FC0" 中...

2799
来自专栏大内老A

通过WCF扩展实现消息压缩

对于需要进行大规模数据传输的WCF应用来说,对于请求消息和回复消息进行传输前的压缩,不但可以降低网络流量,也可以提高网络传输的性能。由于WCF的扩展性,我们可以...

19910
来自专栏知识分享

1-STM32嵌入LUA开发(控制小灯闪耀)

今天因为想让STM32完美的处理字符串,所以就想着让STM32嵌入lua,本来想用f103c8t6,但是一编译就提示内存不足......

1202
来自专栏有趣的django

37.Django1.11.6文档

第一步 入门 检查版本 python -m django --version 创建第一个项目 django-admin startproject mysite ...

4948
来自专栏Spark学习技巧

Hbase源码系列之scan源码解析及调优

一,hbase的scan基本使用问题介绍 Hbase的Scan方法是基于Rowkey进行数据扫描的,过程中client会将我们的请求,转化为向服务端的RPC请求...

4358
来自专栏逆向技术

x64内核HOOK技术之拦截进程.拦截线程.拦截模块

            x64内核HOOK技术之拦截进程.拦截线程.拦截模块 一丶为什么讲解HOOK技术. 在32系统下, 例如我们要HOOK SSDT表,那么...

5037
来自专栏Spark学习技巧

Kafka源码系列之分组消费的再平衡策略

一,Kafka消费模式 从kafka消费消息,kafka客户端提供两种模式: 分区消费,分组消费。 分区消费对应的就是我们的DirectKafkaInputDS...

1K6

扫码关注云+社区

领取腾讯云代金券