首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >有条件地将5-20k文档的输入批处理到一个拥有多达100万个mongodb / mongoose文档的集合中的有效方法是什么?

有条件地将5-20k文档的输入批处理到一个拥有多达100万个mongodb / mongoose文档的集合中的有效方法是什么?
EN

Stack Overflow用户
提问于 2019-06-30 12:13:39
回答 2查看 35关注 0票数 0

在我的mmo人口普查/字符统计跟踪应用程序中,我将从用户获得多达5-20k个文档的输入批次,我需要将这些文档聚合到数据库中。我需要寻找特定的标准来决定来自输入的文档是否已经存在于集合中并需要更新,或者它是否是全新的,是否需要插入到集合中。

为了使我的应用程序正常工作,这也是非常重要的,我可以准确地确定有多少文档在输入处理后被更新和/或直接添加。

为了更好地解释我想做什么,我把它分解成一个简化的例子,在这个例子中,我可以向您展示输入是什么样子的,以及需要什么样的结果。

作为以下输入案例的起点,集合是这样的:

代码语言:javascript
运行
复制
collection = [
  { name: 'Jean', server: 'Alpha', level: 9 },
  { name: 'Anna', server: 'Beta', level: 17 },
  { name: 'Jean', server: 'Beta', level: 10 }
];

因此,基本上有3个输入案例,我需要涵盖。

案例#1

当我获得一个带有全新name+server组合的输入时,应该向集合中添加一个新文档

代码语言:javascript
运行
复制
input = { name: 'Victor', server: 'Alpha', level: 22 };

应成为:

代码语言:javascript
运行
复制
collection = [
  { name: 'Jean', server: 'Alpha', level: 9 },
  { name: 'Anna', server: 'Beta', level: 17 },
  { name: 'Jean', server: 'Beta', level: 10 },
  { name: 'Victor', server: 'Alpha', level: 22 }
];

案例#2

当我使用现有的name+server组合获得输入,但使用更高的level时,应该更新现有文档

代码语言:javascript
运行
复制
input = { name: 'Jean', server: 'Alpha', level: 10 };

应该变成

代码语言:javascript
运行
复制
collection = [
  { name: 'Jean', server: 'Alpha', level: 10 },
  { name: 'Anna', server: 'Beta', level: 17 },
  { name: 'Jean', server: 'Beta', level: 10 }
];

案例#3

当我使用现有的name+server组合获得输入,但使用相等或更低的level时,不应该发生任何事情,集合应该保持原样。

代码语言:javascript
运行
复制
input = { name: 'Jean', server: 'Alpha', level: 9 };

代码语言:javascript
运行
复制
input = { name: 'Jean', server: 'Alpha', level: 8 };

应留在:

代码语言:javascript
运行
复制
collection = [
  { name: 'Jean', server: 'Alpha', level: 9 },
  { name: 'Anna', server: 'Beta', level: 17 },
  { name: 'Jean', server: 'Beta', level: 10 }
];

到目前为止,我所做的基本上是将整个集合提取到一个数组中,然后使用Array.filter查找集合中已经存在的输入,并使用findOneAndUpdate更新它们,以及哪些输入是新的,然后使用insertMany将它们插入集合中

代码语言:javascript
运行
复制
Test.find({}, async (err, documents) => {
  if (err) return console.log(err);
  if (documents.length > 0) {
    const changedInputs = inputs.filter(byChanged(documents));
    const newInputs = inputs.filter(byNew(documents));

    const insertResult = await Test.insertMany(newInputs);
    const inserted = insertResult.length;

    const updateResults = await Promise.all(compileUpdatePromises(changedInputs));
    let updated = 0;
    updateResults.forEach(updateResult => {
      updated = updateResult === 'updated' ? updated + 1 : updated;
    });

    console.log('updated:', updated);
    console.log('inserted:', inserted);
  }
});

链接到包含整个示例的gist

当集合中没有多少文档时,这很好,但是现在它已经发展为50k+文档,它变得异常缓慢,并且在这个过程中阻塞了mongo连接,什么也阻止了所有其他调用的整个api。

