首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >事件循环在芹菜工人中关闭。

事件循环在芹菜工人中关闭。
EN

Stack Overflow用户
提问于 2022-10-13 16:14:46
回答 1查看 293关注 0票数 3

我正面临多个问题,使用异步ODM在我的芹菜工人,首先,我无法插入我的数据库模型使用芹菜工人信号,我使用beanie的db连接。

第一实现

代码语言:javascript
复制
from asyncer import syncify
from asgiref.sync import async_to_sync 
client = AsyncIOMotorClient(
    DATABASE_URL, uuidRepresentation="standard" )
    db = client[DB_NAME]

async def db_session():
        await init_beanie(
        database=db,
        document_models=[Project, User],
    )
@worker_ready.connect
def startup_celery_ecosystem(**kwargs):
            logger.info('Startup celery worker process')
            async_to_sync(db_session)()
            logger.info('FINISHED : Startup celery worker process')
async def get_users():
    users = User.find()
    users_list = await users.to_list()
    return users_list

@celery_app.task
def pool_db():
    async_to_sync(get_users)()
    #syncify(get_users)() same error User class is not initialized yet (init_beanie should have already initialized all the models )

使用此实现,我无法使用User和Project类访问我的数据库,它会引发一个错误,好像用户和项目还没有实例化一样

解决方法是在模块级别调用db_session(),这解决了数据库模型实例化的问题,但是现在在查询数据库时,我从芹菜任务中得到了以下错误

RuntimeError:事件循环关闭

第二实现

代码语言:javascript
复制
from asyncer import syncify
from asgiref.sync import async_to_sync client = AsyncIOMotorClient(
DATABASE_URL, uuidRepresentation="standard" )
db = client[DB_NAME]

async def db_session():
        await init_beanie(
        database=db,
        document_models=[Project, User],
    )
# now  init_beanie at module level
async_to_sync(db_session)()

async def get_users():
    users = User.find()
    users_list = await users.to_list()
    return users_list

@celery_app.task
def pool_db():
    # this raises the following Runtime error RuntimeError('Event loop is closed')
    async_to_sync(get_users)()
    #syncify(get_users)() same error 

我不太熟悉异步如何实现,异步程序和asgiref如何允许在同步线程中运行异步代码,这使我感到困惑,任何帮助都是可以执行的。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-10-21 14:16:21

在使用花来监视工人和记录工作人员Id (进程Id)之后,发现芹菜工作者本身不处理任何任务,它会产生其他子进程(这是我的情况,因为我使用的是预叉的默认执行器池),而信号( worker_ready.connect )只在监控程序进程Celery工作者上运行,并且由于进程是隔离内存的,这意味着您无法从子进程访问db连接或任何初始化源。现在我用芹菜和gevent一起使用,这只会产生1进程,因为最初我的项目不需要CPU繁重的任务--这意味着我不需要预叉池提供的所有cpu能力。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/74058894

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档