Open-Source AIGC Learning: Asynchronous Service for AIGC Models

1. Model Preparation

For details, see the earlier posts in this series:

开源AIGC学习—文生视频模型本地运行 (running the text-to-video model locally)

开源AIGC学习—文生图模型服务封装 (wrapping the text-to-image model as a service)

开源AIGC学习—文生图模型本地运行 (running the text-to-image model locally)

2. Asynchronous Service Wrapping

The text-to-image and text-to-video pipelines are wrapped as asynchronous services with Python's FastAPI: each request is placed on an in-process asyncio queue and picked up by a background worker task. The full code is shown below:

Code language: python
# -*- coding: utf-8 -*-
import asyncio
import uvicorn
from asyncio import Queue
from random import randint
from typing import Annotated
from fastapi import FastAPI, Form
from contextlib import asynccontextmanager
from uuid import UUID, uuid1
from pydantic import BaseModel


import cv2
from modelscope.pipelines import pipeline
from modelscope.utils.constant import Tasks

import torch
from diffusers import DiffusionPipeline, DPMSolverMultistepScheduler
from diffusers.utils import export_to_video

# Load the ModelScope Chinese Stable Diffusion pipeline for text-to-image generation
task = Tasks.text_to_image_synthesis
model_id = '/mnt/d/aigc_model/modelscope/damo/multi-modal_chinese_stable_diffusion_v1'
image_pipe = pipeline(task=task, model=model_id)


# Load the damo-vilab text-to-video diffusers pipeline in fp16 and move it to the GPU
viedo_pipe = DiffusionPipeline.from_pretrained("/mnt/d/aigc_model/hub/models--damo-vilab--text-to-video-ms-1.7b/snapshots/8227dddca75a8561bf858d604cc5dae52b954d01", torch_dtype=torch.float16, variant="fp16")
viedo_pipe.scheduler = DPMSolverMultistepScheduler.from_config(viedo_pipe.scheduler.config)
viedo_pipe.to("cuda")


# Background worker: pulls text-to-image tasks off the queue and runs them one at a time
async def commandText2Image(queue: Queue):
    while True:
        print("Start to get task from text_to_image_queue")
        text_info: TextInof = await queue.get()
        
        prompt = text_info.prompts
        print(f"task prompts is {text_info.prompts}, task id is {str(text_info.tracking_id)}.")
        
        output = image_pipe({'text': prompt})
        image_output=  "/mnt/d/aigc_result/" + str(text_info.tracking_id) + ".png"
        cv2.imwrite(image_output, output['output_imgs'][0])
        
        queue.task_done()
        print(f"Completed the task from the text_to_image_queue and image file output path is {image_output}.")


# Background worker: pulls text-to-video tasks off the queue and runs them one at a time
async def commandText2Viedo(queue: Queue):
    while True:
        print("Start to get task from text_to_video_queue")
        text_info: TextInof = await queue.get()
        
        prompt = text_info.prompts
        print(f"task prompts is {text_info.prompts}, task id is {str(text_info.tracking_id)}")
        
        video_frames = viedo_pipe(prompt, num_inference_steps=25).frames
        video_output=  "/mnt/d/aigc_result/" + str(text_info.tracking_id) + ".mp4"
        export_to_video(video_frames, video_output)
        
        queue.task_done()
        print(f"Completed the task from the text_to_video_queue and video file output path is {video_output}.")
  

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Start the two background consumers when the application starts up
    asyncio.create_task(commandText2Image(text_to_image_queue))
    asyncio.create_task(commandText2Viedo(text_to_video_queue))
    yield

app = FastAPI(lifespan=lifespan)
text_to_image_queue = Queue()
text_to_video_queue = Queue()

class TextInof(BaseModel):
    tracking_id: UUID
    prompts: str
    process_time: int = None
    
# Accept a form-encoded request and enqueue it for the matching worker
@app.post("/text2x")
async def post_task(prompts: Annotated[str, Form()], type_info: Annotated[str, Form()]):
    user_request = TextInof(
        tracking_id=uuid1(),
        prompts=prompts,
        process_time=randint(1,5)
    )
    
    if type_info == "image":
        await text_to_image_queue.put(user_request)
    elif type_info == "viedo":
        await text_to_video_queue.put(user_request)
   
    print(f"Received task, prompt is {user_request.prompts}, task id is {user_request.tracking_id}.")
    info = "Received your text to {} request.".format(type_info)
    return info

if __name__ == "__main__":
    uvicorn.run(app=app, port=2000)
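
To try it out, save the code above as a module (say, text2x_api.py, a hypothetical file name), start it with python text2x_api.py, and send a form-encoded POST to the /text2x endpoint. The sketch below uses the requests library (an assumption; any HTTP client works) and keeps the type_info values "image" / "viedo" exactly as they are spelled in the handler:

Code language: python
import requests

# The text-to-image model loaded above is a Chinese Stable Diffusion variant,
# so a Chinese prompt is used here as an example.
resp = requests.post(
    "http://127.0.0.1:2000/text2x",
    data={"prompts": "雪地里奔跑的小狗", "type_info": "image"},
)
print(resp.status_code, resp.json())

The call returns as soon as the task has been queued; the generated file appears later under /mnt/d/aigc_result/ with the task's tracking id as the file name.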

Alternatively, Celery can be used to wrap the calls asynchronously:

Code language: python
# File layout:
#   celery/celery_app.py
#   celery/celery_text2x.py

# celery_app.py
# -*- coding: utf-8 -*-
from celery import Celery


import cv2
from modelscope.pipelines import pipeline
from modelscope.utils.constant import Tasks

import torch
from diffusers import DiffusionPipeline, DPMSolverMultistepScheduler
from diffusers.utils import export_to_video

# Both pipelines are loaded when this module is imported by the Celery worker
task = Tasks.text_to_image_synthesis
model_id = '/mnt/d/aigc_model/modelscope/damo/multi-modal_chinese_stable_diffusion_v1'
image_pipe = pipeline(task=task, model=model_id)

viedo_pipe = DiffusionPipeline.from_pretrained("/mnt/d/aigc_model/hub/models--damo-vilab--text-to-video-ms-1.7b/snapshots/8227dddca75a8561bf858d604cc5dae52b954d01", torch_dtype=torch.float16, variant="fp16")
viedo_pipe.scheduler = DPMSolverMultistepScheduler.from_config(viedo_pipe.scheduler.config)
viedo_pipe.to("cuda")

# Celery app using Redis as the broker and as the result backend
celery_app = Celery('text2xApp', broker='redis://localhost:6379/0', backend='redis://localhost:6379/1')

celery_app.conf.update(
    result_expires=60
)

# Define a Celery text-to-image task
@celery_app.task
def commandText2Image(prompts, tracking_id):
    print(f"task prompts is {prompts}, task id is {tracking_id}.")

    output = image_pipe({'text': prompts})
    image_output = "/mnt/d/aigc_result/" + tracking_id + ".png"
    cv2.imwrite(image_output, output['output_imgs'][0])

    print(f"Completed the task from the text_to_image and image file output path is {image_output}.")
    # Return the output path so callers can read it from the result backend
    return image_output

# Define a Celery text-to-video task
@celery_app.task
def commandText2Viedo(prompts, tracking_id):
    print(f"task prompts is {prompts}, task id is {tracking_id}.")

    video_frames = viedo_pipe(prompts, num_inference_steps=25).frames
    video_output = "/mnt/d/aigc_result/" + tracking_id + ".mp4"
    export_to_video(video_frames, video_output)

    print(f"Completed the task from the text_to_video and video file output path is {video_output}.")
    # Return the output path so callers can read it from the result backend
    return video_output


# celery_text2x.py
# -*- coding: utf-8 -*-
import asyncio
import uvicorn
from typing import Annotated
from fastapi import FastAPI, Form
from uuid import uuid1
from celery_app import commandText2Image, commandText2Viedo

app = FastAPI()

@app.post("/text2x")
async def post_task(prompts: Annotated[str, Form()], type_info: Annotated[str, Form()]):
    tracking_id = str(uuid1())
    if type_info == "image":
        # Trigger the Celery text-to-image task
        task = commandText2Image.delay(prompts, tracking_id)
    elif type_info == "viedo":
        # Trigger the Celery text-to-video task
        task = commandText2Viedo.delay(prompts, tracking_id)
    else:
        return {"result": f"unknown type_info: {type_info}"}

    print(f"Received task, prompt is {prompts}, task id is {tracking_id}.")
    print(f"Received your text to {type_info} request.")
    # Wait for the task result without blocking the event loop
    # (AsyncResult.get() blocks, so it is run in a worker thread)
    result = await asyncio.to_thread(task.get)
    return {"result": result}

if __name__ == "__main__":
    uvicorn.run(app=app, port=2000)
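
With this version, task.get() still makes the HTTP request wait until inference has finished. A common alternative is to return the Celery task id right away and let the client poll a separate endpoint for the result. The sketch below uses hypothetical endpoint names (/text2x_async and /result/{task_id}); it would live in the same celery_text2x.py and reuses the imports above plus the Celery app object:

Code language: python
from celery.result import AsyncResult
from celery_app import celery_app

@app.post("/text2x_async")
async def post_task_async(prompts: Annotated[str, Form()], type_info: Annotated[str, Form()]):
    tracking_id = str(uuid1())
    # Enqueue the matching task and return immediately with its id
    task_fn = commandText2Image if type_info == "image" else commandText2Viedo
    task = task_fn.delay(prompts, tracking_id)
    return {"task_id": task.id, "tracking_id": tracking_id}

@app.get("/result/{task_id}")
async def get_result(task_id: str):
    # Look up the task state in the Redis result backend
    res = AsyncResult(task_id, app=celery_app)
    if res.ready():
        return {"status": res.status, "result": res.result}
    return {"status": res.status}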

Start the Celery worker with the following command:

Code language: shell
/mnt/d/code/celery# celery -A celery_app worker -l info
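
Note that this only starts the Celery worker; the FastAPI service in celery_text2x.py still has to be launched in a separate terminal (for example with python celery_text2x.py, which runs uvicorn on port 2000 as in its __main__ block) so that the /text2x endpoint is reachable.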
