首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >具有多个有限活动流的RxJava - SwitchMap相似

具有多个有限活动流的RxJava - SwitchMap相似
EN

Stack Overflow用户
提问于 2017-01-30 07:57:52
回答 2查看 468关注 0票数 2

我想知道如何将一个可观察到的类似于switchMap的流转换成多个(有限)流,而不是局限于单个活动流。

其目的是使多个任务同时工作,达到某些任务计数的限制,并允许使用FIFO队列策略启动新任务,这意味着任何新任务都将立即启动,队列中最老的任务将被取消。

switchMap将为源的每个发射创建可观测的,并将取消以前运行的可观测流(一旦创建了新的),我希望实现类似的目标,但允许具有某种级别的并发性(如flatMap),这意味着允许为每个发射创建可观测的数量,并同时运行到某个并发极限,当达到并发极限时,最古老的可观测值将被取消,新的可观测流将启动。

实际上,这与flatMap与maxConcurrent类似,但是当到达maxConcurrent时,不再等待新的可观测值,而是取消旧的可观测值并立即输入新的。

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2017-01-31 15:37:43

你可以试试这个变压器:

代码语言:javascript
复制
public static <T, R> Observable.Transformer<T, R> switchFlatMap(
        int n, Func1<T, Observable<R>> mapper) {
    return f -> 
        Observable.defer(() -> {
            final AtomicInteger ingress = new AtomicInteger();
            final Subject<Integer, Integer> cancel = 
                    PublishSubject.<Integer>create().toSerialized();

            return f.flatMap(v -> {
                int id = ingress.getAndIncrement();
                Observable<R> o = mapper.call(v)
                        .takeUntil(cancel.filter(e -> e == id + n));
                cancel.onNext(id);
                return o;
            });
        })
    ;
}

示威活动:

代码语言:javascript
复制
public static void main(String[] args) {
    PublishSubject<Integer> ps = PublishSubject.create();

    @SuppressWarnings("unchecked")
    PublishSubject<Integer>[] pss = new PublishSubject[3];
    for (int i = 0; i < pss.length; i++) {
        pss[i] = PublishSubject.create();
    }

    AssertableSubscriber<Integer> ts = ps
    .compose(switchFlatMap(2, v -> pss[v]))
    .test();

    ps.onNext(0);
    ps.onNext(1);

    pss[0].onNext(1);
    pss[0].onNext(2);
    pss[0].onNext(3);

    pss[1].onNext(10);
    pss[1].onNext(11);
    pss[1].onNext(12);

    ps.onNext(2);

    pss[0].onNext(4);

    pss[2].onNext(20);
    pss[2].onNext(21);
    pss[2].onNext(22);

    pss[1].onCompleted();
    pss[2].onCompleted();
    ps.onCompleted();

    ts.assertResult(1, 2, 3, 10, 11, 12, 20, 21, 22);
}
票数 3
EN

Stack Overflow用户

发布于 2017-01-31 07:43:06

虽然已准备好的解决方案不可用,但下面这样的内容应该会有所帮助。

代码语言:javascript
复制
public static void main(String[] args) {

    Observable.create(subscriber -> {
                for (int i = 0; i < 5; i++) {
                    Observable.timer(i, TimeUnit.SECONDS).toBlocking().subscribe();
                    subscriber.onNext(i);
                }
            })
            .switchMap(
                    n -> {
                        System.out.println("Main task emitted event - " + n);
                        return Observable.interval(1, TimeUnit.SECONDS).take((int) n * 3)
                                .doOnUnsubscribe(() -> System.out.println("Unsubscribed for main task event - "+ n));
                    }).subscribe(n2 -> System.out.println("\t" + n2));

    Observable.timer(20, TimeUnit.SECONDS).toBlocking().subscribe();
}

Observable.create部分创建了一个缓慢的生成器,它以发射0、睡眠1s和发出1、睡眠2s和发出2等方式发布项目。

switchMap为每秒钟发出数字的每个元素创建Observable对象。您还可以注意到,每当主Observable发出一个元素时,以及当它取消订阅时,它都会打印一行。

因此,在您的示例中,您可能对使用doOnUnsubscribe关闭最老的任务感兴趣。希望能帮上忙。

下面的伪代码可能有助于更好地理解。

代码语言:javascript
复制
getTaskObservable()
        .switchMap(
                task -> {
                    System.out.println("Main task emitted event - " + task);
                    return Observable.create(subscriber -> {
                        initiateTaskAndNotify(task, subscriber);
                    }).doOnUnsubscribe(() -> checkAndKillIfMaxConcurrentTasksReached(task));
                }).subscribe(value -> System.out.println("Done with task and got output" + value));
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/41931175

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档