首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >RxJava调度器不会在休眠状态下更改线程

RxJava调度器不会在休眠状态下更改线程
EN

Stack Overflow用户
提问于 2016-11-12 10:47:10
回答 1查看 213关注 0票数 2

我正面临着非常奇怪的RxJava行为,我不能理解。

假设我想并行处理元素。为此,我使用了flatMap:

代码语言:javascript
运行
复制
public static void log(String msg) {
    String threadName = Thread.currentThread().getName();
    System.out.println(String.format("%s - %s", threadName, msg));
}

public static void sleep(int ms) {
    try {
        Thread.sleep(ms);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

public static void main(String[] args) throws InterruptedException {

    Scheduler sA = Schedulers.from(Executors.newFixedThreadPool(1));
    Scheduler sB = Schedulers.from(Executors.newFixedThreadPool(5));

    Observable.create(s -> {
        while (true) {
            log("start");
            s.onNext(Math.random());
            sleep(10);
        }
    }).subscribeOn(sA)
            .flatMap(r -> Observable.just(r).subscribeOn(sB))
            .doOnNext(r -> log("process"))
            .subscribe((r) -> log("finish"));
}

输出是非常可预测的:

代码语言:javascript
运行
复制
pool-1-thread-1 - start
pool-2-thread-1 - process
pool-2-thread-1 - finish
pool-1-thread-1 - start
pool-2-thread-2 - process
pool-2-thread-2 - finish
pool-1-thread-1 - start
pool-2-thread-3 - process
pool-2-thread-3 - finish

好吧,好吧,但是如果我在flatMap并行化调度程序停止改变线程后,将n> 10的睡眠添加到映射中。

代码语言:javascript
运行
复制
public static void main(String[] args) throws InterruptedException {

    Scheduler sA = Schedulers.from(Executors.newFixedThreadPool(1));
    Scheduler sB = Schedulers.from(Executors.newFixedThreadPool(5));

    Observable.create(s -> {
        while (true) {
            log("start");
            s.onNext(Math.random());
            sleep(10);
        }
    }).subscribeOn(sA)
            .flatMap(r -> Observable.just(r).subscribeOn(sB))
            .doOnNext(r -> sleep(15))
            .doOnNext(r -> log("process"))
            .subscribe((r) -> log("finish"));
}

什么提供了以下内容:

代码语言:javascript
运行
复制
pool-1-thread-1 - start
pool-1-thread-1 - start
pool-2-thread-1 - process
pool-2-thread-1 - finish
pool-1-thread-1 - start
pool-1-thread-1 - start
pool-2-thread-1 - process
pool-2-thread-1 - finish
pool-1-thread-1 - start
pool-2-thread-1 - process

为什么?为什么所有元素在flatMap之后都在同一个线程(池-2-线程-1)中处理?

EN

回答 1

Stack Overflow用户

发布于 2016-11-12 16:58:50

FlatMap将所有并行任务序列化为单个线程,您可以窥探这个线程。试试这个吧

代码语言:javascript
运行
复制
public static void main(String[] args) throws InterruptedException {

Scheduler sA = Schedulers.from(Executors.newFixedThreadPool(1));
Scheduler sB = Schedulers.from(Executors.newFixedThreadPool(5));

Observable.create(s -> {
    while (!s.isUnsubscribed()) {
        log("start");
        s.onNext(Math.random());
        sleep(10);
    }
}).subscribeOn(sA)
        .flatMap(r -> 
            Observable.just(r)
            .subscribeOn(sB)
            .doOnNext(r -> sleep(15))
            .doOnNext(r -> log("process"))
        )
        .subscribe((r) -> log("finish"));
}
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/40559085

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档