PySpark分析二进制文件

客户需求

客户希望通过spark来分析二进制文件中0和1的数量以及占比。如果要分析的是目录,则针对目录下的每个文件单独进行分析。分析后的结果保存与被分析文件同名的日志文件中,内容包括0和1字符的数量与占比。

要求:如果值换算为二进制不足八位,则需要在左侧填充0。

可以在linux下查看二进制文件的内容。命令:

xxd –b –c 1 filename

命令参数-c 1是显示1列1个字符,-b是显示二进制。

遇到的坑

开发环境的问题

要在spark下使用python,需要事先使用pip安装pyspark。结果安装总是失败。python的第三方库地址是https://pypi.python.org/simple/,在国内访问很慢。通过搜索问题,许多文章提到了国内的镜像库,例如豆瓣的库,结果安装时都提示找不到pyspark。

查看安装错误原因,并非不能访问该库,仅仅是访问较慢,下载了不到8%的时候就提示下载失败。这实际上是连接超时的原因。因而可以修改连接超时值。可以在~/.pip/pip.conf下增加:

[global]timeout = 6000

虽然安装依然缓慢,但至少能保证pyspark安装完毕。但是在安装py4j时,又提示如下错误信息(安装环境为mac):

OSError: [Errno 1] Operation not permitted: '/System/Library/Frameworks/Python.framework/Versions/2.7/share'

即使这个安装方式是采用sudo,且在管理员身份下安装,仍然提示该错误。解决办法是执行如下安装:

pip install --upgrade pip sudo pip install numpy --upgrade --ignore-installed sudo pip install scipy --upgrade --ignore-installed sudo pip install scikit-learn --upgrade --ignore-installed

然后再重新执行sudo pip install pyspark,安装正确。

字符编码的坑

在提示信息以及最后分析的结果中都包含了中文。运行代码时,会提示如下错误信息:

SyntaxError: Non-ASCII character '\xe5' in file /Users/zhangyi/PycharmProjects/spark_binary_files_demo/parse_files_demo.py on line 36, but no encoding declared; see http://python.org/dev/peps/pep-0263/ for details

需要在代码文件的首行添加如下编码声明: # This Python file uses the following encoding: utf-8

SparkConf的坑

初始化SparkContext的代码如下所示:

conf = SparkConf().setMaster("local[*]")
conf = conf.setAppName(APP_NAME)
sc = SparkContext(conf)

结果报告运行错误:

Error initializing SparkContext.

org.apache.spark.SparkException: Could not parse Master URL: <pyspark.conf.SparkConf object at 0x106666390>

根据错误提示,以为是Master的设置有问题,实际上是实例化SparkContext有问题。阅读代码,发现它的构造函数声明如下所示:

def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
            environment=None, batchSize=0, serializer=PickleSerializer(), conf=None,
            gateway=None, jsc=None, profiler_cls=BasicProfiler):

而前面的代码仅仅是简单的将conf传递给SparkContext构造函数,这就会导致Spark会将conf看做是master参数的值,即默认为第一个参数。所以这里要带名参数:

sc = SparkContext(conf = conf)

sys.argv的坑

我需要在使用spark-submit命令执行python脚本文件时,传入我需要分析的文件路径。与scala和java不同。scala的main函数参数argv实际上可以接受命令行传来的参数。python不能这样,只能使用sys模块来接收命令行参数,即sys.argv。

argv是一个list类型,当我们通过sys.argv获取传递进来的参数值时,一定要明白它会默认将spark-submit后要执行的python脚本文件路径作为第一个参数,而之后的参数则放在第二个。例如命令如下:

./bin/spark-submit /Users/zhangyi/PycharmProjects/spark_binary_files_demo/parse_files_demo.py "files"

则:

  • argv[0]: /Users/zhangyi/PycharmProjects/spark_binary_files_demo/parse_files_demo.py
  • argv[1]: files

因此,我需要获得files文件夹名,就应该通过argv[1]来获得。

此外,由于argv是一个list,没有size属性,而应该通过len()方法来获得它的长度,且期待的长度为2。

整数参与除法的坑

在python 2.7中,如果直接对整数执行除法,结果为去掉小数。因此4 / 5得到的结果却是0。在python 3中,这种运算会自动转型为浮点型。

要解决这个问题,最简单的办法是导入一个现成的模块:

from __future__ import division

注意:这个import的声明应该放在所有import声明前面。

附整个代码:

# This Python file uses the following encoding: utf-8

from __future__ import division
import os
import time
import sys
from pyspark import SparkConf, SparkContext

APP_NAME = "Load Bin Files"


def main(spark_context, path):
    file_paths = fetch_files(path)
    for file_path in file_paths:
        outputs = analysis_file_content(spark_context, path + "/" + file_path)
        print_outputs(outputs)
        save_outputs(file_path, outputs)


