首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >数据库和ORMS:使用 Motor 跟 MongoDB 通信

数据库和ORMS:使用 Motor 跟 MongoDB 通信

作者头像
Michael阿明
发布2022-11-27 17:23:22
1K0
发布2022-11-27 17:23:22
举报

文章目录

learn from 《Building Data Science Applications with FastAPI》

面向文档的数据库(如MongoDB)不需要预先配置模式

Motor,这是一个用于与 MongoDB 异步通信的库,由MongoDB组织官方支持

1. 安装

pip install motor
Successfully installed motor-2.5.1 pymongo-3.12.3

2. 创建models

  • MongoDB 会为每个文件创建 _id 属性作为唯一标识符,但是 _ 开头的变量被 Pydantic 认为是私有的,不会作为数据字段
  • _id 是二进制对象,不被 Pydantic 支持
# _*_ coding: utf-8 _*_
# @Time : 2022/3/23 14:37
# @Author : Michael
# @File : models.py
# @desc :

from datetime import datetime
from typing import Optional
from bson import ObjectId  # A MongoDB ObjectId
from pydantic import BaseModel, Field


class PyObjectId(ObjectId):
    @classmethod
    def __get_validators__(cls):
        yield cls.validate

    @classmethod
    def validate(cls, v):
        if not ObjectId.is_valid(v):
            raise ValueError("Invalid objectid")
        return ObjectId(v)

    @classmethod
    def __modify_schema__(cls, field_schema):
        field_schema.update(type="string")


class MongoBaseModel(BaseModel):
    # PyObjectId 类的作用是 使得 ObjectId 成为 Pydantic 兼容的类型
    id: PyObjectId = Field(default_factory=PyObjectId, alias="_id")
    # alias 是一个 pydantic选项,在调用 dict 方法时,会转换为 _id 名,这是MongoDB需要的

    class Config:
        json_encoders = {ObjectId: str}
        # json序列化时,采用的映射方法,ObjectId自己实现了__str__,可以被映射为 str


class PostBase(MongoBaseModel):
    title: str
    content: str
    publication_date: datetime = Field(default_factory=datetime.now)


class PostPartialUpdate(BaseModel):
    title: Optional[str] = None
    content: Optional[str] = None


class PostCreate(PostBase):
    pass


class PostDB(PostBase):
    pass

3. 连接数据库

https://docs.mongodb.com/manual/ reference/connection-string/

# _*_ coding: utf-8 _*_
# @Time : 2022/3/23 14:37
# @Author : Michael
# @File : app.py.py
# @desc :

from typing import List, Tuple
from bson import ObjectId, errors  # BSON (Binary JSON) encoding and decoding
from fastapi import Depends, FastAPI, HTTPException, Query, status
from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase

from web_python_dev.mongo_motor.models import PostDB, PostCreate, PostPartialUpdate

app = FastAPI()
motor_client = AsyncIOMotorClient(
    "mongodb://localhost:27017"
)
database = motor_client["cp6_mongodb"]  # 单个数据库实例

def get_database() -> AsyncIOMotorDatabase:
    return database

4. 插入文档

async def pagination(skip: int = Query(0, ge=0),
                     limit: int = Query(10, ge=0)) -> Tuple[int, int]:
    capped_limit = min(100, limit)
    return (skip, capped_limit)


async def get_object_id(id: str) -> ObjectId:
    try:
        return ObjectId(id)
    except (errors.InvalidId, TypeError):
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)


async def get_post_or_404(
        id: ObjectId = Depends(get_object_id),
        database: AsyncIOMotorDatabase = Depends(get_database)
) -> PostDB:
    raw_post = await database['posts'].find_one({"_id": id})
    if raw_post is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
    return PostDB(**raw_post)
@app.post("/posts", response_model=PostDB, status_code=status.HTTP_201_CREATED)
async def create_post(
        post: PostCreate, database: AsyncIOMotorDatabase = Depends(get_database)
) -> PostDB:
    post_db = PostDB(**post.dict())
    await database["posts"].insert_one(post_db.dict(by_alias=True))
    # by_alias=True 使用 _id 来序列化

    post_db = await get_post_or_404(post_db.id, database)

    return post_db

测试之前需要 docker 开启服务

docker run -d --name fastapi-mongo -p 27017:27017 mongo:4.4
在这里插入图片描述
在这里插入图片描述

5. 查询

@app.get("/posts")
async def list_posts(
        pagination: Tuple[int, int] = Depends(pagination),
        database: AsyncIOMotorDatabase = Depends(get_database)
) -> List[PostDB]:
    skip, limit = pagination
    query = database['posts'].find({}, skip=skip, limit=limit)
    # find 第一个参数 是过滤用的,我们要获取所有的,所以留空
    result = [PostDB(**raw_post) async for raw_post in query]
    # async 列表推导
    return result
在这里插入图片描述
在这里插入图片描述
@app.get("/posts/{id}", response_model=PostDB)
async def get_post(post: PostDB = Depends(get_post_or_404)) -> PostDB:
    return post
在这里插入图片描述
在这里插入图片描述

6. 更新、删除

@app.patch("/posts/{id}", response_model=PostDB)
async def update_post(
        post_update: PostPartialUpdate,
        post: PostDB = Depends(get_post_or_404),
        database: AsyncIOMotorDatabase = Depends(get_database),
) -> PostDB:
    await database["posts"].update_one(
        {"_id": post.id}, {"$set": post_update.dict(exclude_unset=True)}
    )

    post_db = await get_post_or_404(post.id, database)

    return post_db
在这里插入图片描述
在这里插入图片描述
@app.delete("/posts/{id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_post(
        post: PostDB = Depends(get_post_or_404),
        database: AsyncIOMotorDatabase = Depends(get_database),
):
    await database["posts"].delete_one({"_id": post.id})

7. 嵌套文档

如果我们想将 post 和 comment 一起存储

在 models.py 中添加

class CommentBase(BaseModel):
    publication_date: datetime = Field(default_factory=datetime.now)
    content: str


class CommentCreate(CommentBase):
    pass


class CommentDB(CommentBase):
    pass

class PostDB(PostBase):
    comments: List[CommentDB] = Field(default_factory=list)

现在我们在查询一下看看,发现 comments 也在结果当中

在这里插入图片描述
在这里插入图片描述

那我们要创建 comments 的内容

@app.post("/posts/{id}/comments", response_model=PostDB, status_code=status.HTTP_201_CREATED)
async def create_comment(comment:CommentCreate,
                         post:PostDB=Depends(get_post_or_404),
                         database:AsyncIOMotorDatabase=Depends(get_database))->PostDB:
    await database['posts'].update_one(
        {'_id': post.id}, {'$push': {'comments': comment.dict()}}
    )
    # $push操作。这是向列表属性添加元素的有用运算符
    post_db = await get_post_or_404(post.id, database)
    return post_db

更多update操作:https://www.mongodb.com/docs/manual/reference/operator/update/

在这里插入图片描述
在这里插入图片描述
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2022-03-25,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 文章目录
  • 1. 安装
  • 2. 创建models
  • 3. 连接数据库
  • 4. 插入文档
  • 5. 查询
  • 6. 更新、删除
  • 7. 嵌套文档
相关产品与服务
云数据库 MongoDB
腾讯云数据库 MongoDB(TencentDB for MongoDB)是腾讯云基于全球广受欢迎的 MongoDB 打造的高性能 NoSQL 数据库,100%完全兼容 MongoDB 协议,支持跨文档事务,提供稳定丰富的监控管理,弹性可扩展、自动容灾,适用于文档型数据库场景,您无需自建灾备体系及控制管理系统。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档