我正面临多个问题,使用异步ODM在我的芹菜工人,首先,我无法插入我的数据库模型使用芹菜工人信号,我使用beanie的db连接。
第一实现
from asyncer import syncify
from asgiref.sync import async_to_sync
client = AsyncIOMotorClient(
DATABASE_URL, uuidRepresentation="standard" )
db = client[DB_NAME]
async def db_session():
await init_beanie(
database=db,
document_models=[Project, User],
)
@worker_ready.connect
def startup_celery_ecosystem(**kwargs):
logger.info('Startup celery worker process')
async_to_sync(db_session)()
logger.info('FINISHED : Startup celery worker process')
async def get_users():
users = User.find()
users_list = await users.to_list()
return users_list
@celery_app.task
def pool_db():
async_to_sync(get_users)()
#syncify(get_users)() same error User class is not initialized yet (init_beanie should have already initialized all the models )使用此实现,我无法使用User和Project类访问我的数据库,它会引发一个错误,好像用户和项目还没有实例化一样
解决方法是在模块级别调用db_session(),这解决了数据库模型实例化的问题,但是现在在查询数据库时,我从芹菜任务中得到了以下错误
RuntimeError:事件循环关闭
第二实现
from asyncer import syncify
from asgiref.sync import async_to_sync client = AsyncIOMotorClient(
DATABASE_URL, uuidRepresentation="standard" )
db = client[DB_NAME]
async def db_session():
await init_beanie(
database=db,
document_models=[Project, User],
)
# now init_beanie at module level
async_to_sync(db_session)()
async def get_users():
users = User.find()
users_list = await users.to_list()
return users_list
@celery_app.task
def pool_db():
# this raises the following Runtime error RuntimeError('Event loop is closed')
async_to_sync(get_users)()
#syncify(get_users)() same error 我不太熟悉异步如何实现,异步程序和asgiref如何允许在同步线程中运行异步代码,这使我感到困惑,任何帮助都是可以执行的。
发布于 2022-10-21 14:16:21
在使用花来监视工人和记录工作人员Id (进程Id)之后,发现芹菜工作者本身不处理任何任务,它会产生其他子进程(这是我的情况,因为我使用的是预叉的默认执行器池),而信号( worker_ready.connect )只在监控程序进程Celery工作者上运行,并且由于进程是隔离内存的,这意味着您无法从子进程访问db连接或任何初始化源。现在我用芹菜和gevent一起使用,这只会产生1进程,因为最初我的项目不需要CPU繁重的任务--这意味着我不需要预叉池提供的所有cpu能力。
https://stackoverflow.com/questions/74058894
复制相似问题