我在数据库中有需要定期更新的数据。数据源将返回该时间点可用的所有数据,因此将包括数据库中尚未包含的新数据。
在对源数据进行循环时,如果可能的话,我不想进行数千次单独的写入。
有没有像update_or_create
这样的东西可以批量工作?
一种想法是将update_or_create
与手动事务结合使用,但我不确定这是将单个写操作排入队列,还是将所有写操作合并到一个SQL insert中?
或者类似地,在循环中使用update_or_create
的函数上使用@commit_on_success()
也可以吗?
除了转换数据并将其保存到模型之外,我不会对数据执行任何操作。在循环期间,没有任何东西依赖于该模型
发布于 2021-02-23 03:51:07
由于Django添加了对bulk_update的支持,这在某种程度上是可能的,尽管您需要在每个批处理中执行3个数据库调用( get、批量创建和批量更新)。在这里创建一个通用函数的良好接口有点困难,因为您希望该函数既支持有效的查询,又支持更新。下面是我为批量update_or_create设计的一个方法,其中有许多公共标识键(可以是空的)和一个标识键,这些键在批处理中有所不同。
这是作为基础模型上的方法实现的,但可以独立于此使用。这还假设基本模型在名为updated_on
的模型上有一个auto_now
时间戳;如果不是这样,为了便于修改,假定这一点的代码行已被注释。
为了批量使用它,请在调用它之前将您的更新分成批处理。这也是一种绕过数据的方法,这些数据可以具有为数不多的次要标识符的值之一,而不必更改接口。
class BaseModel(models.Model):
updated_on = models.DateTimeField(auto_now=True)
@classmethod
def bulk_update_or_create(cls, common_keys, unique_key_name, unique_key_to_defaults):
"""
common_keys: {field_name: field_value}
unique_key_name: field_name
unique_key_to_defaults: {field_value: {field_name: field_value}}
ex. Event.bulk_update_or_create(
{"organization": organization}, "external_id", {1234: {"started": True}}
)
"""
with transaction.atomic():
filter_kwargs = dict(common_keys)
filter_kwargs[f"{unique_key_name}__in"] = unique_key_to_defaults.keys()
existing_objs = {
getattr(obj, unique_key_name): obj
for obj in cls.objects.filter(**filter_kwargs).select_for_update()
}
create_data = {
k: v for k, v in unique_key_to_defaults.items() if k not in existing_objs
}
for unique_key_value, obj in create_data.items():
obj[unique_key_name] = unique_key_value
obj.update(common_keys)
creates = [cls(**obj_data) for obj_data in create_data.values()]
if creates:
cls.objects.bulk_create(creates)
# This set should contain the name of the `auto_now` field of the model
update_fields = {"updated_on"}
updates = []
for key, obj in existing_objs.items():
obj.update(unique_key_to_defaults[key], save=False)
update_fields.update(unique_key_to_defaults[key].keys())
updates.append(obj)
if existing_objs:
cls.objects.bulk_update(updates, update_fields)
return len(creates), len(updates)
def update(self, update_dict=None, save=True, **kwargs):
""" Helper method to update objects """
if not update_dict:
update_dict = kwargs
# This set should contain the name of the `auto_now` field of the model
update_fields = {"updated_on"}
for k, v in update_dict.items():
setattr(self, k, v)
update_fields.add(k)
if save:
self.save(update_fields=update_fields)
示例用法:
class Event(BaseModel):
organization = models.ForeignKey(Organization)
external_id = models.IntegerField()
started = models.BooleanField()
organization = Organization.objects.get(...)
updates_by_external_id = {
1234: {"started": True},
2345: {"started": True},
3456: {"started": False},
}
Event.bulk_update_or_create(
{"organization": organization}, "external_id", updates_by_external_id
)
发布于 2016-04-22 04:21:29
批量更新将是一个upsert命令,就像@imposeren所说的,Postgres 9.5为你提供了这种能力。我认为MySQL5.7也是如此(参见http://dev.mysql.com/doc/refman/5.7/en/insert-on-duplicate.html),这取决于你的确切需求。也就是说,使用db游标可能是最简单的。这没有错,当ORM还不够时,它就在那里。
沿着这些思路的东西应该是有效的。这是一个伪装的代码,所以不要简单的剪切粘贴,但是这个概念已经为你准备好了。
class GroupByChunk(object):
def __init__(self, size):
self.count = 0
self.size = size
self.toggle = False
def __call__(self, *args, **kwargs):
if self.count >= self.size: # Allows for size 0
self.toggle = not self.toggle
self.count = 0
self.count += 1
return self.toggle
def batch_update(db_results, upsert_sql):
with transaction.atomic():
cursor = connection.cursor()
for chunk in itertools.groupby(db_results, GroupByChunk(size=1000)):
cursor.execute_many(upsert_sql, chunk)
这里的假设是:
db_results
是某种类型的结果迭代器,无论是在列表中还是在字典中db_results
的结果都可以直接输入到原始的sql exec语句with
块向下推一点即可
发布于 2022-02-09 13:25:23
有一个用于Django的django-bulk-update-or-create库可以做到这一点。
https://stackoverflow.com/questions/27047630
复制相似问题