我正以正常的方式在RxJS中使用RxJS运算符:
var combined = source1.withLatestFrom(source2, source3);...to会主动收集来自source2和source3的最新排放,只有当source1发出时,才会释放出这三个值。
但我不能保证source2或source3会在source1生成值之前产生值。相反,我需要等待,直到所有三个源都产生至少一个值,然后才让withLatestFrom完成它的任务。
合同必须是:如果source1发出,那么combined将始终在其他源最终产生时发出。如果source1在等待其他源时多次发射,我们可以使用最新的值并丢弃前面的值。编辑:作为大理石图表:
--1------------2---- (source)
----a-----b--------- (other1)
------x-----y------- (other2)
------1ax------2by--
--1------------2---- (source)
------a---b--------- (other1)
--x---------y------- (other2)
------1ax------2by--
------1--------2---- (source)
----a-----b--------- (other1)
--x---------y------- (other2)
------1ax------2by--我可以为这个做一个自定义操作符,但是我想确保我不会错过一个使用香草操作符来实现这个操作的明显方法。感觉就像我希望combineLatest用于初始的发射,然后从那时开始切换到withLatestFrom,但是我还没有弄清楚如何做到这一点。
编辑:来自最终解决方案的完整代码示例:
var Dispatcher = new Rx.Subject();
var source1 = Dispatcher.filter(x => x === 'foo');
var source2 = Dispatcher.filter(x => x === 'bar');
var source3 = Dispatcher.filter(x => x === 'baz');
var combined = source1.publish(function(s1) {
return source2.publish(function(s2) {
return source3.publish(function(s3) {
var cL = s1.combineLatest(s2, s3).take(1).do(() => console.log('cL'));
var wLF = s1.skip(1).withLatestFrom(s2, s3).do(() => console.log('wLF'));
return Rx.Observable.merge(cL, wLF);
});
});
});
var sub1 = combined.subscribe(x => console.log('x', x));
// These can arrive in any order
// and we can get multiple values from any one.
Dispatcher.onNext('foo');
Dispatcher.onNext('bar');
Dispatcher.onNext('foo');
Dispatcher.onNext('baz');
// combineLatest triggers once we have all values.
// cL
// x ["foo", "bar", "baz"]
// withLatestFrom takes over from there.
Dispatcher.onNext('foo');
Dispatcher.onNext('bar');
Dispatcher.onNext('foo');
// wLF
// x ["foo", "bar", "baz"]
// wLF
// x ["foo", "bar", "baz"]发布于 2016-08-23 14:50:58
我认为答案或多或少正如您所描述的,让第一个值是combineLatest,然后切换到withLatestFrom。我的JS很模糊,但我想它看起来会是这样的:
var selector = function(x,y,z) {};
var combined = Rx.Observable.concat(
source1.combineLatest(source2, source3, selector).take(1),
source1.withLatestFrom(source2, source3, selector)
);您可能应该使用publish来避免多个订阅,因此如下所示:
var combined = source1.publish(function(s1)
{
return source2.publish(function(s2)
{
return source3.publish(function(s3)
{
return Rx.Observable.concat(
s1.combineLatest(s2, s3, selector).take(1),
s1.withLatestFrom(s2, s3, selector)
);
});
});
});或者使用箭头函数..。
var combined = source1.publish(s1 => source2.publish(s2 => source3.publish(s3 =>
Rx.Observable.concat(
s1.combineLatest(s2, s3, selector).take(1),
s1.withLatestFrom(s2, s3, selector)
)
)));编辑
我看到了concat的问题,withLatestFrom没有得到这些值。我认为以下几点是可行的:
var combined = source1.publish(s1 => source2.publish(s2 => source3.publish(s3 =>
Rx.Observable.merge(
s1.combineLatest(s2, s3, selector).take(1),
s1.skip(1).withLatestFrom(s2, s3, selector)
)
)));...so使用combineLatest获取一个值,然后使用withLatestFrom获取其余的值。
发布于 2018-03-28 15:34:28
我对接受的答案不太满意,所以我最终找到了另一个解决方案。剥猫皮的方法很多!
我的用例只涉及两个流-一个“请求”流和一个“令牌”流。我希望请求一收到就立即启动,无论最新的令牌是什么。如果还没有令牌,则应该等到第一个令牌出现,然后触发所有挂起的请求。
我对接受的答案不太满意,所以我最终找到了另一个解决方案。本质上,我将请求流分成两部分--在第一个令牌到达之前和之后。我缓冲第一部分,然后一旦我知道令牌流是非空的,就一次性地重新释放所有内容。
const first = token$.first()
Rx.Observable.merge(
request$.buffer(first).mergeAll(),
request$.skipUntil(first)
)
.withLatestFrom(token$)看看这里的直播:https://rxviz.com/v/VOK2GEoX
RxJs 7的:
const first = token$.first()
merge(
request$.pipe(
buffer(first),
mergeAll()
),
request$.pipe(
skipUntil(first)
)
).pipe(
withLatestFrom(token$)
)发布于 2017-09-21 16:41:03
我也有类似的要求,但只有两个可观察的。我最后使用了switchMap+first:
observable1
.switchMap(() => observable2.first(), (a, b) => [a, b])
.subscribe(([a, b]) => {...}));所以它:
在我的例子中,第二个可观察到的是ReplaySubject。我不确定它是否适用于其他可以观察到的类型。
我认为:
我感到惊讶的是,withLatestFrom不会等待第二个可观察到的。
https://stackoverflow.com/questions/39097699
复制相似问题