首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >查看Python异步中的MongoDB更改流

查看Python异步中的MongoDB更改流
EN

Stack Overflow用户
提问于 2021-08-30 08:15:30
回答 1查看 1K关注 0票数 3

如何设置一个python服务,该服务(异步)监视mongodb的更改流。

我在mongodb.com[医]侏儒上所能找到的只有以下两种方法,这两种方法看上去并没有真正准备好生产:

方法mongodb.com:

代码语言:javascript
运行
复制
import os
import pymongo
from bson.json_util import dumps
client = pymongo.MongoClient('<YOUR-MONGO-CONNECT-STRING>')
change_stream = client.changestream.collection.watch()
for change in change_stream:
    print(dumps(change))

接近pymongo文档:

代码语言:javascript
运行
复制
with db.collection.watch() as stream:
    while stream.alive:
        change = stream.try_next()
        print("Current resume token: %r" % (stream.resume_token,))
        if change is not None:
            print("Change document: %r" % (change,))
            continue
        time.sleep(10)

我想到了一种解决方案,使用watch函数作为事件循环的回调。有人知道这方面的实现吗?

EN

回答 1

Stack Overflow用户

发布于 2022-06-21 11:05:47

为了获得所有MongoDB更新的通知,一种简单的方法是:

代码语言:javascript
运行
复制
import pymongo

client = pymongo.MongoClient("mongodb+srv://YOUR-MONGO-CONNECT-STRING" % (
            mongo_user, mongo_pass, mongo_db_name))

 option={ 'full_document':'updateLookup' }

 change_stream = client.mongo_db_name.mongo_db_collection.watch([{"$match" : { "operationType" : "update" }}], **option)
 for change in change_stream:
     print(dumps(change))
     print('')

你需要替换

  • 使用您的MongoDB连接URL
  • mongo_user:用你的用户名
  • mongo_pass:用你的密码
  • mongo_db_name:你想看的数据库名
  • mongo_db_collection:你想看的收藏名

此外,您还可以添加一个简历令牌,以处理例如出现网络错误的情况。使用简历令牌,流将再次连接,您将从丢失连接的点接收通知。

代码语言:javascript
运行
复制
import pymongo
from pymongo import errors

client = pymongo.MongoClient("mongodb+srv://YOUR-MONGO-CONNECT-STRING" % (
            mongo_user, mongo_pass, mongo_db_name))

try:
    resume_token = None
    pipeline = [{'$match': {'operationType': 'update'}}]
    with client.mongo_db_name.mongo_db_collection.watch(pipeline) as stream:
        for update_change in stream:
            print(update_change)
            resume_token = stream.resume_token
except pymongo.errors.PyMongoError:
    if resume_token is None:
       logging.error('...')
    else:
        with client.mongo_db_name.mongo_db_collection.watch(pipeline, resume_after=resume_token) as stream:
            for update_change in stream:
                print(update_change)
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/68980881

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档