首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >Joblib和Airflow中的其他并行任务

Joblib和Airflow中的其他并行任务
EN

Stack Overflow用户
提问于 2021-07-09 22:23:28
回答 2查看 481关注 0票数 9

我在过去使用过Joblib和Airflow,没有遇到过这个问题。我正在尝试通过Airflow运行一个作业,该作业使用Joblib进行并行计算。当Airflow作业启动时,我看到以下警告

代码语言:javascript
运行
复制
UserWarning: Loky-backed parallel loops cannot be called in multiprocessing, setting n_jobs=1

追根溯源,我在LokyBackend类( MultiprocessingBackend类中也有类似的逻辑)的joblib包中看到以下函数触发。

代码语言:javascript
运行
复制
def effective_n_jobs(self, n_jobs):
    """Determine the number of jobs which are going to run in parallel"""
    if n_jobs == 0:
        raise ValueError('n_jobs == 0 in Parallel has no meaning')
    elif mp is None or n_jobs is None:
        # multiprocessing is not available or disabled, fallback
        # to sequential mode
        return 1
    elif mp.current_process().daemon:
        # Daemonic processes cannot have children
        if n_jobs != 1:
            warnings.warn(
                'Loky-backed parallel loops cannot be called in a'
                ' multiprocessing, setting n_jobs=1',
                stacklevel=3)
        return 1

问题是,我之前在Joblib和Airflow中运行过类似的函数,并没有触发这个条件来将n_jobs设置为1。我想知道这是不是某种版本问题(使用Airflow 2.X和Joblib 1.X),或者Airflow中是否有可以修复这个问题的设置。我查看了Joblib的旧版本,甚至降级到了Joblib 0.4.0,但这并没有解决任何问题。由于API、数据库连接等方面的差异,我对是否降级Airflow更加犹豫。

编辑:

下面是我在Airflow中运行的代码:

代码语言:javascript
运行
复制
def test_parallel():
    out=joblib.Parallel(n_jobs=-1, backend="loky")(joblib.delayed(lambda a: a+1)(i) for i in range(20))

with DAG("test", default_args=DEFAULT_ARGS, schedule_interval="0 8 * * *",) as test:
    run_test = PythonOperator(
        task_id="test",
        python_callable=test_parallel,
    )

    run_test

以及气流日志中的输出:

代码语言:javascript
运行
复制
[2021-07-27 10:41:29,890] {logging_mixin.py:104} WARNING - /data01/code/virtualenv/alpha/lib/python3.8/site-packages/joblib/parallel.py:733 UserWarning: Loky-backed parallel loops cannot be called in a multiprocessing, setting n_jobs=1

我通过supervisor启动airflow schedulerairflow webserver。但是,即使我从命令行启动这两个airflow进程,问题仍然存在。但是,当我仅通过airflow任务API (例如airflow tasks test run_test )运行任务时,就不会发生这种情况

EN

Stack Overflow用户

发布于 2021-09-09 13:42:30

我注意到您没有调用代码底部的run_test函数。这会是任何问题的原因吗?更正后的版本:

代码语言:javascript
运行
复制
def test_parallel():
    out=joblib.Parallel(n_jobs=-1, backend="loky")(joblib.delayed(lambda a: a+1)(i) for i in range(20))

with DAG("test", default_args=DEFAULT_ARGS, schedule_interval="0 8 * * *",) as test:
    run_test = PythonOperator(
        task_id="test",
        python_callable=test_parallel,
    )

    run_test()
票数 0
EN
查看全部 2 条回答
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/68318379

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档