问题:我需要用简单的XHR请求下载大文件(5-10 be ),为了简化我需要做的一个问题-至少: 100个请求(可能更多)。我想使用RXJS (并行http请求)来提高它的性能。
问题:
堆栈:
我真正想要的是:我真正需要的是在mergeMap
上应用同步来触发它的开/关。例如,我有100个one,但我不希望它们运行到最后一个。
例如,我希望它运行到17日,但会暂停发出请求,并等待特定的逻辑和请求完成。
代码示例:
from(observableQueries)
.pipe(
mergeMap(partialQuery => this.messageService.getResults(partialQuery, true), 4),
takeWhile(o => {
if (!o.isMoreResults && o.auditMessageList.length === 0) {
this.logger.log(`First page that returns false ${ o.number }`);
this.logger.log(`Count of responses that exists in array: ${ allResults.length }`);
if (!firstPageThatShouldBeStopped) {
firstPageThatShouldBeStopped = o.number;
}
if (allResults.length === firstPageThatShouldBeStopped) {
return false;
}
}
return true;
}),
retryWhen(genericRetryStrategy()),
catchError((err, caught) => {
this._alertService.error(this._translate.instant('EXPORT_TO_CSV_DOWNLOAD_ERROR'));
return throwError(err);
})
)
.subscribe(
(res: MessagesResult) => {
if (reThrowError) {
this.logger.info('Error will be thrown for e2e purposes');
throw new Error('Error will throw for e2e purposes');
}
if (res.isMoreResults && res.auditMessageList.length > 0) {
allResults[res.number] = res;
this.subject.next(true);
} else if (!res.isMoreResults && res.auditMessageList.length > 0) {
allResults[res.number] = res;
this.subject.next(true);
} else {
this.subject.next(false);
}
},
err => {
// clear subject after emitting value...
this.subject.next(true);
return this.handleError(err);
},
() => {
this.logger.info('Download file finished...');
this.logger.info('Time consumed: ', performance.now() - start);
try {
this.logger.info(`Count Responses: ${ allResults.length } `);
const allResultSorted = this._sortResults(allResults);
let counter = 0;
for (let i = 0; i < allResultSorted.length; i++) {
this.logger.info('Index: ' + i, allResultSorted[i]);
counter += allResultSorted[i].auditMessageList.length;
this.logger.info('Length OF Messages: ' + i, counter);
this.fileSaver.save(!allResultSorted[i].isMoreResults, allResultSorted[i].auditMessageList,
`audit-events_${ LOCAL_QUERY_COPY.application }_${ timestamp }_${ moment()
.tz(this._timezoneService.timezone).zoneName() }.csv`, null, headers);
}
this.subject.next(false);
} catch (e) {
this._alertService.error(this._translate.instant('EXPORT_TO_CSV_DOWNLOAD_ERROR'));
return this.handleError(e);
} finally {
// clear subject after emitting value...
this.subject.next(true);
}
}
);
代码起作用了!但问题出在冗余呼叫上。如何才能让em停止,直到最后一个请求运行完毕。
发布于 2019-06-12 10:42:12
const urls=[url1,url2,url3]
const concurrency=5
const fetchUrls=urls.map(url=>defer(_=>fetch(url)))
merge(...fetchUrls,concurrency).subscribe(console.log)
发布于 2019-05-23 03:48:33
将请求并行化与对请求循环的每个迭代使用一个请求不是相同的吗?带宽是一样的-不
我会说不,因为在服务器上,当一个线程执行第一个请求时,第二个线程可以为我下载第二个请求=>,最好是并行化,但也许我错了
发布于 2019-05-28 00:39:22
您可以使用RXJS 'merge‘操作符:假设getData(url)是发出请求的方法,并且此方法返回一个可观察对象,您可以这样做:
const urls: string[] = {url1, url2, url3,...};
let mergeHttpCallObservalbe: Observable<any>;
urls.forEach((url: string) => {
const newHttpCallObservalbe : Observable<any> = myService.getData(url);
if (mergeHttpCallObservalbe){
mergeHttpCallObservalbe = merge(mergeHttpCallObservalbe, newHttpCallObservalbe);
} else {
mergeHttpCallObservalbe = newHttpCallObservalbe;
}
});
// Now you have merged all your Observable, you can subscribe:
mergeHttpCallObservalbe.subscribe(result => {
// Do your stuff...
});
这里有一篇关于Rxjs运算符的好文章:https://blog.angularindepth.com/learn-to-combine-rxjs-sequences-with-super-intuitive-interactive-diagrams-20fce8e6511
希望能有所帮助
https://stackoverflow.com/questions/56260967
复制相似问题