所以我在airflow中有一个非常好的DAG,它基本上在二进制文件上运行几个分析步骤(作为airflow插件实现)。通过ftp传感器触发DAG,该传感器仅检查ftp服务器上是否有新文件,然后启动整个工作流程。
所以目前的工作流程是这样的: DAG是按照定义触发的,->传感器等待ftp上的新文件,->分析步骤是在工作流程结束时执行的。
我想要的是这样的东西: DAG是触发器,->传感器等待ftp上的新文件,->为ftp上的每个文件,分析步骤是单独执行的,->每个工作流程分别结束。
如何为ftp服务器上的每个文件执行分析工作流?如果服务器上没有文件,则只有一个传感器应等待新文件?例如,我不想每秒启动一次DAG,因为这样我就有许多传感器在等待新文件。
发布于 2019-03-05 06:48:52
使用2个DAG将检测步骤与分析步骤分开。
DAG 1:
传感器等待ftp ->上的新文件新文件到达后,使用TriggerDagRunOperator触发DAG1本身->使用TriggerDagRunOperator触发DAG2
DAG 2:
执行文件的分析步骤
https://stackoverflow.com/questions/54992541
复制相似问题