首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >RxJS和:用于多个顺序请求的forkJoin

RxJS和:用于多个顺序请求的forkJoin
EN

Stack Overflow用户
提问于 2020-02-03 19:23:13
回答 2查看 2.8K关注 0票数 0

在RxJS (ES6)中,我试图在单个可观察的连续系列操作中获得结果。

我不确定我是否必须使用forkJoin (但我希望这些操作是按顺序执行)还是使用concat操作符(但我希望在所有操作都执行时在结束时得到通知)。

我试过:

forkJoin

代码语言:javascript
运行
复制
  sync(): Observable<any> {
    return from(this.db.getProducts()).pipe(
        map(products => {
            if ( !products ) {
                return of(true);
            }

            const batch: Observable<any>[] = [];
            for ( const product of products ) {
              if ( product.toBeSync ) {
                  batch.push(this.api.updateProduct(product));
              }
            }

            return forkJoin(batch);
        })
    );
  }

concat

代码语言:javascript
运行
复制
  sync(): Observable<any> {
    return from(this.db.getProducts()).pipe(
        map(products => {
            if ( !products ) {
                return of(true);
            }

            const batch: Observable<any>[] = [];
            for ( const product of products ) {
              if ( product.toBeSync ) {
                  batch.push(this.api.updateProduct(product));
              }
            }

            return concat(batch);
        })
    );
  }

在这两种情况下,我都无法看到来自HttpClient的可观测数据(没有发送http请求),但我可以看到日志记录。

批处理中调用的方法如下:

代码语言:javascript
运行
复制
  updateProduct(product: Product) {
      console.log('calling...');
      const options = this.getDefaultRequestOptions();
      return this.http.post('update_product', product, options);
  }

我调用sync()函数,如下所示:

代码语言:javascript
运行
复制
this.productService.sync().subscribe(() => {
  console.log('sync done');
}, error => this.utils.handleError(error));

输出:

代码语言:javascript
运行
复制
(8) calling...
sync done

但是没有HTTP请求启动。

如果在管道映射的forkJoin/concat之外执行相同的操作,则可以看到发送的HTTP请求。

代码语言:javascript
运行
复制
sync(): Observable<any> {

    const product = new Product(null, 'Title', 'Desc.', 'CODE');
    return this.api.updateProduct(product);

}

我遗漏了什么?

--更新--解决方案--

代码语言:javascript
运行
复制
sync(): Observable<any> {
    return from(this.db.getProducts()).pipe(
        flatMap(products => {

            if ( !products ) {
                return of(true);
            }

            const batch: Observable<any>[] = [];
            for ( const product of products ) {
              if ( product.toBeSync ) {
                  batch.push(this.api.updateProduct(product));
              }
            }

            console.log(batch);

            return concat(...batch);
            // return forkJoin(batch);
        })
    );
EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2020-02-03 20:34:10

试一试

代码语言:javascript
运行
复制
sync(): Observable<any> {
  return from(this.db.getProducts()).pipe(
    map(products => {
      const batch: Observable<any>[] = [];
      for (const product of products) {
        batch.push(this.api.updateProduct(product));
      }
     return batch;
   }),
   switchMap(batch => forkJoin(batch)),
  )
}

这是假设this.db.getProducts()是同步静态数据,而不是可观察/异步数据。

然后你可以试着做

代码语言:javascript
运行
复制
  this.productService.sync().subscribe(() => { console.log('sync done'); });

并查看是否正在进行任何API调用。

票数 2
EN

Stack Overflow用户

发布于 2020-02-03 20:30:25

这里的问题不是forkJoin/concat,而是使用map返回另一个可观察到的。

在您的代码中,map将返回Observable<Observable<any>>。您的IDE应该突出显示这些内容(不要使用any,也许这是一个问题)。

您的解决方案是将map更改为concatMap(至少用于测试),下一个链应该可以工作。

另外,调试subscribe及其返回值,以查看您拥有什么。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/60045843

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档