我刚开始使用Airflow来协调我们的ETL管道。
我在运行dag时遇到了管道错误。
我见过一个通用的stackoverflow讨论here。
我的情况更多的是在气流方面。根据该帖子中的讨论,可能的根本原因是:
如果您的请求被阻塞或花费的时间太长,通常会发生管道断开错误,在请求端超时后,它将关闭连接,然后,当响应端(服务器)尝试写入套接字时,它将抛出管道断开错误。
这可能是我的情况下的真正原因,我有一个place操作员,它将在Airflow之外启动另一个作业,并且该作业可能非常长(即10+小时),我想知道Airflow中的机制是什么,我可以利用它来防止此错误。
有人能帮上忙吗?
UPDATE1 20190303-1:
多亏了@y2k-shubham for the SSHOperator,我能够使用它成功地建立一个hostname
连接,并且能够在远程站点上运行一些简单的命令(实际上,默认的SSH连接必须设置为localhost,因为作业在本地主机上),并且能够看到hostname
的正确结果pwd
。
但是,当我尝试运行实际的作业时,我收到了相同的错误,同样,错误来自jpipeline ob,而不是Airflow dag/task。
UPDATE2: 20190303-2
我有一个成功的运行(气流测试),没有错误,然后跟随另一个失败的运行(调度程序),在管道中出现相同的错误。
发布于 2019-03-04 02:43:25
虽然我建议您继续寻找一种更优雅的方式来尝试实现您想要的东西,但我根据要求提供了示例用法
首先,您必须创建一个SSHHook
。这可以通过两种方式来完成
钩子(钩子)-传统的方法,您可以从客户端代码中提供所有必需的设置,如主机、用户、密码(如果需要)等,您将在客户端代码中实例化钩子。我在此引用test_ssh_hook.py
的一个示例,但您必须彻底了解SSHHook
及其测试,才能理解所有可能的用法
ssh_hook = SSHHook(remote_host="remote_host",port=“端口”,username=“用户名”,timeout=10,port
Airflow
way,您可以在其中将所有连接详细信息放在可从UI管理的Connection
对象中,并且只传递它的conn_id
来实例化您的钩子ssh_hook = SSHHook(ssh_conn_id="my_ssh_conn_id")
当然,如果您依赖于SSHOperator
,那么您可以直接将ssh_conn_id
传递给operator。
ssh_operator = SSHOperator(ssh_conn_id="my_ssh_conn_id")
现在,如果您计划使用专用任务在SSH
上运行命令,则可以使用SSHOperator
。我再一次引用了一个来自test_ssh_operator.py
的例子,但为了更好地了解情况,请浏览一下源代码。
测试任务=SSHOperator(task_id=“
”,command="echo -n airflow",dag=self.dag,timeout=10,ssh_conn_id="ssh_default")
但是,作为更大任务的一部分,您可能希望在SSH上运行命令。在这种情况下,您不需要SSHOperator
,您仍然可以只使用SSHHook
。SSHHook
的get_conn()
方法为您提供了一个paramiko
SSHClient
实例。这样,您就可以使用exec_command()
调用来运行命令
my_command =“回声气流”stdin,stdout,stderr = ssh_client.exec_command( command=my_command,get_pty=my_command.startswith("sudo"),timeout=10)
如果您查看一下SSHOperator
的execute()
方法,就会发现它是一段相当复杂(但健壮)的代码,试图实现一件非常简单的事情。
有关在
SSHHook
的信息,请参见MultiCmdSSHOperator
,了解
SSHOperator
(您可以使用bash
's &&
operator实现相同的功能)https://stackoverflow.com/questions/54965230
复制相似问题