我正以正常的方式在RxJS中使用RxJS运算符:
var combined = source1.withLatestFrom(source2, source3);...to会主动收集来自source2和source3的最新排放,只有当source1发出时,才会释放出这三个值。
但我不能保证source2或source3会在source1生成值之前产生值。相反,我需要等待,直到所有三个源都产生至少一个值,然后才让withLatestFrom完成它的任务。
合同必须是:如果source1发出,那么combined将始终在其他源最终产生时发出。如果source1在等待其他源时多次发射,我们可以使用最新的值并丢弃前面的值。编辑:作为大理石图表:
--1------------2---- (source)
----a-----b--------- (other1)
------x-----y------- (other2)
------1ax------2by--
--1------------2---- (source)
------a---b--------- (other1)
--x---------y------- (other2)
------1ax------2by--
------1--------2---- (source)
----a-----b--------- (other1)
--x---------y------- (other2)
------1ax------2by--我可以为这个做一个自定义操作符,但是我想确保我不会错过一个使用香草操作符来实现这个操作的明显方法。感觉就像我希望combineLatest用于初始的发射,然后从那时开始切换到withLatestFrom,但是我还没有弄清楚如何做到这一点。
编辑:来自最终解决方案的完整代码示例:
var Dispatcher = new Rx.Subject();
var source1 = Dispatcher.filter(x => x === 'foo');
var source2 = Dispatcher.filter(x => x === 'bar');
var source3 = Dispatcher.filter(x => x === 'baz');
var combined = source1.publish(function(s1) {
    return source2.publish(function(s2) {
        return source3.publish(function(s3) {
            var cL = s1.combineLatest(s2, s3).take(1).do(() => console.log('cL'));
            var wLF = s1.skip(1).withLatestFrom(s2, s3).do(() => console.log('wLF'));
            return Rx.Observable.merge(cL, wLF);
        });
    });
});
var sub1 = combined.subscribe(x => console.log('x', x));
// These can arrive in any order
// and we can get multiple values from any one.
Dispatcher.onNext('foo');
Dispatcher.onNext('bar');
Dispatcher.onNext('foo');
Dispatcher.onNext('baz');
// combineLatest triggers once we have all values.
// cL
// x ["foo", "bar", "baz"]
// withLatestFrom takes over from there.
Dispatcher.onNext('foo');
Dispatcher.onNext('bar');
Dispatcher.onNext('foo');
// wLF
// x ["foo", "bar", "baz"]
// wLF
// x ["foo", "bar", "baz"]发布于 2018-03-28 15:34:28
我对接受的答案不太满意,所以我最终找到了另一个解决方案。剥猫皮的方法很多!
我的用例只涉及两个流-一个“请求”流和一个“令牌”流。我希望请求一收到就立即启动,无论最新的令牌是什么。如果还没有令牌,则应该等到第一个令牌出现,然后触发所有挂起的请求。
我对接受的答案不太满意,所以我最终找到了另一个解决方案。本质上,我将请求流分成两部分--在第一个令牌到达之前和之后。我缓冲第一部分,然后一旦我知道令牌流是非空的,就一次性地重新释放所有内容。
const first = token$.first()
Rx.Observable.merge(
  request$.buffer(first).mergeAll(),
  request$.skipUntil(first)
)
  .withLatestFrom(token$)看看这里的直播:https://rxviz.com/v/VOK2GEoX
RxJs 7的:
const first = token$.first()
merge(
  request$.pipe(
    buffer(first),
    mergeAll()
  ),
  request$.pipe(
    skipUntil(first)
  )
).pipe(
  withLatestFrom(token$)
)https://stackoverflow.com/questions/39097699
复制相似问题