首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >让withLatestFrom等待直到所有源都产生了一个值

让withLatestFrom等待直到所有源都产生了一个值
EN

Stack Overflow用户
提问于 2016-08-23 09:43:16
回答 8查看 15.1K关注 0票数 26

我正以正常的方式在RxJS中使用RxJS运算符:

代码语言:javascript
运行
复制
var combined = source1.withLatestFrom(source2, source3);

...to会主动收集来自source2source3的最新排放,只有当source1发出时,才会释放出这三个值。

但我不能保证source2source3会在source1生成值之前产生值。相反,我需要等待,直到所有三个源都产生至少一个值,然后才让withLatestFrom完成它的任务。

合同必须是:如果source1发出,那么combined将始终在其他源最终产生时发出。如果source1在等待其他源时多次发射,我们可以使用最新的值并丢弃前面的值。编辑:作为大理石图表:

代码语言:javascript
运行
复制
--1------------2---- (source)
----a-----b--------- (other1)
------x-----y------- (other2)
------1ax------2by--


--1------------2---- (source)
------a---b--------- (other1)
--x---------y------- (other2)
------1ax------2by--


------1--------2---- (source)
----a-----b--------- (other1)
--x---------y------- (other2)
------1ax------2by--

我可以为这个做一个自定义操作符,但是我想确保我不会错过一个使用香草操作符来实现这个操作的明显方法。感觉就像我希望combineLatest用于初始的发射,然后从那时开始切换到withLatestFrom,但是我还没有弄清楚如何做到这一点。

编辑:来自最终解决方案的完整代码示例:

代码语言:javascript
运行
复制
var Dispatcher = new Rx.Subject();
var source1 = Dispatcher.filter(x => x === 'foo');
var source2 = Dispatcher.filter(x => x === 'bar');
var source3 = Dispatcher.filter(x => x === 'baz');

var combined = source1.publish(function(s1) {
    return source2.publish(function(s2) {
        return source3.publish(function(s3) {
            var cL = s1.combineLatest(s2, s3).take(1).do(() => console.log('cL'));
            var wLF = s1.skip(1).withLatestFrom(s2, s3).do(() => console.log('wLF'));

            return Rx.Observable.merge(cL, wLF);
        });
    });
});

var sub1 = combined.subscribe(x => console.log('x', x));

// These can arrive in any order
// and we can get multiple values from any one.
Dispatcher.onNext('foo');
Dispatcher.onNext('bar');
Dispatcher.onNext('foo');
Dispatcher.onNext('baz');

// combineLatest triggers once we have all values.
// cL
// x ["foo", "bar", "baz"]

// withLatestFrom takes over from there.
Dispatcher.onNext('foo');
Dispatcher.onNext('bar');
Dispatcher.onNext('foo');
// wLF
// x ["foo", "bar", "baz"]
// wLF
// x ["foo", "bar", "baz"]
EN

回答 8

Stack Overflow用户

回答已采纳

发布于 2016-08-23 14:50:58

我认为答案或多或少正如您所描述的,让第一个值是combineLatest,然后切换到withLatestFrom。我的JS很模糊,但我想它看起来会是这样的:

代码语言:javascript
运行
复制
var selector = function(x,y,z) {};

var combined = Rx.Observable.concat(
    source1.combineLatest(source2, source3, selector).take(1),
    source1.withLatestFrom(source2, source3, selector)
);

您可能应该使用publish来避免多个订阅,因此如下所示:

代码语言:javascript
运行
复制
var combined = source1.publish(function(s1)
{
    return source2.publish(function(s2)
    {
        return source3.publish(function(s3)
        {
            return Rx.Observable.concat(
                s1.combineLatest(s2, s3, selector).take(1),
                s1.withLatestFrom(s2, s3, selector)
            );
        });
    });
});

或者使用箭头函数..。

代码语言:javascript
运行
复制
var combined = source1.publish(s1 => source2.publish(s2 => source3.publish(s3 => 
    Rx.Observable.concat(
        s1.combineLatest(s2, s3, selector).take(1),
        s1.withLatestFrom(s2, s3, selector)
    )
)));

编辑

我看到了concat的问题,withLatestFrom没有得到这些值。我认为以下几点是可行的:

代码语言:javascript
运行
复制
var combined = source1.publish(s1 => source2.publish(s2 => source3.publish(s3 => 
    Rx.Observable.merge(
        s1.combineLatest(s2, s3, selector).take(1),
        s1.skip(1).withLatestFrom(s2, s3, selector)
    )
)));

...so使用combineLatest获取一个值,然后使用withLatestFrom获取其余的值。

票数 9
EN

Stack Overflow用户

发布于 2018-03-28 15:34:28

我对接受的答案不太满意,所以我最终找到了另一个解决方案。剥猫皮的方法很多!

我的用例只涉及两个流-一个“请求”流和一个“令牌”流。我希望请求一收到就立即启动,无论最新的令牌是什么。如果还没有令牌,则应该等到第一个令牌出现,然后触发所有挂起的请求。

我对接受的答案不太满意,所以我最终找到了另一个解决方案。本质上,我将请求流分成两部分--在第一个令牌到达之前和之后。我缓冲第一部分,然后一旦我知道令牌流是非空的,就一次性地重新释放所有内容。

代码语言:javascript
运行
复制
const first = token$.first()

Rx.Observable.merge(
  request$.buffer(first).mergeAll(),
  request$.skipUntil(first)
)
  .withLatestFrom(token$)

看看这里的直播:https://rxviz.com/v/VOK2GEoX

RxJs 7的

代码语言:javascript
运行
复制
const first = token$.first()

merge(
  request$.pipe(
    buffer(first),
    mergeAll()
  ),
  request$.pipe(
    skipUntil(first)
  )
).pipe(
  withLatestFrom(token$)
)
票数 6
EN

Stack Overflow用户

发布于 2017-09-21 16:41:03

我也有类似的要求,但只有两个可观察的。我最后使用了switchMap+first:

代码语言:javascript
运行
复制
observable1
 .switchMap(() => observable2.first(), (a, b) => [a, b])
 .subscribe(([a, b]) => {...}));

所以它:

  • 等待,直到两个可观测值发出某种值。
  • 只有当第一个可观测值发生变化时才能从第二个可观察到的值中提取值(与combineLatest不同)
  • 没有在第二个可观测到的位置挂载(因为.first())

在我的例子中,第二个可观察到的是ReplaySubject。我不确定它是否适用于其他可以观察到的类型。

我认为:

  • flatMap可能也会起作用
  • 也许可以将此方法扩展到处理两个以上的可观测数据。

我感到惊讶的是,withLatestFrom不会等待第二个可观察到的。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/39097699

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档