这里我想在GCP的composer环境(1.10.6)中使用SFTPToGCSOperator。我知道有一个限制,因为操作员只存在于最新版本的airflow中,而不存在于composer最新版本1.10.6中。
请参阅参考- https://airflow.readthedocs.io/en/latest/howto/operator/gcp/sftp_to_gcs.html
我找到了operator的替代方案,并创建了一个插件类,但我再次遇到了sftphook类的问题,现在我使用的是旧版本的sftphook类。
请参阅以下参考资料-
从airflow.contrib.hooks.sftp_hook导入SFTPHook https://airflow.apache.org/docs/stable/_modules/airflow/contrib/hooks/sftp_hook.html
我已经创建了一个插件类,稍后将其导入到我的DAG脚本中。只有当我们移动一个文件时,它才能正常工作,在这种情况下,我们需要传递带有扩展名的完整文件路径。
请参考下面的例子(它在这个场景下工作正常)
DIR = "/test/sftp_dag_test/source_dir"
OBJECT_SRC_1 = "file.csv"
source_path=os.path.join(DIR, OBJECT_SRC_1),
除了这个,如果我们使用通配符,我的意思是,如果我们想要移动目录中的所有文件,我会收到get_tree_map方法的错误。
请参阅下面的DAG代码
import os
from airflow import models
from airflow.models import Variable
from PluginSFTPToGCSOperator import SFTPToGCSOperator
#from airflow.contrib.operators.sftp_to_gcs import SFTPToGCSOperator
from airflow.utils.dates import days_ago
default_args = {"start_date": days_ago(1)}
DIR_path = "/main_dir/sub_dir/"
BUCKET_SRC = "test-gcp-bucket"
with models.DAG(
"dag_sftp_to_gcs", default_args=default_args, schedule_interval=None
) as dag:
copy_sftp_to_gcs = SFTPToGCSOperator(
task_id="t_sftp_to_gcs",
sftp_conn_id="test_sftp_conn",
gcp_conn_id="google_cloud_default",
source_path=os.path.join(DIR_path, "*.gz"),
destination_bucket=BUCKET_SRC,
)
copy_sftp_to_gcs
这里我们在DAG脚本中使用通配符*,请参见下面的插件类。
import os
from tempfile import NamedTemporaryFile
from typing import Optional, Union
from airflow.plugins_manager import AirflowPlugin
from airflow import AirflowException
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
from airflow.models import BaseOperator
from airflow.contrib.hooks.sftp_hook import SFTPHook
from airflow.utils.decorators import apply_defaults
WILDCARD = "*"
class SFTPToGCSOperator(BaseOperator):
template_fields = ("source_path", "destination_path", "destination_bucket")
@apply_defaults
def __init__(
self,
source_path: str,
destination_bucket: str = "destination_bucket",
destination_path: Optional[str] = None,
gcp_conn_id: str = "google_cloud_default",
sftp_conn_id: str = "sftp_conn_plugin",
delegate_to: Optional[str] = None,
mime_type: str = "application/octet-stream",
gzip: bool = False,
move_object: bool = False,
*args,
**kwargs
) -> None:
super().__init__(*args, **kwargs)
self.source_path = source_path
self.destination_path = self._set_destination_path(destination_path)
print('destination_bucket : ',destination_bucket)
self.destination_bucket = destination_bucket
self.gcp_conn_id = gcp_conn_id
self.mime_type = mime_type
self.delegate_to = delegate_to
self.gzip = gzip
self.sftp_conn_id = sftp_conn_id
self.move_object = move_object
def execute(self, context):
print("inside execute")
gcs_hook = GoogleCloudStorageHook(
google_cloud_storage_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to
)
sftp_hook = SFTPHook(self.sftp_conn_id)
if WILDCARD in self.source_path:
total_wildcards = self.source_path.count(WILDCARD)
if total_wildcards > 1:
raise AirflowException(
"Only one wildcard '*' is allowed in source_path parameter. "
"Found {} in {}.".format(total_wildcards, self.source_path)
)
print('self.source_path : ',self.source_path)
prefix, delimiter = self.source_path.split(WILDCARD, 1)
print('prefix : ',prefix)
base_path = os.path.dirname(prefix)
print('base_path : ',base_path)
files, _, _ = sftp_hook.get_tree_map(
base_path, prefix=prefix, delimiter=delimiter
)
for file in files:
destination_path = file.replace(base_path, self.destination_path, 1)
self._copy_single_object(gcs_hook, sftp_hook, file, destination_path)
else:
destination_object = (
self.destination_path
if self.destination_path
else self.source_path.rsplit("/", 1)[1]
)
self._copy_single_object(
gcs_hook, sftp_hook, self.source_path, destination_object
)
def _copy_single_object(
self,
gcs_hook: GoogleCloudStorageHook,
sftp_hook: SFTPHook,
source_path: str,
destination_object: str,
) -> None:
"""
Helper function to copy single object.
"""
self.log.info(
"Executing copy of %s to gs://%s/%s",
source_path,
self.destination_bucket,
destination_object,
)
with NamedTemporaryFile("w") as tmp:
sftp_hook.retrieve_file(source_path, tmp.name)
print('before upload self det object : ',self.destination_bucket)
gcs_hook.upload(
self.destination_bucket,
destination_object,
tmp.name,
self.mime_type,
)
if self.move_object:
self.log.info("Executing delete of %s", source_path)
sftp_hook.delete_file(source_path)
@staticmethod
def _set_destination_path(path: Union[str, None]) -> str:
if path is not None:
return path.lstrip("/") if path.startswith("/") else path
return ""
@staticmethod
def _set_bucket_name(name: str) -> str:
bucket = name if not name.startswith("gs://") else name[5:]
return bucket.strip("/")
class SFTPToGCSOperatorPlugin(AirflowPlugin):
name = "SFTPToGCSOperatorPlugin"
operators = [SFTPToGCSOperator]
所以我在我的DAG脚本中导入了这个插件类,当我们使用文件名时,它工作得很好,因为代码放在else条件中。
但是当我们使用通配符时,我们在if条件中有游标,并且我在get_tree_map方法中得到错误。
请参见下面的错误-
ERROR - 'SFTPHook' object has no attribute 'get_tree_map'
我发现这个错误的原因这个方法本身并不存在于composer(airflow 1.10.6)- https://airflow.apache.org/docs/stable/_modules/airflow/contrib/hooks/sftp_hook.html中
此方法出现在最新版本的airflow https://airflow.readthedocs.io/en/latest/_modules/airflow/providers/sftp/hooks/sftp.html中
现在我应该尝试什么,有没有这个方法的替代方法,或者这个运算符类的任何替代方案。
有没有人知道这个问题是否有解决方案?
提前谢谢。
请忽略堆栈溢出中的Typo或缩进错误。在我的代码中没有缩进错误。
发布于 2021-01-23 00:41:05
"providers“包只在Airflow 2.0中可用,但在Cloud Composer中尚未提供(在我写这篇文章时,最新可用的Airflow图像是1.10.14,今天上午发布)。
但是你可以导入backport包,让你在更早的1.10..*版本中享受这些新的包。
我的requirements.txt:
apache-airflow-backport-providers-ssh==2020.10.29
apache-airflow-backport-providers-sftp==2020.10.29
pysftp>=0.2.9
paramiko>=2.6.0
sshtunnel<0.2,>=0.1.4
可以从控制台直接在Composer环境中导入PyPi包。
有了这些依赖项,我就可以使用最新的airflow.providers.ssh.operators.ssh.SSHOperator
(以前是airflow.contrib.operators.ssh_operator.SSHOperator
)和新的airflow.providers.google.cloud.transfers.gcs_to_sftp.GCSToSFTPOperator
(在contrib
运算符中没有等价物)。
享受吧!
发布于 2020-05-19 22:45:40
要在Airflow版本1.10.6上使用Google Cloud Composer中的SFTPToGCSOperator
,我们需要创建一个插件,并以某种方式通过将操作符/钩子代码复制到一个文件中来“黑”Airflow,以启用Airflow 1.10.10版本中的SFTPToGCSOperator
代码。
最新的Airflow版本有一个新的airflow.providers
目录,该目录在早期版本中不存在。这就是你看到以下错误的原因:No module named airflow.providers
。下面描述了我所做的所有更改:
我准备了一个工作插件,你可以下载here。在使用它之前,我们必须在Cloud Composer环境中安装以下PyPI库:pysftp
、paramiko
、sshtunnel
。
SFTPToGCSOperator
code,从第792行开始。您可以看到,此操作符使用GCSHookfrom airflow.providers.google.cloud.hooks.gcs import GCSHook
它还需要复制到插件中-从第193行开始。
GCSHook
继承自GoogleBaseHook
类,我们可以将其更改为在Airflow 1.10.6版本中可访问的GoogleCloudBaseHook
,并导入它:from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
SFTPHook
code导入到插件中-从继承自SSHHook
类的第39行开始,我们可以通过更改导入语句来使用Airflow 1.10.6版本中的一个:在文件的末尾,你可以找到from airflow.contrib.hooks.ssh_hook import SSHHook
class SFTPToGCSOperatorPlugin(AirflowPlugin):
name = "SFTPToGCSOperatorPlugin"
operators = [SFTPToGCSOperator]
需要创建插件,因为Airflow 1.10.6版本( Cloud Composer的最新版本)中当前不提供Airflow内置操作符。您可以关注Cloud Composer version lists,以便了解何时可以使用最新版本的Airflow。
我希望上面的信息对你有用。
https://stackoverflow.com/questions/61774987
复制相似问题