I am trying to implement an MLOps application with a Celery worker running inside a Docker container, and I send task messages to it from my local environment. There are three containers: celery_server (the Celery worker), redis (the message broker and result backend), and triton_server (the Triton inference server); see the docker-compose file below.
I have tested the inference task, and it can be invoked remotely like this:
import json
import pandas as pd
from app import predict  # the stub task defined in my local Celery app (see below)

dataset = pd.read_json('data.json')
data = dataset.text.values.tolist()
j_string = json.dumps(data, ensure_ascii=False)

predict_task = predict.apply_async(
    args=(
        'audience_bert',
        1,
        100,
        j_string
    )
)
print(predict_task)
# task_id: 758d7455-af2d-494e-8ba9-f9e502a8727c

However, when I try to check the status and get the result:
from celery.result import AsyncResult

result = AsyncResult(task_id)  # task_id from the apply_async call above
print(result.state)
print(result.get())

it raises a DisabledBackend error, even though I have set the result backend in both the Celery app inside the container and the local one:
Traceback (most recent call last):
File "<input>", line 1, in <module>
File "C:\Users\ychuang\AppData\Local\pypoetry\Cache\virtualenvs\celery-client-HqyYFMWr-py3.8\lib\site-packages\celery\result.py", line 478, in state
return self._get_task_meta()['status']
File "C:\Users\ychuang\AppData\Local\pypoetry\Cache\virtualenvs\celery-client-HqyYFMWr-py3.8\lib\site-packages\celery\result.py", line 417, in _get_task_meta
return self._maybe_set_cache(self.backend.get_task_meta(self.id))
File "C:\Users\ychuang\AppData\Local\pypoetry\Cache\virtualenvs\celery-client-HqyYFMWr-py3.8\lib\site-packages\celery\backends\base.py", line 609, in get_task_meta
meta = self._get_task_meta_for(task_id)
AttributeError: 'DisabledBackend' object has no attribute '_get_task_meta_for'

I have searched for possible causes of DisabledBackend, but most of the similar posts are about the result backend not being set at all.
Could anyone please help me?
Below are my docker-compose file and my Celery code:
version: "3"
services:
celery_server:
env_file: .env
build:
context: .
dockerfile: Dockerfile
volumes:
- models:/model/torch_script
environment:
LEVEL: ${LEVEL}
links:
- redis
depends_on:
- redis
redis:
image: redis:latest
hostname: redis
ports:
- 6379:6379
triton_server:
image: nvcr.io/nvidia/tritonserver:22.06-py3
hostname: triton
ports:
- 8000:8000
- 8001:8001
- 8002:8002
command: ["tritonserver", "--model-store=/models", "--model-control-mode=poll", --repository-poll-secs=30]
volumes:
- models:/models
shm_size: 1g
ulimits:
memlock: -1
stack: 67108864
volumes:
models:import json
from typing import Dict
from celery import Celery
from config.settings import MODEL_CKPT, LogDir
from utils.inference_helper import chunks
from utils.log_helper import create_logger
from worker.inference.bert_triton_inference import BertInferenceWorker
from worker.train.chinese_bert_classification import ChineseBertClassification

app = Celery(
    name='bert_celery',
    broker="redis://redis:6379/0",
    backend="redis://redis:6379/1"
)

app.conf.task_routes = {
    'app.*': {'queue': 'deep_model'},
}
app.conf.update(result_expires=1)
app.conf.update(task_track_started=True)


@app.task(bind=True, queue='deep_model', name='training')
def training(
    self,
    model_name,
    version,
    dataset,
    label_col,
    learning_rate=2e-5,
    epochs=50,
    batch_size=32,
    max_len=30,
    is_multi_label=1,
    ckpt=MODEL_CKPT.get('chinese-bert-wwm')
):
    dataset = json.loads(dataset)
    label_col = json.loads(label_col)
    task_worker = ChineseBertClassification(
        max_len=max_len,
        ckpt=ckpt,
        epochs=epochs,
        learning_rate=learning_rate,
        batch_size=batch_size,
        dataset=dataset,
        label_col=label_col,
        model_name=model_name,
        model_version=version,
        is_multi_label=is_multi_label
    )
    task_worker.init_model()
    results: Dict[str, str] = task_worker.run()
    return results


@app.task(bind=True, queue='deep_model', name='predict')
def predict(self, model_name, version, max_len, dataset):
    logger = create_logger(LogDir.inference)
    data = json.loads(dataset)
    output = []
    for idx, chunk in enumerate(chunks(data, 32)):
        logger.info(f" ==== batch: {idx} ==== ")
        infer_worker = BertInferenceWorker(
            dataset=chunk,
            model_name=model_name,
            model_version=version,
            url='triton:8000',
            backend='pytorch',
            max_len=max_len,
            chunk_size=len(chunk)
        )
        results = infer_worker.run()
        # print(results)
        output.extend(results.tolist())
    assert len(output) == len(data)
    return json.dumps(output, ensure_ascii=False)
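How the worker is launched inside celery_server is not shown in the question; a minimal sketch of an entrypoint that would consume the deep_model queue the tasks are routed to, assuming the worker module above is named app, could look like this:

# entrypoint.py -- hypothetical sketch, not part of the original question.
# Starts a Celery worker that listens on the 'deep_model' queue.
from app import app

if __name__ == '__main__':
    app.worker_main(argv=['worker', '-Q', 'deep_model', '--loglevel=INFO'])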
And here is the Celery app on my local machine, from which I send the tasks (the client):

from celery import Celery

app = Celery(
    name='bert_celery',
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1"
)

app.conf.task_routes = {
    'app.*': {'queue': 'deep_model'},
}


@app.task(bind=True, queue='deep_model', name='training')
def training(
    self,
    model_name,
    version,
    dataset,
    label_col,
    learning_rate,
    epochs,
    batch_size,
    max_len,
    is_multi_label
):
    pass


@app.task(bind=True, queue='deep_model', name='predict')
def predict(
    self,
    model_name,
    version,
    max_len,
    dataset
):
    pass
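For reference, the stub tasks above only exist so the client can call training and predict by name; a sketch of an alternative that skips the stubs by sending the task by name with Celery's send_task (assuming the same client app object and the j_string payload built earlier):

from app import app  # the local client Celery app defined above

# Send the 'predict' task by name to the 'deep_model' queue;
# no local stub definition of the task is needed.
predict_task = app.send_task(
    'predict',
    args=('audience_bert', 1, 100, j_string),
    queue='deep_model',
)
print(predict_task.id)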
Posted on 2022-07-21 03:16:24

I figured out the solution to this problem.
At first, I thought the error might come from Redis, so I checked the Docker logs, which showed:
WARNING overcommit_memory is set to 0! Background save may fail under low memory condition. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.

So I reconfigured overcommit_memory, but the problem was still not solved.
Then I tried importing the task whose result I wanted to retrieve (from app import predict) in the same script that creates the AsyncResult, and it worked!
from celery.result import AsyncResult
from app import predict # this is essential
results = AsyncResult(<task_id>)
print(results.state) # if no task imported this may return `DisabledBackend`
print(results.get()) # if no task imported this may return `DisabledBackend`

When Celery tries to fetch the result information, you also have to import the related task so that it knows about the backend settings. It was a silly mistake.
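Alternatively (not from the original answer, just Celery's standard API), the AsyncResult can be bound to the configured app explicitly, which also supplies the result backend without importing the task:

from celery.result import AsyncResult
from app import app  # the local client app that sets broker and backend

# Passing the app (or using app.AsyncResult) tells Celery which
# result backend to query, so DisabledBackend is never used.
results = AsyncResult(<task_id>, app=app)
# equivalently: results = app.AsyncResult(<task_id>)
print(results.state)
print(results.get())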
https://stackoverflow.com/questions/73045652