这种行为在我的特定用例中并不糟糕,但它是非常出乎意料的,我想知道为什么会发生这种情况。
我在沙箱(导入的RxJS 6.6.0)中运行了以下代码:
const Rx = rxjs;
const {map,tap, merge, take} = Rx.operators;
const sub = new Rx.ReplaySubject();
sub.next(1);
sub.pipe(
tap(x => {
if (x < 50) sub.next(x*2);
})
)
.subscribe({
next(x) {
console.info('Subscriber A', x)
},
complete() {
console.log('Subscriber A -- Complete!')
}
});.as-console-wrapper { max-height: 100% !important; top: 0; }<script src="https://unpkg.com/rxjs@6.6.0/bundles/rxjs.umd.min.js"></script>
我可能以完全错误的方式来做这件事,但我希望中间函数将项目推回到原始的主题/可观察对象中,同时也发出原始的项目(一种反馈循环)。这是我得到的输出:
Subscriber A64
Subscriber A32
Subscriber A16
Subscriber A8
Subscriber A4
Subscriber A2
Subscriber A1我本希望先有"Subscriber A1“,但看起来next是在流的开头而不是结尾添加项目?有人能帮我弄明白这一点吗?
发布于 2021-07-21 16:18:29
发出的值在通过所有管道之前不会到达订阅者,由于您是在分路器内部调用next,因此在前一个分路器完成之前发出的这个新值将被处理,从而首先到达订阅者。对next的每个调用重复此操作。
一些可视化效果:
const Rx = rxjs;
const { map, tap, merge, take } = Rx.operators;
const sub = new Rx.ReplaySubject();
sub.next(1);
sub.pipe(
tap(x => {
console.info("before next in tap x = ", x);
if (x < 50) sub.next(x * 2);
console.info("after next in tap x = ", x);
})
)
.subscribe({
next(x) {
console.info('Subscriber A', x)
},
complete() {
console.log('Subscriber A -- Complete!')
}
});.as-console-wrapper { max-height: 100% !important; top: 0; }<script src="https://unpkg.com/rxjs@6.6.0/bundles/rxjs.umd.min.js"></script>
发布于 2021-07-21 16:20:55
我的猜测是,由于在tap()运算符中使用了sub.next(),所以会产生递归效果。因此,它填充调用堆栈,直到达到条件限制,然后从最后到第一个返回堆栈。
但是,如果您需要在可观察对象中具有累积效果,则scan()是更理想的选择。它的行为类似于Array.reduce(),但它为每个发出的值运行内部函数。然后takeUntil()处理条件限制。
const Rx = rxjs;
const {interval} = Rx;
const { scan, tap, takeWhile, startWith} = Rx.operators;
const double$ = interval(1).pipe(
scan((acc, cur)=>acc*2,1),
startWith(1),
takeWhile(value=>value<50, true),
tap(value=>console.info(`Subscriber A${value}`)),
).subscribe();<script src="https://unpkg.com/rxjs@6.6.0/bundles/rxjs.umd.min.js"></script>
https://stackoverflow.com/questions/68463529
复制相似问题