我订阅了一个可观察的rxjs。我现在有两个不同的必需品要处理的管道内的可观察。
首先,我要映射输出。第二,我想使用tap
触发副作用,但副作用不能在第一次发射时触发。
所以这显然不起作用,因为skip
在管道上是全局工作的:
this.userChangeSubscription = this.userStateService.userState$
.pipe(
map(userState => userState.prop),
skip(1),
tap(() => this.sideEffect())
)
.subscribe();
有没有办法做到这一点,而不订阅可观察到的两次?
编辑:好的,我现在有几个选项似乎都在工作。现在:该选哪一个?
发布于 2020-03-04 02:50:04
我已经看到了有用的答案,但我认为您最好的选择是为这个usecase编写您自己的rxjs操作符。这将导致一个干净的解决方案,并且在您的pipe
函数中澄清您的意图。
要做到这一点,我们需要使用defer
observable
function tapSkipFirst<T>(fn: Function): OperatorFunction<T, T> {
return function(source: Observable<any>) {
return defer(() => {
let skip = true;
return source.pipe(
tap((v?:any) => {
if (!skip) {
fn(v);
}
skip = false;
})
);
});
};
}
我们使用skip
变量来决定是否运行副作用函数。在第一次运行结束时,我们切换skip
为false,因此运行所有后续运行的副作用函数。我们需要在这里使用可观察到的defer
,因为defer
中的代码只在订阅时调用,而不是在创建时调用。这一点很重要,因为否则所有订阅都将共享相同的skip
变量。
现在您可以很容易地使用新的自定义运算符,例如:
this.userChangeSubscription = this.userStateService.userState$
.pipe(
map(userState => userState.prop),
tapSkipFirst(() => this.sideEffect())
)
.subscribe();
发布于 2020-02-25 10:01:44
是的是可能的。示例:
const userProp$ = this.userStateService.userState$.pipe(
map(userState => userState.prop),
shareReplay({ bufferSize: 1, refCount: true })
);
在这里,shareReplay将允许您在多个订阅者之间共享该资源,而不会再次触发整个链。一旦所有订阅者都完成了监听,shareReplay就会结束,上面可观察到的内容就会结束。
然后,您可以订阅多少次,也可以隔离副作用,这是一个好主意,首先:
userProp$
.pipe(
skip(1),
tap(() => this.sideEffect())
)
.subscribe();
发布于 2020-02-26 01:17:01
对于这种用例,您可以使用switchMap
操作符。它给出了一个index
参数,允许您根据发射的第一、第二、第三等执行不同的操作。
在这种情况下,如果sideEffect
=0(第一次排放),那么index
不会被触发,但是对于任何其他的未来排放,index
将被触发。
我使用这个操作符来触发副作用,或者根据条件改变发出的值。这是很棒的,因为您不需要将可观察到的源分割成两个具有不同管道的不同的可观测值。
this.userChangeSubscription = this.userStateService.userState$
.pipe(
map(userState => userState.prop),
switchMap((value, index) => {
return index > 0
? of(value).pipe(tap(() => this.sideEffect()))
: of(value);
})
)
.subscribe();
https://stackoverflow.com/questions/60400125
复制