在RxJS (ES6)中,我试图在单个可观察的连续系列操作中获得结果。
我不确定我是否必须使用forkJoin (但我希望这些操作是按顺序执行)还是使用concat操作符(但我希望在所有操作都执行时在结束时得到通知)。
我试过:
forkJoin
sync(): Observable<any> {
return from(this.db.getProducts()).pipe(
map(products => {
if ( !products ) {
return of(true);
}
const batch: Observable<any>[] = [];
for ( const product of products ) {
if ( product.toBeSync ) {
batch.push(this.api.updateProduct(product));
}
}
return forkJoin(batch);
})
);
}
concat
sync(): Observable<any> {
return from(this.db.getProducts()).pipe(
map(products => {
if ( !products ) {
return of(true);
}
const batch: Observable<any>[] = [];
for ( const product of products ) {
if ( product.toBeSync ) {
batch.push(this.api.updateProduct(product));
}
}
return concat(batch);
})
);
}
在这两种情况下,我都无法看到来自HttpClient
的可观测数据(没有发送http请求),但我可以看到日志记录。
批处理中调用的方法如下:
updateProduct(product: Product) {
console.log('calling...');
const options = this.getDefaultRequestOptions();
return this.http.post('update_product', product, options);
}
我调用sync()函数,如下所示:
this.productService.sync().subscribe(() => {
console.log('sync done');
}, error => this.utils.handleError(error));
输出:
(8) calling...
sync done
但是没有HTTP请求启动。
如果在管道映射的forkJoin/concat之外执行相同的操作,则可以看到发送的HTTP请求。
sync(): Observable<any> {
const product = new Product(null, 'Title', 'Desc.', 'CODE');
return this.api.updateProduct(product);
}
我遗漏了什么?
--更新--解决方案--
sync(): Observable<any> {
return from(this.db.getProducts()).pipe(
flatMap(products => {
if ( !products ) {
return of(true);
}
const batch: Observable<any>[] = [];
for ( const product of products ) {
if ( product.toBeSync ) {
batch.push(this.api.updateProduct(product));
}
}
console.log(batch);
return concat(...batch);
// return forkJoin(batch);
})
);
发布于 2020-02-03 20:34:10
试一试
sync(): Observable<any> {
return from(this.db.getProducts()).pipe(
map(products => {
const batch: Observable<any>[] = [];
for (const product of products) {
batch.push(this.api.updateProduct(product));
}
return batch;
}),
switchMap(batch => forkJoin(batch)),
)
}
这是假设this.db.getProducts()
是同步静态数据,而不是可观察/异步数据。
然后你可以试着做
this.productService.sync().subscribe(() => { console.log('sync done'); });
并查看是否正在进行任何API调用。
发布于 2020-02-03 20:30:25
这里的问题不是forkJoin/concat
,而是使用map
返回另一个可观察到的。
在您的代码中,map
将返回Observable<Observable<any>>
。您的IDE应该突出显示这些内容(不要使用any
,也许这是一个问题)。
您的解决方案是将map
更改为concatMap
(至少用于测试),下一个链应该可以工作。
另外,调试subscribe
及其返回值,以查看您拥有什么。
https://stackoverflow.com/questions/60045843
复制相似问题