我目前正在尝试在Airflow中使用FTPHook,以便将文件上传到远程ftp或从远程ftp下载文件。但是我不确定是否可以使用gs://路径作为源/目标路径的一部分。我目前不想在AF pod中使用本地文件夹,因为文件大小可能会很大,所以我宁愿直接使用gcs路径或gcs文件流。
conn = FTPHook(ftp_conn_id='ftp_default')
conn.store_file('in', 'gs://bucket_name/file_name.txt')
指向FTPHook代码的链接:here
谢谢你的帮助!
发布于 2018-08-02 05:47:09
GCS没有实现FTP支持,因此这将不起作用。看起来FTP钩子只知道如何处理本地文件路径或缓冲区,而不是GCS之一。
您也许能够找到(或编写)一些从FTP读取并写入GCS的代码。
发布于 2019-02-21 08:43:10
我发现了一个简单的流媒体解决方案,可以使用pysftp从gcs上传/下载到ftp服务器,反之亦然,我想与您分享。首先,我找到了this解决方案,它工作得很好,但唯一的问题是它不支持从gcs到FTP的文件上传。所以我在找别的东西。所以我正在寻找不同的方法,所以我找到了这个谷歌document,它基本上允许你流入/流出blob文件,这正是我正在寻找的。
params = BaseHook.get_connection(self.ftp_conn_id)
cnopts = pysftp.CnOpts()
cnopts.hostkeys = None
ftp = pysftp.Connection(host=params.host, username=params.login, password=params.password,
port=params.port,
cnopts=cnopts)
#This will download file from FTP server to GCS location
with ftp.open(self.ftp_folder + '/' + file_to_load, 'r+') as remote_file:
blob = bucket.blob(self.gcs_prefix + file_to_load)
blob.upload_from_file(remote_file)
#This will upload file from GCS to FTP server
with sftp.open(self.ftp_folder + '/' +file_name,'w+') as remote_file:
blob = bucket.blob(fileObject['name'])
blob.download_to_file(remote_file)
https://stackoverflow.com/questions/51642327
复制相似问题