首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >让withLatestFrom等待直到所有源都产生了一个值

让withLatestFrom等待直到所有源都产生了一个值
EN

Stack Overflow用户
提问于 2016-08-23 09:43:16
回答 8查看 15.1K关注 0票数 26

我正以正常的方式在RxJS中使用RxJS运算符:

代码语言:javascript
运行
复制
var combined = source1.withLatestFrom(source2, source3);

...to会主动收集来自source2source3的最新排放,只有当source1发出时,才会释放出这三个值。

但我不能保证source2source3会在source1生成值之前产生值。相反,我需要等待,直到所有三个源都产生至少一个值,然后才让withLatestFrom完成它的任务。

合同必须是:如果source1发出,那么combined将始终在其他源最终产生时发出。如果source1在等待其他源时多次发射,我们可以使用最新的值并丢弃前面的值。编辑:作为大理石图表:

代码语言:javascript
运行
复制
--1------------2---- (source)
----a-----b--------- (other1)
------x-----y------- (other2)
------1ax------2by--


--1------------2---- (source)
------a---b--------- (other1)
--x---------y------- (other2)
------1ax------2by--


------1--------2---- (source)
----a-----b--------- (other1)
--x---------y------- (other2)
------1ax------2by--

我可以为这个做一个自定义操作符,但是我想确保我不会错过一个使用香草操作符来实现这个操作的明显方法。感觉就像我希望combineLatest用于初始的发射,然后从那时开始切换到withLatestFrom,但是我还没有弄清楚如何做到这一点。

编辑:来自最终解决方案的完整代码示例:

代码语言:javascript
运行
复制
var Dispatcher = new Rx.Subject();
var source1 = Dispatcher.filter(x => x === 'foo');
var source2 = Dispatcher.filter(x => x === 'bar');
var source3 = Dispatcher.filter(x => x === 'baz');

var combined = source1.publish(function(s1) {
    return source2.publish(function(s2) {
        return source3.publish(function(s3) {
            var cL = s1.combineLatest(s2, s3).take(1).do(() => console.log('cL'));
            var wLF = s1.skip(1).withLatestFrom(s2, s3).do(() => console.log('wLF'));

            return Rx.Observable.merge(cL, wLF);
        });
    });
});

var sub1 = combined.subscribe(x => console.log('x', x));

// These can arrive in any order
// and we can get multiple values from any one.
Dispatcher.onNext('foo');
Dispatcher.onNext('bar');
Dispatcher.onNext('foo');
Dispatcher.onNext('baz');

// combineLatest triggers once we have all values.
// cL
// x ["foo", "bar", "baz"]

// withLatestFrom takes over from there.
Dispatcher.onNext('foo');
Dispatcher.onNext('bar');
Dispatcher.onNext('foo');
// wLF
// x ["foo", "bar", "baz"]
// wLF
// x ["foo", "bar", "baz"]
EN

Stack Overflow用户

发布于 2018-03-28 15:34:28

我对接受的答案不太满意,所以我最终找到了另一个解决方案。剥猫皮的方法很多!

我的用例只涉及两个流-一个“请求”流和一个“令牌”流。我希望请求一收到就立即启动,无论最新的令牌是什么。如果还没有令牌,则应该等到第一个令牌出现,然后触发所有挂起的请求。

我对接受的答案不太满意,所以我最终找到了另一个解决方案。本质上,我将请求流分成两部分--在第一个令牌到达之前和之后。我缓冲第一部分,然后一旦我知道令牌流是非空的,就一次性地重新释放所有内容。

代码语言:javascript
运行
复制
const first = token$.first()

Rx.Observable.merge(
  request$.buffer(first).mergeAll(),
  request$.skipUntil(first)
)
  .withLatestFrom(token$)

看看这里的直播:https://rxviz.com/v/VOK2GEoX

RxJs 7的

代码语言:javascript
运行
复制
const first = token$.first()

merge(
  request$.pipe(
    buffer(first),
    mergeAll()
  ),
  request$.pipe(
    skipUntil(first)
  )
).pipe(
  withLatestFrom(token$)
)
票数 6
EN
查看全部 8 条回答
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/39097699

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档