RxJS:使用merge动态添加和删除Observable

内容来源于 Stack Overflow,并遵循CC BY-SA 3.0许可协议进行翻译与使用

  • 回答 (1)
  • 关注 (0)
  • 查看 (17)

我正在尝试构建一个可插拔的ProgressService

该服务应该跟踪当前加载的“事物”数量。并有一个isLoading()方法返回一个Observable<boolean>指示是否有任何加载。

我的第一个解决方案非常天真,使用a new BehaviorSubject(0)然后每个加载提供程序只是调用ProgressService.increase()ProgressService.decrease()。这很好。但是如果可能的话,现在我想更加反应。

然后我遇到了merge,当一开始就知道所有加载提供程序时,它工作正常:

this.progress = merge(...progressProviders).pipe(
  scan((acc, curr) => acc + (curr ? 1 : -1), 0)
);

progress当任何加载提供程序发出true或时,这将简单地增加/减少该值false

但我还需要某种注册/取消注册功能。这应该基本上添加一个新Observablemerge链(或删除它)。

新的ProgressService应该是这样的:

class ProgressService {

  progress: Observable<number> = EMPTY; // value > 0 when something is loading

  addLoadingProvider(lp: Observable<boolean>) {
    // increment this.progress, when lp emits true
    // decrease this.progress, when lp emits false
    // nice to have: ignore if lp's first emitted value is false
  }

  removeLoadingProvider(lp: Observable<boolean>) {
    // stop listening to lp
    // clean up: decrease this.progress, if lp last emitted true
  }

  isLoading(): Observable<boolean> {
    return this.progress.pipe(
      map(val => val !== 0)
    );
  }

}

removeLoadingProvider如果我们返回一个Subscriptionfrom addLoadingProvider,并且用于Subscription.unsubscribe()取消注册,那么根本不需要该方法(?)。

希望有人可以根据需要告诉我如何mergeunmerge其他Observables。

提问于
用户回答回答于

根据你的解释,我可以理解以下[请纠正我如果我的理解不正确] -

你想收集发出布尔值的各种observable,并且你希望以这种方式跟踪它们中的任何一个至少有一个,"true"那么你的最终observable应该发出true,否则最终的observable应该返回"false"

虽然您的BehaviorSubject方法是一种被动方法。我提出以下方法; 如果在您的方案中有意义,请告诉我 -

方法1 -

enum ListAction {
  Added,
  Removed,
  Empty,
  Undefined
}

export class ProgressService {

  constructor() { }

  progress: Observable<number> = EMPTY; // value > 0 when something is loading

  obsListChanged: BehaviorSubject<ListAction> = new BehaviorSubject<any>(ListAction.Undefined);
  obsList: Array<Observable<boolean>> = [];

  addLoadingProvider(lp: Observable<boolean>) {
    // increment this.progress, when lp emits true
    // decrease this.progress, when lp emits false
    // nice to have: ignore if lp's first emitted value is false
    this.obsList.push(lp);

    this.obsListChanged.next(ListAction.Added);
  }

  removeLoadingProvider(lp: Observable<boolean>) {
    // stop listening to lp
    // clean up: decrease this.progress, if lp last emitted true
    this.obsList = this.obsList.filter(i => i !== lp);
    this.obsListChanged.next(ListAction.Removed);
  }

  isLoading(): Observable<boolean> {
    // return this.progress.pipe(
    //   map(val => val !== 0)
    // );
    return this.obsListChanged.pipe(
      switchMap(() => {
        return combineLatest(this.obsList);
      }),
      map(v => {
        return v.some(loading => loading);
      })
    );
  }
}

我已经定义了一个ListAction observable,如果你想根据你的ListAction做特定的工作,那么你可以根据你的逻辑在rxjs运算符中做同样的事情。

方法2 [方法1的一点改进版] -

export class ProgressService {

  constructor() { }

  progress: Observable<number> = EMPTY; // value > 0 when something is loading

  obsList$: BehaviorSubject<Array<Observable<boolean>>> = new BehaviorSubject<Array<Observable<boolean>>>([]);

  addLoadingProvider(lp: Observable<boolean>) {
    // increment this.progress, when lp emits true
    // decrease this.progress, when lp emits false
    // nice to have: ignore if lp's first emitted value is false
    this.obsList$.next([...this.obsList$.getValue(), lp]);
  }

  removeLoadingProvider(lp: Observable<boolean>) {
    // stop listening to lp
    // clean up: decrease this.progress, if lp last emitted true
    const removed = this.obsList$.getValue().filter(i => i !== lp);
    this.obsList$.next(removed);
  }

  isLoading(): Observable<boolean> {
    // return this.progress.pipe(
    //   map(val => val !== 0)
    // );
    return this.obsList$.pipe(
      switchMap(obs => {
        return combineLatest(obs);
      }),
      map(v => {
        return v.some(loading => loading);
      })
    );
  }
}

扫码关注云+社区

领取腾讯云代金券