首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >Observable<Observable<Event>[]> to Observable<Event>[]

Observable<Observable<Event>[]> to Observable<Event>[]
EN

Stack Overflow用户
提问于 2020-05-10 13:09:42
回答 2查看 120关注 0票数 0

我想要创建一个函数,它将从一个fromEvent数组中创建的所有datachannels合并起来,并发出从其中任何一个接收到的任何事件。

这是最初的功能:

代码语言:javascript
运行
复制
  private dataChannels: BehaviorSubject<RTCDataChannel[]> = new BehaviorSubject([]);
  // datachannels array get filled at some point
  ...

  public on(): Observable<Event> {
    const eventStreams = this.dataChannels.value.map((c) => fromEvent(c, 'message'));
    return merge(...eventStreams);
  }

然而,上面的函数的问题是,据我所知,将使用而不是的新值发送给datachannels。

因此,我开始创建以下函数:

代码语言:javascript
运行
复制
  const allEvents = this.dataChannels.pipe(map((channels) => channels.map((c) => fromEvent(c, 'message'))));

  return merge(...allEvents);

这就是我的问题所在。allEventsObservable<Observable<Event>[]>型的。merge不会接受的。如何解析可观察到的输入Observable<Event>[]

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2020-05-10 13:49:36

您可以尝试合并在BehaviorSubject中接收到的所有可观察到的数据。我看到的问题是,即使频道已从dataChannels中删除,它们仍将被挂起订阅。我已经创建了一个类支持newSubscription$,当发出这个选项时,所有的fromEvent都会取消订阅。

你可以试试这个解决方案。

代码语言:javascript
运行
复制
    newSubscription$ = new Subject();
    this.dataChannels.pipe(
        tap(() => {
            this.newSubscription$.next();   // kill existing subscriptions
            this.newSubscription$ = new Subject(); // and wait for this for new ones
        }) 
        switchMap((allChannels) => {
        const fromEvenObs = allChannels.map(c => fromEvent(c, 'message').pipe(takeUntil(newSubscription$)));
        return merge(...fromEvenObs);
    })).subscribe((data) => { // data from any event});

编辑:

由于我们使用的是switchMap(),所以在events接收到新数据时,不需要有额外的可观察性来取消订阅events。仅做以下工作就足够了:

代码语言:javascript
运行
复制
this.dataChannels.pipe(
    switchMap((allChannels) => {
        const fromEvenObs = allChannels.map(c => fromEvent(c, 'message'));
        return merge(...fromEvenObs);
})).subscribe((data) => { //data from events });
票数 1
EN

Stack Overflow用户

发布于 2020-05-10 13:45:43

您可以使用操作符mergeAll,正如文档中所说的mergeAll subscribes to an Observable that emits Observables. Each time it observes one of these emitted inner Observables, it subscribes to that and delivers all the values from the inner Observable on the output Observable.

并使用操作符from发出每个可观测到的可观测数组。

代码语言:javascript
运行
复制
const allEvents = this.dataChannels.pipe(
   map((channels) => from(channels.map((c) => fromEvent(c, 'message'))),
   mergeAll();
);
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/61712553

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档