一旦这个应用程序获得了更多的流量,它就有可能迅速成长为上百万个文档的集合,然后不断地更新这些文档。

有什么更简单、更有效的方法让mongodb为我做所有的艰苦工作,而不是独自完成呢?

更新1:

有了、simagix、发黑的建议,我非常接近于一个解决方案。这就是我修改过的代码现在的样子:

代码语言:javascript
运行
复制
const bulkInput = inputs.map(input => ({
  updateOne: {
    filter: { name: input.name, server: input.server, level: { $lte: input.level } },
    upsert: true,
    update: { $set: { name: input.name, server: input.server, level: input.level } }
  }
}));

Test.bulkWrite(bulkInput).then(result => {
  console.log('inserted:', result.nUpserted, 'updated:', result.nModified);
});

这个问题现在是案例#3的第二个例子。

代码语言:javascript
运行
复制
input = { name: 'Jean', server: 'Alpha', level: 8 };

在以下方面的成果:

代码语言:javascript
运行
复制
collection = [
  { name: 'Jean', server: 'Alpha', level: 9 },
  { name: 'Anna', server: 'Beta', level: 17 },
  { name: 'Jean', server: 'Beta', level: 10 },
  { name: 'Jean', server: 'Alpha', level: 8 }
];

链接到更新的gist

更新2:

只是需要使复合索引

代码语言:javascript
运行
复制
testSchema.index({ name: 1, server: 1 });

为唯一的复合索引

代码语言:javascript
运行
复制
testSchema.index({ name: 1, server: 1 }, { unique: true });

现在,我必须找到一个正确的解决方案来处理E11000 duplicate key error异常,即为案例3抛出的示例2

链接到更新的gist

EN

回答 2

Stack Overflow用户

发布于 2019-06-30 16:42:14

首先,建立综合指数。https://docs.mongodb.com/manual/core/index-compound/

在mongodb和猫鼬上都有。

第二,请写出适当的检索查询。当索引支持时,$or (https://docs.mongodb.com/manual/reference/operator/query/or/)是O(k log n),其中k是匹配项的数目。

或者,尝试大容量操作。https://docs.mongodb.com/manual/reference/method/Bulk/

它可以返回成功查找/更新的次数。https://docs.mongodb.com/manual/reference/method/BulkWriteResult/。添加一个额外的字段以查找级别:{ $lt: currlvl },以只进行有条件的更新。我不太清楚如何把这个和上流社会结合起来。

最后,如果我是您,我将在服务器上进行散列/连接,并将其命名为id。会让生活变得轻松多了。

票数 1
EN

Stack Overflow用户

发布于 2019-06-30 17:16:03

从您的简化示例中,名称和服务器的组合是唯一的。您可以在{name: 1, server: 1}上创建唯一的索引。使用updateOne函数更新并将upsert标志设置为true,以便在不存在文档的情况下插入文档。下面是来自mongo的命令,向您展示它是如何工作的。

代码语言:javascript
运行
复制
db.records.drop()

db.records.createIndex({name:1, server:1})

db.records.insertMany([     
    { name: 'Jean', server: 'Alpha', level: 9 },        
    { name: 'Anna', server: 'Beta', level: 17 },        
    { name: 'Jean', server: 'Beta', level: 10 }  ])

db.records.find({}, {_id: 0})

db.records.updateOne(
    { name: 'Victor', server: 'Alpha', level: {$lte: 22} },     
    {$set: {name: 'Victor', server: 'Alpha', level: 22 }},      
    {upsert: true})

db.records.find({}, {_id: 0})

db.records.updateOne(
    { name: 'Jean', server: 'Alpha', level: {$lte: 9} }, 
    {$set: {name: 'Jean', server: 'Alpha', level: 9}}, 
    {upsert: 1})

db.records.find({}, {_id: 0})

db.records.updateOne(
    { name: 'Jean', server: 'Alpha', level: {$lte: 10} }, 
    {$set: {name: 'Jean', server: 'Alpha', level: 10 }}, 
    {upsert: 1})

db.records.find({}, {_id: 0})
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/56824760

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档