我刚刚将airflow.cfg
配置为与LocalExecutor
一起工作
executor = LocalExecutor
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@localhost:5432/airflow
sql_engine_encoding = utf-8
初始化DB后,运行以下DAG:
with DAG(dag_id='a_example_parallel_v', schedule_interval=None, start_date=days_ago(2),) as dag:
def task1_func(ti):
print(f"pid: ({[os.getgid()]}task1: print from task 1")
def task2_func(ti):
print(f"pid: ({[os.getgid()]}task1: print from task 2")
def task3_func(ti):
print(f"pid: ({[os.getgid()]}task1: print from task 3")
def task4_func(ti):
print(f"pid: ({[os.getgid()]}task1: print from task 4")
task1 = PythonOperator(task_id='task1', python_callable=task1_func, provide_context=True)
task2 = PythonOperator(task_id='task2', python_callable=task2_func, provide_context=True)
task3 = PythonOperator(task_id='task3', python_callable=task3_func, provide_context=True)
task4 = PythonOperator(task_id='task4', python_callable=task4_func, provide_context=True)
task1 >> task2
task1 >> task3
task2 >> task4
task3 >> task4
如何配置气流,使任务2和任务3并行运行?
发布于 2022-02-15 18:37:17
气流在airflow.cfg
中有设定
execute_tasks_new_python_interpreter
-应该通过父进程的分叉("False",速度更快的选项)或者通过生成一个新的process来执行任务("True“很慢,但意味着任务会立即进行插件更改)。
在添加了这个特性的按下中可以找到这方面的理由:生成一个全新的python过程,然后重新加载所有的气流是很昂贵的。尽管这一次对于长期运行的任务没有意义,但是当新用户第一次尝试气流时,这种延迟给他们带来了“糟糕”的体验。
https://stackoverflow.com/questions/71131412
复制相似问题