前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >大数据ETL实践探索(6)---- 使用python将大数据对象写回本地磁盘的几种方案

大数据ETL实践探索(6)---- 使用python将大数据对象写回本地磁盘的几种方案

作者头像
流川疯
发布2019-04-09 17:35:20
1.4K0
发布2019-04-09 17:35:20
举报

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://cloud.tencent.com/developer/article/1411510

文章大纲


1. python 与hdfs 交互 回写

1.1 使用hdfs 包

api list:https://hdfscli.readthedocs.io/en/latest/api.html#api-reference

获取hdfs data 文件夹下面所有csv 文件

代码语言:javascript
复制
from hdfs.client import Client
client = Client("http://IP:50070")  # 50070: Hadoop默认namenode
#返回目录下的文件
def list_file(client,hdfs_path):
    return client.list(hdfs_path, status=False)
#从hdfs获取文件到本地
def get_from_hdfs(client,hdfs_path,local_path):
    client.download(hdfs_path, local_path, overwrite=False,n_threads=1)
    
hdfs_path = r'/user/hadoop/data'
    
name_list = list_file(client,hdfs_path)
#过滤所有csv文件
name_list_csv  = [n for n in name_list if '.csv' in n]

print(name_list)
index = 1
for file in name_list_csv:
    get_from_hdfs(client,hdfs_path+'/'+file,os.path.join( os.getcwd()))
    os.rename(os.path.join(os.getcwd(),file),os.path.join(os.getcwd(),str(index)+".csv"))
    index = index+1

1.2 python2 与hdfs

python2 与hdfs交互的一些老方法可以参考这个博文

https://www.cnblogs.com/liyongsan/p/4987819.html

1.3 在python中直接调用hadoop shell 命令去操作文件

1.3.1 hadoop shell

写也可以先saveAsTextFile,然后使用hdfs命令存到本地, 使用hdfs fs -get命令:

代码语言:javascript
复制
${HADOOP_COMMON_HOME}/bin/hadoop fs -get /hdfspath/to/data.txt  /localpath/to/data.txt
或者,使用hdfs fs -copyToLocal命令:

${HADOOP_COMMON_HOME}/bin/hadoop fs -copyToLocal /hdfspath/to/data.txt  /localpath/to/data.txt

1.3.2 popen

使用popen 可以获取命令执行的返回值

代码语言:javascript
复制
os.popen(r'hadoop dfs -ls /user/').read()

1.3.3 subprocess

https://docs.python.org/2/library/subprocess.html

该子模块允许你创建新的流程,连接到它们的输入/输出/错误管道,并获取他们的返回值。该模块打算替换多个旧的模块和功能:os.system 和 os.spawn *

使用subprocess时建议使用run()函数去处理所有它可以处理的情况,因为高级用法可以直接使用底层POPEN接口。

run()函数是Python 3.5中新添加的。

代码语言:javascript
复制
cat = subprocess.Popen(["hadoop", "fs", "-ls", "/user/hadoop/my_data"], stdout=subprocess.PIPE)
for line in cat.stdout:
    print (line)

输出:

b’Found 2 items\n’ b’-rw-r–r-- 2 hadoop hadoop 0 2019-03-28 08:38 /user/hadoop/my_data/_SUCCESS\n’ b’-rw-r–r-- 2 hadoop hadoop 6967144 2019-03-28 08:38 /user/hadoop/my_data/part-00000-9431d082-957d-4a0b-a3ae-4ffa4674c70e-c000.csv\n’

1.4 python 与 py4j 交互

http://aducode.github.io/posts/2016-08-02/write2hdfsinpyspark.html

python中调用java对象来操作hdfs文件

代码语言:javascript
复制
def path(sc, filepath):
  """
  创建hadoop path对象
  :param sc sparkContext对象
  :param filename 文件绝对路径
  :return org.apache.hadoop.fs.Path对象
  """
  path_class = sc._gateway.jvm.org.apache.hadoop.fs.path
  return path_class(filepath)

def get_file_system(sc):
  """
  创建FileSystem
  :param sc SparkContext
  :return FileSystem对象
  """
  filesystem_class = sc._gateway.jvm.org.apache.hadoop.fs.FilFileSystem
  hadoop_configuration = sc._jsc.hadoopConfiguration()
  return filesystem_class.get(hadoop_configuration)

def write(sc, filepath, content, overwite=True):
    """
  写内容到hdfs文件
  :param sc SparkContext
  :param filepath 绝对路径
  :param content 文件内容
  :param overwrite 是否覆盖
    """
    filesystem = get_file_system(sc)
    out = filesystem.create(path(sc, filepath), overwrite)
    out.write(bytearray(content, "utf-8"))
    out.flush()
    out.close()

write(sc, '/user/hadoop/my_data/ll.txt', 'shenmemgui', overwite=True)

2. pyspark 与driver 磁盘交互

直接写文件到磁盘(这个可以搭建一个本地的spark 单机版试试)

2.0版本后http://spark.apache.org/docs/latest/api/python/_modules/pyspark/sql/readwriter.html#DataFrameWriter.csv

对象引入的新方法

代码语言:javascript
复制
def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=None,
            header=None, nullValue=None, escapeQuotes=None, quoteAll=None, dateFormat=None,
            timestampFormat=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None,
            charToEscapeQuoteEscaping=None, encoding=None, emptyValue=None)

##e.g.

import os
import tempfile
df.write.csv(os.path.join(tempfile.mkdtemp(), 'data'))
#或者
df.repartition(1).write.csv(path="file:///"+(os.path.join( os.getcwd(),'test')), mode="overwrite", header="true")

用的时候这么用,我还以为os 都出来这个坨坨移到driver 的本地文件上了,结果还是在hdfs 的文件系统中。

这个函数说明中有一句

path – the path in any Hadoop supported file system

我想如果可行的话还是先写到hdfs 再挪回本地吧

mode="overwrite"慎用,我就直接把当前目录里面notebook 一些代码给覆盖了,结果找到找不回来,痛心。或者可以将dataframe 转化成rdd 后用saveAsTextFile 写回本地磁盘。

综上所述,我认为还是先写到hdfs 上或者s3上面比较安全,然后通过命令合并好文件再保存到本地。


3. python docker 搭建spark standalone 版本

https://www.cnblogs.com/hongdada/p/9475406.html

代码语言:javascript
复制
docker search spark
docker pull sequenceiq/spark
# 结果发现上面版本中的spark 是1.X 的

docker search spark2.0

#随便下一个

#机器上的其他容器先关了
docker stop $(docker ps -aq)

docker run -dit -p 8088:8088 -p 8042:8042 -p 4040:4040 -h sandbox sequenceiq/spark



docker image ls
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019年04月01日,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 文章大纲
  • 1. python 与hdfs 交互 回写
    • 1.1 使用hdfs 包
      • 1.2 python2 与hdfs
        • 1.3 在python中直接调用hadoop shell 命令去操作文件
          • 1.3.1 hadoop shell
          • 1.3.2 popen
          • 1.3.3 subprocess
        • 1.4 python 与 py4j 交互
        • 2. pyspark 与driver 磁盘交互
        • 3. python docker 搭建spark standalone 版本
        相关产品与服务
        大数据
        全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档