首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >Django batching/bulk update_or_create?

Django batching/bulk update_or_create?
EN

Stack Overflow用户
提问于 2014-11-21 03:12:15
回答 3查看 21.1K关注 0票数 29

我在数据库中有需要定期更新的数据。数据源将返回该时间点可用的所有数据,因此将包括数据库中尚未包含的新数据。

在对源数据进行循环时,如果可能的话,我不想进行数千次单独的写入。

有没有像update_or_create这样的东西可以批量工作?

一种想法是将update_or_create与手动事务结合使用,但我不确定这是将单个写操作排入队列,还是将所有写操作合并到一个SQL insert中?

或者类似地,在循环中使用update_or_create的函数上使用@commit_on_success()也可以吗?

除了转换数据并将其保存到模型之外,我不会对数据执行任何操作。在循环期间,没有任何东西依赖于该模型

EN

回答 3

Stack Overflow用户

发布于 2021-02-23 03:51:07

由于Django添加了对bulk_update的支持,这在某种程度上是可能的,尽管您需要在每个批处理中执行3个数据库调用( get、批量创建和批量更新)。在这里创建一个通用函数的良好接口有点困难,因为您希望该函数既支持有效的查询,又支持更新。下面是我为批量update_or_create设计的一个方法,其中有许多公共标识键(可以是空的)和一个标识键,这些键在批处理中有所不同。

这是作为基础模型上的方法实现的,但可以独立于此使用。这还假设基本模型在名为updated_on的模型上有一个auto_now时间戳;如果不是这样,为了便于修改,假定这一点的代码行已被注释。

为了批量使用它,请在调用它之前将您的更新分成批处理。这也是一种绕过数据的方法,这些数据可以具有为数不多的次要标识符的值之一,而不必更改接口。

代码语言:javascript
运行
复制
class BaseModel(models.Model):
    updated_on = models.DateTimeField(auto_now=True)
    
    @classmethod
    def bulk_update_or_create(cls, common_keys, unique_key_name, unique_key_to_defaults):
        """
        common_keys: {field_name: field_value}
        unique_key_name: field_name
        unique_key_to_defaults: {field_value: {field_name: field_value}}
        
        ex. Event.bulk_update_or_create(
            {"organization": organization}, "external_id", {1234: {"started": True}}
        )
        """
        with transaction.atomic():
            filter_kwargs = dict(common_keys)
            filter_kwargs[f"{unique_key_name}__in"] = unique_key_to_defaults.keys()
            existing_objs = {
                getattr(obj, unique_key_name): obj
                for obj in cls.objects.filter(**filter_kwargs).select_for_update()
            }
            
            create_data = {
                k: v for k, v in unique_key_to_defaults.items() if k not in existing_objs
            }
            for unique_key_value, obj in create_data.items():
                obj[unique_key_name] = unique_key_value
                obj.update(common_keys)
            creates = [cls(**obj_data) for obj_data in create_data.values()]
            if creates:
                cls.objects.bulk_create(creates)

            # This set should contain the name of the `auto_now` field of the model
            update_fields = {"updated_on"}
            updates = []
            for key, obj in existing_objs.items():
                obj.update(unique_key_to_defaults[key], save=False)
                update_fields.update(unique_key_to_defaults[key].keys())
                updates.append(obj)
            if existing_objs:
                cls.objects.bulk_update(updates, update_fields)
        return len(creates), len(updates)

    def update(self, update_dict=None, save=True, **kwargs):
        """ Helper method to update objects """
        if not update_dict:
            update_dict = kwargs
        # This set should contain the name of the `auto_now` field of the model
        update_fields = {"updated_on"}
        for k, v in update_dict.items():
            setattr(self, k, v)
            update_fields.add(k)
        if save:
            self.save(update_fields=update_fields)

示例用法:

代码语言:javascript
运行
复制
class Event(BaseModel):
    organization = models.ForeignKey(Organization)
    external_id = models.IntegerField()
    started = models.BooleanField()


organization = Organization.objects.get(...)
updates_by_external_id = {
    1234: {"started": True},
    2345: {"started": True},
    3456: {"started": False},
}
Event.bulk_update_or_create(
    {"organization": organization}, "external_id", updates_by_external_id
)
票数 6
EN

Stack Overflow用户

发布于 2016-04-22 04:21:29

批量更新将是一个upsert命令,就像@imposeren所说的,Postgres 9.5为你提供了这种能力。我认为MySQL5.7也是如此(参见http://dev.mysql.com/doc/refman/5.7/en/insert-on-duplicate.html),这取决于你的确切需求。也就是说,使用db游标可能是最简单的。这没有错,当ORM还不够时,它就在那里。

沿着这些思路的东西应该是有效的。这是一个伪装的代码,所以不要简单的剪切粘贴,但是这个概念已经为你准备好了。

代码语言:javascript
运行
复制
class GroupByChunk(object):
    def __init__(self, size):
        self.count = 0
        self.size = size
        self.toggle = False

    def __call__(self, *args, **kwargs):
        if self.count >= self.size:  # Allows for size 0
            self.toggle = not self.toggle
            self.count = 0
        self.count += 1
        return self.toggle

def batch_update(db_results, upsert_sql):
    with transaction.atomic():
        cursor = connection.cursor()   
        for chunk in itertools.groupby(db_results, GroupByChunk(size=1000)):
            cursor.execute_many(upsert_sql, chunk)

这里的假设是:

  • db_results是某种类型的结果迭代器,无论是在列表中还是在字典中
  • db_results的结果都可以直接输入到原始的sql exec语句
  • 如果任何批处理更新失败,您将回滚所有这些更新。如果您想将其移动到每个块,只需将with块向下推一点

即可

票数 3
EN

Stack Overflow用户

发布于 2022-02-09 13:25:23

有一个用于Django的django-bulk-update-or-create库可以做到这一点。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/27047630

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档