
Celery: sent tasks cannot retrieve their results even though a result backend is set

Stack Overflow user
Asked 2022-07-20 03:26:22
Answers: 1 · Views: 33 · Follows: 0 · Votes: 0

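I'm trying to implement an MLOps application in which a Celery worker runs inside a Docker container and I send task messages to it from my local environment. There are three containers: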

  • multi_label_text_classification_celery_server_1
    • used for training and inference

  • multi_label_text_classification_redis_1
  • multi_label_text_classification_triton_server_1

I have tested the inference task, and it can be called remotely like this:

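import json

import pandas as pd

from app import predict  # local Celery client module (app.py)

dataset = pd.read_json('data.json')
data = dataset.text.values.tolist()
j_string = json.dumps(data, ensure_ascii=False)

predict_task = predict.apply_async(
    args=(
        'audience_bert',  # model_name
        1,                # version
        100,              # max_len
        j_string          # dataset (JSON string)
    )
)
print(predict_task)  # printing an AsyncResult shows its task ID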

task_id: 758d7455-af2d-494e-8ba9-f9e502a8727c

However, when I try to check its state and get the result:

from celery.result import AsyncResult

result = AsyncResult(task_id)  # task_id: the ID printed above
print(result.state)
print(result.get())

it fails with a DisabledBackend error, even though I have set a result backend in both the container's Celery app and the local one:

Traceback (most recent call last):
  File "<input>", line 1, in <module>
  File "C:\Users\ychuang\AppData\Local\pypoetry\Cache\virtualenvs\celery-client-HqyYFMWr-py3.8\lib\site-packages\celery\result.py", line 478, in state
    return self._get_task_meta()['status']
  File "C:\Users\ychuang\AppData\Local\pypoetry\Cache\virtualenvs\celery-client-HqyYFMWr-py3.8\lib\site-packages\celery\result.py", line 417, in _get_task_meta
    return self._maybe_set_cache(self.backend.get_task_meta(self.id))
  File "C:\Users\ychuang\AppData\Local\pypoetry\Cache\virtualenvs\celery-client-HqyYFMWr-py3.8\lib\site-packages\celery\backends\base.py", line 609, in get_task_meta
    meta = self._get_task_meta_for(task_id)
AttributeError: 'DisabledBackend' object has no attribute '_get_task_meta_for'

I have searched for possible causes of DisabledBackend, but many similar posts are about a missing backend setting.

Can anyone help me, please?

Below are my files and Celery code:

  • docker-compose.yml

version: "3"

services:
  celery_server:
    env_file: .env
    build:
      context: .
      dockerfile: Dockerfile
    volumes:
      - models:/model/torch_script
    environment:
      LEVEL: ${LEVEL}
    links:
      - redis
    depends_on:
      - redis

  redis:
    image: redis:latest
    hostname: redis
    ports:
      - 6379:6379

  triton_server:
    image: nvcr.io/nvidia/tritonserver:22.06-py3
    hostname: triton
    ports:
      - 8000:8000
      - 8001:8001
      - 8002:8002
    command: ["tritonserver", "--model-store=/models", "--model-control-mode=poll", "--repository-poll-secs=30"]
    volumes:
      - models:/models
    shm_size: 1g
    ulimits:
      memlock: -1
      stack: 67108864

volumes:
  models:

  • Celery worker (inside the container)

import json
from typing import Dict

from celery import Celery

from config.settings import MODEL_CKPT, LogDir
from utils.inference_helper import chunks
from utils.log_helper import create_logger
from worker.inference.bert_triton_inference import BertInferenceWorker
from worker.train.chinese_bert_classification import ChineseBertClassification

app = Celery(
    name='bert_celery',
    broker="redis://redis:6379/0",
    backend="redis://redis:6379/1"
)

app.conf.task_routes = {
    'app.*': {'queue': 'deep_model'},
}

# result_expires is in seconds: stored results expire 1 second after being written
app.conf.update(result_expires=1)
app.conf.update(task_track_started=True)


@app.task(bind=True, queue='deep_model', name='training')
def training(
        self,
        model_name,
        version,
        dataset,
        label_col,
        learning_rate=2e-5,
        epochs=50,
        batch_size=32,
        max_len=30,
        is_multi_label=1,
        ckpt=MODEL_CKPT.get('chinese-bert-wwm')

):
    dataset = json.loads(dataset)
    label_col = json.loads(label_col)

    task_worker = ChineseBertClassification(
        max_len=max_len,
        ckpt=ckpt,
        epochs=epochs,
        learning_rate=learning_rate,
        batch_size=batch_size,
        dataset=dataset,
        label_col=label_col,
        model_name=model_name,
        model_version=version,
        is_multi_label=is_multi_label
    )

    task_worker.init_model()
    results: Dict[str, str] = task_worker.run()
    return results


@app.task(bind=True, queue='deep_model', name='predict')
def predict(self, model_name, version, max_len, dataset):
    logger = create_logger(LogDir.inference)
    data = json.loads(dataset)

    output = []
    for idx, chunk in enumerate(chunks(data, 32)):
        logger.info(f" ==== batch: {idx} ==== ")
        infer_worker = BertInferenceWorker(
            dataset=chunk,
            model_name=model_name,
            model_version=version,
            url='triton:8000',
            backend='pytorch',
            max_len=max_len,
            chunk_size=len(chunk)
        )
        results = infer_worker.run()
        # print(results)
        output.extend(results.tolist())

    assert len(output) == len(data)

    return json.dumps(output, ensure_ascii=False)

  • Celery client app (localhost)

from celery import Celery


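# Same Redis instance as the worker, reached through the port published
# on localhost (6379:6379 in docker-compose.yml)
app = Celery(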
    name='bert_celery',
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1"
)

app.conf.task_routes = {
    'app.*': {'queue': 'deep_model'},
}


@app.task(bind=True, queue='deep_model', name='training')
def training(
        self,
        model_name,
        version,
        dataset,
        label_col,
        learning_rate,
        epochs,
        batch_size,
        max_len,
        is_multi_label
):
    pass  # stub: the real task body runs only on the worker


@app.task(bind=True, queue='deep_model', name='predict')
def predict(
        self,
        model_name,
        version,
        max_len,
        dataset
):
    pass  # stub: the real task body runs only on the worker

1 Answer

Stack Overflow user

Answered 2022-07-21 03:16:24

I figured out a solution to this problem.

At first I thought the error might be coming from Redis, so I checked the Docker logs, which showed:

WARNING overcommit_memory is set to 0! Background save may fail under low memory condition. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.

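So I reconfigured overcommit_memory, but the problem remained.

Then I tried importing the related task (from app import predict) in the same script where I call AsyncResult to retrieve the task's state and result, and it worked!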

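from celery.result import AsyncResult
# This import is essential: it runs app.py, which creates the Celery app
# configured with the Redis result backend.
from app import predict  # noqa: F401

results = AsyncResult(task_id)  # task_id: the ID returned by apply_async
print(results.state)  # without the import above, this fails with DisabledBackend
print(results.get())  # likewise fails without the import above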

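When you retrieve a task's state or result, the script doing the lookup must also import the related task (and, with it, the Celery app that task belongs to). Otherwise Celery does not know the backend settings. A silly mistake.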

Votes: 0
Page content provided by Stack Overflow; translation support provided by Tencent Cloud Xiaowei's IT-domain engine.
Original link:

https://stackoverflow.com/questions/73045652
