首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >Schedulers.boundedElastic似乎使用相同的线程进行处理

Schedulers.boundedElastic似乎使用相同的线程进行处理
EN

Stack Overflow用户
提问于 2021-03-17 12:39:44
回答 2查看 3.1K关注 0票数 2

通过查看API,我的理解是,使用Schedulers.boundedElastic()或Schedulers.newBoundedElastic(3、10、“MyThreadGroup”)或Schedulers.fromExecutor(executor)等变体可以在多个线程中处理IO操作。

但是使用下面的示例代码进行的模拟似乎表明单个线程/同一个线程正在flatMap中执行这项工作。

代码语言:javascript
运行
复制
Flux.range(0, 100)
                .flatMap(i -> {
                    try {
                        // IO operation
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("Mapping for " + i + " is done by thread " + Thread.currentThread().getName());
                    return Flux.just(i);
                })
                .subscribeOn(Schedulers.boundedElastic())
                .subscribe();

Thread.sleep(10000); // main thread

//This yields the following

Mapping for 0 is done by thread boundedElastic-1
Mapping for 1 is done by thread boundedElastic-1
Mapping for 2 is done by thread boundedElastic-1
Mapping for 3 is done by thread boundedElastic-1 ...

上面的输出告诉我,在flatMap中运行的线程是相同的。在对多个IO的子代码调用flatMap时,是否有一种方法可以获得多个线程来处理项?我本来想看到有赏金的弹性-1,有赏金的弹性-2.

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2021-03-19 04:31:54

让flatMap在多个线程上运行的一种方法是创建一个ParallelFlux。下面的示例代码完成了这个任务。

代码语言:javascript
运行
复制
Flux.range(0, 1000)
                .parallel()             
                .runOn(Schedulers.boundedElastic())
                .flatMap(i -> {
                    try {
                        // IO operation
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("second Mapping for " + i + " is done by thread " + Thread.currentThread().getName());
                    return Flux.just(i);
                })
                .subscribe();
        
        Thread.sleep(10000);
票数 0
EN

Stack Overflow用户

发布于 2021-03-17 21:40:57

1.非阻塞IO (首选)的并发性

如果您有机会使用非阻塞IO (如Spring WebClient),那么您就不需要担心线程或调度器了,就可以获得并发性:

代码语言:javascript
运行
复制
Flux.range(0, 100)
        .flatMap(i -> Mono.delay(Duration.ofMillis(500)) // e.g.: reactive webclient call
                .doOnNext(x -> System.out.println("Mapping for " + i + " is done by thread " + Thread.currentThread()
                        .getName())))
        .subscribe();

2.阻塞IO的并发性

如果您有选择的话,最好避免阻塞IO。如果您无法避免这种情况,只需对代码稍加修改,并将subscribeOn应用于内部Mono

代码语言:javascript
运行
复制
Flux.range(0, 100)
        .flatMap(i -> Mono.fromRunnable(() -> {
            try {
                // IO operation
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("Mapping for " + i + " is done by thread " + Thread.currentThread().getName());
        }).subscribeOn(Schedulers.boundedElastic()))
        .subscribe();
票数 5
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/66673282

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档