我有一个包含170万条记录的mongodb集合。每条记录都是一个ID号。我需要读取每个ID号,对另一个服务执行一些请求,转换数据,将其写入不同的集合,如果所有操作都成功,则删除原始ID记录。
我想要一个脚本,它可以无限期地做这些事情,直到集合为空,具有指定的并发性(即任何时候最多3个请求)。
我想要的本质上是一个并发的while循环,即:(伪javascript)
promiseWhile(queueNotEmpty, 3){
readFromQueue
.then(doc => {
return process(doc);
})
.then(result => {
if(result == "empty") // (or whatever)
queueNotEmpty = false;
});
}
发布于 2019-05-23 04:32:27
您可以使用mongodb的游标来异步迭代所有记录。为了让三个工作者处理它,将任务包装到一个异步函数中,并多次调用该函数:
const cursor = db.collection("records").find({});
async function process() {
while(await cursor.hasNext()) {
const record = await cursor.next();
//...
}
}
await Promise.all([ process(), process(), process() ]);
(不过,我不确定mongodb驱动程序是否支持对.next()
的并发调用,您应该测试一下)
否则,此信号量实现可能会有所帮助:
function Semaphore(count = 1) {
const resolvers = [];
let startCount = count;
return {
aquire() {
return new Promise(resolve => {
if(startCount) { resolve(); startCount -= 1; }
else resolvers.push(resolve);
});
},
free() {
if(resolvers.length) resolvers.pop()();
else startCount += 1;
},
async use(cb) {
await this.aquire();
await cb();
this.free()
},
async done() {
await Promise.all(Array.from({ length: count }, () => this.aquire()));
startCount = count;
},
};
}
在您的情况下,Running Demo可用作:
const connectionSemaphore = Semaphore(3);
(async fuction() {
while(await cursor.hasNext()) {
const record = await cursor.next();
/*await*/ connectionSemaphore.use(async () => {
// Do connection stuff concurrently
});
}
await connectionSemaphore.done();
})();
https://stackoverflow.com/questions/56263994
复制相似问题