我想要创建一个函数,它将从一个fromEvent
数组中创建的所有datachannels
合并起来,并发出从其中任何一个接收到的任何事件。
这是最初的功能:
private dataChannels: BehaviorSubject<RTCDataChannel[]> = new BehaviorSubject([]);
// datachannels array get filled at some point
...
public on(): Observable<Event> {
const eventStreams = this.dataChannels.value.map((c) => fromEvent(c, 'message'));
return merge(...eventStreams);
}
然而,上面的函数的问题是,据我所知,将使用而不是的新值发送给datachannels。
因此,我开始创建以下函数:
const allEvents = this.dataChannels.pipe(map((channels) => channels.map((c) => fromEvent(c, 'message'))));
return merge(...allEvents);
这就是我的问题所在。allEvents
是Observable<Observable<Event>[]>
型的。merge
不会接受的。如何解析可观察到的输入Observable<Event>[]
发布于 2020-05-10 13:49:36
您可以尝试合并在BehaviorSubject中接收到的所有可观察到的数据。我看到的问题是,即使频道已从dataChannels中删除,它们仍将被挂起订阅。我已经创建了一个类支持newSubscription$
,当发出这个选项时,所有的fromEvent
都会取消订阅。
你可以试试这个解决方案。
newSubscription$ = new Subject();
this.dataChannels.pipe(
tap(() => {
this.newSubscription$.next(); // kill existing subscriptions
this.newSubscription$ = new Subject(); // and wait for this for new ones
})
switchMap((allChannels) => {
const fromEvenObs = allChannels.map(c => fromEvent(c, 'message').pipe(takeUntil(newSubscription$)));
return merge(...fromEvenObs);
})).subscribe((data) => { // data from any event});
编辑:
由于我们使用的是switchMap()
,所以在events
接收到新数据时,不需要有额外的可观察性来取消订阅events
。仅做以下工作就足够了:
this.dataChannels.pipe(
switchMap((allChannels) => {
const fromEvenObs = allChannels.map(c => fromEvent(c, 'message'));
return merge(...fromEvenObs);
})).subscribe((data) => { //data from events });
发布于 2020-05-10 13:45:43
您可以使用操作符mergeAll
,正如文档中所说的mergeAll subscribes to an Observable that emits Observables. Each time it observes one of these emitted inner Observables, it subscribes to that and delivers all the values from the inner Observable on the output Observable.
并使用操作符from
发出每个可观测到的可观测数组。
const allEvents = this.dataChannels.pipe(
map((channels) => from(channels.map((c) => fromEvent(c, 'message'))),
mergeAll();
);
https://stackoverflow.com/questions/61712553
复制相似问题