一、开始
mongo
特别适合存储各种嵌套及不能确定格式的数据,而mongo
自带的去重功能(使用 _id
唯一键支持)又特别适合小爬虫存储数据。多数情况会出现数据更新的操作, 但又不知道是不是存在, 是使用insert
还是update
。看到最后就知道了, 还可以存在则更新部分字段, 不存在则插入。废话不多说, 开干。
datas = [
{
"_id": 1,
"insert_time": time.time(),
"update_time": time.time(),
"name": "zs"
},
{
"_id": 2,
"insert_time": time.time(),
"update_time": time.time(),
"name": "ls"
},
{
"_id": 3,
"insert_time": time.time(),
"update_time": time.time(),
"name": "ww"
},
]
以下代码实现:
1、实现存在更新不存在则插入
2、实现存在跳过不存在则插入
3、实现存在更新部分字段不存在则插入
就不分开写了, 直接放在一个源文件里了, 最后有测试用例
# -*- coding: utf-8 -*-
# @Author: 胖胖很瘦
# @Date: 2024-03-03 12:58:42
# @LastEditors: 胖胖很瘦
# @LastEditTime: 2024-03-03 13:57:49
# 导入包
import time
from pymongo import MongoClient as MC
from pymongo import UpdateOne
client = MC()["test"]
def exists_update_and_insert(data, bulk=False):
"""
存在则更新
不存在则插入
:param data: 数据
:param bulk: 是否使用批量插入
# ordered
# 有序执行, 一条报错, 后面不再执行
# 无序执行, 一条报错, 其它不受影响
"""
if bulk:
bulk_docs = []
for rd in data:
bulk_docs.append(
UpdateOne(
{"_id": rd["_id"]},
{"$set": rd},
upsert=True
))
result = client['goods'].bulk_write(bulk_docs, ordered=False).bulk_api_result
print(result)
else:
for rd in data:
client['goods'].update_one(
{'_id': rd['_id']},
{'$set': rd},
upsert=True
)
def exists_do_noting_and_insert(data, bulk=False):
"""
存在则不做任何操作
不存在则插入
:param data: 数据
:param bulk: 是否使用批量插入
"""
if bulk:
bulk_docs = []
for rd in data:
bulk_docs.append(
UpdateOne(
{"_id": rd["_id"]},
{"$setOnInsert": rd},
upsert=True
))
result = client['goods'].bulk_write(bulk_docs, ordered=False).bulk_api_result
print(result)
else:
for rd in data:
client['goods'].update_one(
{'_id': rd['_id']},
{'$setOnInsert': rd},
upsert=True
)
def exists_update_any_field_and_insert(data, bulk=False):
"""
存在则更新部分字段
不存在则插入
:param data: 数据
:param bulk: 是否使用批量插入
"""
if bulk:
bulk_docs = []
for rd in data:
update_time = rd.pop("update_time", time.time())
bulk_docs.append(
UpdateOne(
{"_id": rd["_id"]},
{
"$setOnInsert": rd,
"$set": {"update_time": update_time}
},
upsert=True
))
result = client['goods'].bulk_write(bulk_docs, ordered=False).bulk_api_result
print(result)
else:
for rd in data:
update_time = rd.pop("update_time", time.time())
client['goods'].update_one(
{'_id': rd['_id']},
{
'$setOnInsert': rd,
"$set": {"update_time": update_time}
},
upsert=True
)
def find(query=None):
"""
查询数据
:query: 查询条件
"""
return list(client['goods'].find(query))
def remove(query=None):
"""
删除数据
:query: 删除条件
"""
return client["goods"].remove(query)
if __name__ == '__main__':
# 测试数据
datas = [
{
"_id": 1,
"insert_time": time.time(),
"update_time": time.time(),
"name": "zs"
},
{
"_id": 2,
"insert_time": time.time(),
"update_time": time.time(),
"name": "ls"
},
{
"_id": 3,
"insert_time": time.time(),
"update_time": time.time(),
"name": "ww"
},
]
"""
测试用例
"""
# exists_update_and_insert(datas)
# exists_update_and_insert(datas, True)
# print(find())
# remove()
# exists_do_noting_and_insert(datas)
# exists_do_noting_and_insert(datas, True)
# print(find())
# remove()
# exists_update_any_field_and_insert(datas)
# exists_update_any_field_and_insert(datas, True)
# print(find())
# remove()
文字虽然少, 但是干货满满。周末快乐, 留下你的点赞吧~