如何设置一个python服务,该服务(异步)监视mongodb的更改流。
我在mongodb.com和[医]侏儒上所能找到的只有以下两种方法,这两种方法看上去并没有真正准备好生产:
方法mongodb.com:
import os
import pymongo
from bson.json_util import dumps
client = pymongo.MongoClient('<YOUR-MONGO-CONNECT-STRING>')
change_stream = client.changestream.collection.watch()
for change in change_stream:
print(dumps(change))
接近pymongo文档:
with db.collection.watch() as stream:
while stream.alive:
change = stream.try_next()
print("Current resume token: %r" % (stream.resume_token,))
if change is not None:
print("Change document: %r" % (change,))
continue
time.sleep(10)
我想到了一种解决方案,使用watch函数作为事件循环的回调。有人知道这方面的实现吗?
发布于 2022-06-21 11:05:47
为了获得所有MongoDB更新的通知,一种简单的方法是:
import pymongo
client = pymongo.MongoClient("mongodb+srv://YOUR-MONGO-CONNECT-STRING" % (
mongo_user, mongo_pass, mongo_db_name))
option={ 'full_document':'updateLookup' }
change_stream = client.mongo_db_name.mongo_db_collection.watch([{"$match" : { "operationType" : "update" }}], **option)
for change in change_stream:
print(dumps(change))
print('')
你需要替换
此外,您还可以添加一个简历令牌,以处理例如出现网络错误的情况。使用简历令牌,流将再次连接,您将从丢失连接的点接收通知。
import pymongo
from pymongo import errors
client = pymongo.MongoClient("mongodb+srv://YOUR-MONGO-CONNECT-STRING" % (
mongo_user, mongo_pass, mongo_db_name))
try:
resume_token = None
pipeline = [{'$match': {'operationType': 'update'}}]
with client.mongo_db_name.mongo_db_collection.watch(pipeline) as stream:
for update_change in stream:
print(update_change)
resume_token = stream.resume_token
except pymongo.errors.PyMongoError:
if resume_token is None:
logging.error('...')
else:
with client.mongo_db_name.mongo_db_collection.watch(pipeline, resume_after=resume_token) as stream:
for update_change in stream:
print(update_change)
https://stackoverflow.com/questions/68980881
复制相似问题