def fetch_files(path):
    if os.path.isfile(path):
        return [path]
    return os.listdir(path)


def analysis_file_content(spark_context, file_path):
    data = spark_context.binaryRecords(file_path, 1)
    records = data.flatMap(lambda d: list(bin(ord(d)).replace('0b', '').zfill(8)))
    mapped_with_key = records.map(lambda d: ('0', 1) if d == '0' else ('1', 1))
    result = mapped_with_key.reduceByKey(lambda x, y: x + y)

    total = result.map(lambda r: r[1]).sum()
    return result.map(lambda r: format_outputs(r, total)).collect()


def format_outputs(value_with_key, total):
    tu = (value_with_key[0], value_with_key[1], value_with_key[1] / total * 100)
    return "字符{0}的数量为{1}, 占比为{2:.2f}%".format(*tu)


def print_outputs(outputs):
    for output in outputs:
        print output


def save_outputs(file_path, outputs):
    result_dir = "result"
    if not os.path.exists(result_dir):
        os.mkdir(result_dir)

    output_file_name = "result/" + file_name_with_extension(file_path) + ".output"
    with open(output_file_name, "a") as result_file:
        for output in outputs:
            result_file.write(output + "\n")
        result_file.write("统计于{0}\n\n".format(format_logging_time()))


def format_logging_time():
    return time.strftime('%Y-%m-%d %H:%m:%s', time.localtime(time.time()))


def file_name_with_extension(path):
    last_index = path.rfind("/") + 1
    length = len(path)
    return path[last_index:length]


if __name__ == "__main__":
    conf = SparkConf().setMaster("local[*]")
    conf = conf.setAppName(APP_NAME)
    sc = SparkContext(conf=conf)

    if len(sys.argv) != 2:
        print("请输入正确的文件或目录路径")
    else:
        main(sc, sys.argv[1])

实现并不复杂,只是自己对Python不太熟悉,也从未用过PySpark,所以蹚了不少坑,所幸都不复杂,通过google都找到了解决方案。是为记!

原文发布于微信公众号 - 逸言(YiYan_OneWord)

原文发表时间:2018-01-04

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏个人分享

Spark shuffle详细过程

有许多场景下,我们需要进行跨服务器的数据整合,比如两个表之间,通过Id进行join操作,你必须确保所有具有相同id的数据整合到相同的块文件中。那么我们先说一下m...

2142
来自专栏别先生

JSch - Java实现的SFTP(文件上传详解篇)

  JSch是Java Secure Channel的缩写。JSch是一个SSH2的纯Java实现。它允许你连接到一个SSH服务器,并且可以使用端口转发,X11...

2811
来自专栏Java编程技术

高并发编程必备基础(上)

借用Java并发编程实践中的话"编写正确的程序并不容易,而编写正常的并发程序就更难了",相比于顺序执行的情况,多线程的线程安全问题是微妙而且出乎意料的,因为在没...

912
来自专栏技术点滴

高性能分布式执行框架——Ray一、简单开始二、系统架构三、核心操作四、安装Ray参考资料

Ray是UC Berkeley RISELab新推出的高性能分布式执行框架,它使用了和传统分布式计算系统不一样的架构和对分布式计算的抽象方式,具有比Spark更...

1932
来自专栏码云1024

c++DLL编程详解

3986
来自专栏Albert陈凯

Hadoop数据分析平台实战——060深入理解MapReduce 01(案例)离线数据分析平台实战——060深入理解MapReduce 01(案例)

离线数据分析平台实战——060深入理解MapReduce 01(案例) 用户自定义数据类型 MapReduce中的数据类型至少有两种用途。 第一个用途,这些类...

2919
来自专栏奔跑的蛙牛技术博客

java 读写二进制数据与java序列化

zip文档以压缩格式存储一个和多个文件,每个ZIP文件都有一个头,包含每个文件的名字和压缩方法等信息

952
来自专栏个人分享

Spark Shuffle数据处理过程与部分调优(源码阅读七)

  shuffle。。。相当重要,为什么咩,因为shuffle的性能优劣直接决定了整个计算引擎的性能和吞吐量。相比于Hadoop的MapReduce,可以看到S...

771
来自专栏养码场

Java后端开发面大集锦1.0,汇集了各大公司的面试点!你都能答上来吗?

各位在面试时,必然会遇到各位的技术问题。这次针对Java后端开发,以下这篇文章罗列了各大公司技术面试官可能会提及的问题,并做出了解答。若觉得不错,希望分享给更多...

573
来自专栏大内老A

ASP.NET Web API自身对CORS的支持: EnableCorsAttribute特性背后的故事

从编程的角度来讲,ASP.NET Web API针对CORS的实现仅仅涉及到HttpConfiguration的扩展方法EnableCors和EnableCor...

20510

扫码关注云+社区