我对我的db执行http请求,并注意到如果我同时发送所有请求,其中一些请求将得到超时错误。我想在调用之间添加一个延迟,这样服务器就不会超载。我试图找到这个问题的RxJS解决方案,并且不想添加一个setTimeout。
以下是我目前所做的工作:
let observables = [];
for(let int = 0; int < 10000; int++){
observables.push(new Observable((observer) => {
db.add(doc[int], (err, result)=>{
observer.next();
observer.complete();
})
}))
}
forkJoin(observables).subscribe(
data => {
},
error => {
console.log(error);
},
() => {
db.close();
}
);发布于 2020-11-23 10:45:53
您确实可以通过Rxjs很好地实现这一点。你需要更高的可观测值,这意味着你将发射一个可观测到的,而更高的可观测到的将为你平平这一点。
这种方法的好处是,您可以轻松地在//中运行X请求,而不必自己管理请求池。
以下是工作代码:
import { Observable, Subject } from "rxjs";
import { mergeAll, take, tap } from "rxjs/operators";
// this is just a mock to demonstrate how it'd behave if the API was
// taking 2s to reply for a call
const mockDbAddHtppCall = (id, cb) =>
setTimeout(() => {
cb(null, `some result for call "${id}"`);
}, 2000);
// I have no idea what your response type looks like so I'm assigning
// any but of course you should have your own type instead of this
type YourRequestType = any;
const NUMBER_OF_ITEMS_TO_FETCH = 10;
const calls$$ = new Subject<Observable<YourRequestType>>();
calls$$
.pipe(
mergeAll(3),
take(NUMBER_OF_ITEMS_TO_FETCH),
tap({ complete: () => console.log(`All calls are done`) })
)
.subscribe(console.log);
for (let id = 0; id < NUMBER_OF_ITEMS_TO_FETCH; id++) {
calls$$.next(
new Observable(observer => {
console.log(`Starting a request for ID "${id}""`);
mockDbAddHtppCall(id, (err, result) => {
if (err) {
observer.error(err);
} else {
observer.next(result);
observer.complete();
}
});
})
);
}和Stackblitz的现场演示:
请打开浏览器的控制台,并注意控制台日志显示触发呼叫时会立即启动其中的3个,然后等待1完成后再接另一个调用。
发布于 2020-11-23 10:22:57
看起来您可以使用初始的timer来触发http调用。例如:
timer(delayTime).pipe(combineLatest(()=>sendHttpRequest()));
这只会在可观察到的计时器完成后触发sendHttpRequest()方法。
所以用你的解决方案。你可以做以下..。
observables.push(
timer(delay + int).pipe(combineLatest(new Observable((observer) => {
db.add(doc[int], (err, result)=>{
observer.next();
observer.complete();
}))
}))延迟可能从0开始,您可以使用循环的int索引来增加延迟。
计时器文档:https://www.learnrxjs.io/learn-rxjs/operators/creation/timer
合并最新文档:https://www.learnrxjs.io/learn-rxjs/operators/combination/combinelatest
发布于 2020-11-23 14:41:57
具有并发值的merge:
mergeAll和mergeMap都允许您定义订阅可观测值的最大数量。mergeAll(1)/mergeMap(LAMBDA, 1)基本上是concatAll()/concatMap(LAMBDA)。
merge基本上只是静态的mergeAll
下面是你如何使用它的方法:
let observables = [...Array(10000).keys()].map(intV =>
new Observable(observer => {
db.add(doc[intV], (err, result) => {
observer.next();
observer.complete();
});
})
);
const MAX_CONCURRENT_REQUESTS = 10;
merge(...observables, MAX_CONCURRENT_REQUESTS).subscribe({
next: data => {},
error: err => console.log(err),
complete: () => db.close()
});注意:这不会对您的呼叫进行批处理,但是它应该可以解决所描述的问题,而且可能也比批处理要快一些。
具有并发值的mergeMap:
也许使用range和mergeMap的方式稍微多一点
const MAX_CONCURRENT_REQUESTS = 10;
range(0, 10000).pipe(
mergeMap(intV =>
new Observable(observer => {
db.add(doc[intV], (err, result) => {
observer.next();
observer.complete();
});
}),
MAX_CONCURRENT_REQUESTS
)
).subscribe({
next: data => {},
error: err => console.log(err),
complete: () => db.close()
});https://stackoverflow.com/questions/64966355
复制相似问题