版权声明:本文为博主原创文章,未经博主允许不得转载。 https://cloud.tencent.com/developer/article/1411510
api list:https://hdfscli.readthedocs.io/en/latest/api.html#api-reference
获取hdfs data 文件夹下面所有csv 文件
from hdfs.client import Client
client = Client("http://IP:50070") # 50070: Hadoop默认namenode
#返回目录下的文件
def list_file(client,hdfs_path):
return client.list(hdfs_path, status=False)
#从hdfs获取文件到本地
def get_from_hdfs(client,hdfs_path,local_path):
client.download(hdfs_path, local_path, overwrite=False,n_threads=1)
hdfs_path = r'/user/hadoop/data'
name_list = list_file(client,hdfs_path)
#过滤所有csv文件
name_list_csv = [n for n in name_list if '.csv' in n]
print(name_list)
index = 1
for file in name_list_csv:
get_from_hdfs(client,hdfs_path+'/'+file,os.path.join( os.getcwd()))
os.rename(os.path.join(os.getcwd(),file),os.path.join(os.getcwd(),str(index)+".csv"))
index = index+1
python2 与hdfs交互的一些老方法可以参考这个博文
https://www.cnblogs.com/liyongsan/p/4987819.html
写也可以先saveAsTextFile,然后使用hdfs命令存到本地, 使用hdfs fs -get命令:
${HADOOP_COMMON_HOME}/bin/hadoop fs -get /hdfspath/to/data.txt /localpath/to/data.txt
或者,使用hdfs fs -copyToLocal命令:
${HADOOP_COMMON_HOME}/bin/hadoop fs -copyToLocal /hdfspath/to/data.txt /localpath/to/data.txt
使用popen 可以获取命令执行的返回值
os.popen(r'hadoop dfs -ls /user/').read()
https://docs.python.org/2/library/subprocess.html
该子模块允许你创建新的流程,连接到它们的输入/输出/错误管道,并获取他们的返回值。该模块打算替换多个旧的模块和功能:os.system 和 os.spawn *
使用subprocess时建议使用run()函数去处理所有它可以处理的情况,因为高级用法可以直接使用底层POPEN接口。
run()函数是Python 3.5中新添加的。
cat = subprocess.Popen(["hadoop", "fs", "-ls", "/user/hadoop/my_data"], stdout=subprocess.PIPE)
for line in cat.stdout:
print (line)
输出:
b’Found 2 items\n’ b’-rw-r–r-- 2 hadoop hadoop 0 2019-03-28 08:38 /user/hadoop/my_data/_SUCCESS\n’ b’-rw-r–r-- 2 hadoop hadoop 6967144 2019-03-28 08:38 /user/hadoop/my_data/part-00000-9431d082-957d-4a0b-a3ae-4ffa4674c70e-c000.csv\n’
http://aducode.github.io/posts/2016-08-02/write2hdfsinpyspark.html
python中调用java对象来操作hdfs文件
def path(sc, filepath):
"""
创建hadoop path对象
:param sc sparkContext对象
:param filename 文件绝对路径
:return org.apache.hadoop.fs.Path对象
"""
path_class = sc._gateway.jvm.org.apache.hadoop.fs.path
return path_class(filepath)
def get_file_system(sc):
"""
创建FileSystem
:param sc SparkContext
:return FileSystem对象
"""
filesystem_class = sc._gateway.jvm.org.apache.hadoop.fs.FilFileSystem
hadoop_configuration = sc._jsc.hadoopConfiguration()
return filesystem_class.get(hadoop_configuration)
def write(sc, filepath, content, overwite=True):
"""
写内容到hdfs文件
:param sc SparkContext
:param filepath 绝对路径
:param content 文件内容
:param overwrite 是否覆盖
"""
filesystem = get_file_system(sc)
out = filesystem.create(path(sc, filepath), overwrite)
out.write(bytearray(content, "utf-8"))
out.flush()
out.close()
write(sc, '/user/hadoop/my_data/ll.txt', 'shenmemgui', overwite=True)
直接写文件到磁盘(这个可以搭建一个本地的spark 单机版试试)
对象引入的新方法
def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=None,
header=None, nullValue=None, escapeQuotes=None, quoteAll=None, dateFormat=None,
timestampFormat=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None,
charToEscapeQuoteEscaping=None, encoding=None, emptyValue=None)
##e.g.
import os
import tempfile
df.write.csv(os.path.join(tempfile.mkdtemp(), 'data'))
#或者
df.repartition(1).write.csv(path="file:///"+(os.path.join( os.getcwd(),'test')), mode="overwrite", header="true")
用的时候这么用,我还以为os 都出来这个坨坨移到driver 的本地文件上了,结果还是在hdfs 的文件系统中。
这个函数说明中有一句
path – the path in any Hadoop supported file system
我想如果可行的话还是先写到hdfs 再挪回本地吧
mode="overwrite"慎用,我就直接把当前目录里面notebook 一些代码给覆盖了,结果找到找不回来,痛心。或者可以将dataframe 转化成rdd 后用saveAsTextFile 写回本地磁盘。
综上所述,我认为还是先写到hdfs 上或者s3上面比较安全,然后通过命令合并好文件再保存到本地。
https://www.cnblogs.com/hongdada/p/9475406.html
docker search spark
docker pull sequenceiq/spark
# 结果发现上面版本中的spark 是1.X 的
docker search spark2.0
#随便下一个
#机器上的其他容器先关了
docker stop $(docker ps -aq)
docker run -dit -p 8088:8088 -p 8042:8042 -p 4040:4040 -h sandbox sequenceiq/spark
docker image ls