前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Reactor 之 subcribeOn vs publishOn

Reactor 之 subcribeOn vs publishOn

作者头像
伍六七AI编程
发布2022-09-23 15:38:52
5590
发布2022-09-23 15:38:52
举报
文章被收录于专栏:prepared

我们使用 subscribeOn 和 publishOn 操作符在响应链中切换执行上下文(Reactor 中叫 Scheduler)。

上一篇文章中,我们说到 Reactor 默认行为是执行订阅的同一线程将用于整个管道执行。如果要切换执行线程怎么办?可以使用 publishOn 和 SubscribeOn

让我们看个简单的例子:

代码语言:javascript
复制
class ReactiveJavaTutorial {

  public static void main(String[] args) {

    Flux<String> cities = Flux.just("New York", "London", "Paris", "Amsterdam")
            .map(String::toUpperCase)
            .filter(cityName -> cityName.length() <= 8)
            .map(cityName -> cityName.concat(" City"))
            .log();

    cities.subscribe();

  }

输出:

代码语言:javascript
复制
17:39:41.693 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
17:39:41.712 [main] INFO reactor.Flux.MapFuseable.1 - | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
17:39:41.714 [main] INFO reactor.Flux.MapFuseable.1 - | request(unbounded)
17:39:41.715 [main] INFO reactor.Flux.MapFuseable.1 - | onNext(NEW YORK City)
17:39:41.715 [main] INFO reactor.Flux.MapFuseable.1 - | onNext(LONDON City)
17:39:41.715 [main] INFO reactor.Flux.MapFuseable.1 - | onNext(PARIS City)
17:39:41.716 [main] INFO reactor.Flux.MapFuseable.1 - | onComplete()

中括号中的就是线程名称,在这个例子中,都是 main。可以看到整个管道执行器中都是使用的 main 线程。

有的时候,我们可能想告诉 Reactor 别在整个管道中使用同一个线程。我可以使用 subscribeOn() 和 publishOn() 方法达到效果。

subscribeOn() 方法

subscribeOn() 方法适用于订阅过程。我们可以把它放在响应链条中的任意位置。它接收 Scheduler 参数,且在提供的线程池中选择线程执行。

在下面的例子中,我们使用有界弹性线程池(Schedulers.boundElastic())

代码语言:javascript
复制
@Test
public void testSubscribeThread() {
  Flux<String> cities = Flux.just("New York", "London", "Paris", "Amsterdam")
      .subscribeOn(Schedulers.boundedElastic())
      .map(String::toUpperCase)
      .filter(cityName -> cityName.length() <= 8)
      .map(cityName -> cityName.concat(" City"))
      .map(TestCase::concat)
      .map(TestCase::stringToUpperCase)
      .log();

//        cities.subscribe();
  System.out.println(cities.blockFirst());
}

PS: 原文提供的case,没有输出,简单修改了一下。

输出:

代码语言:javascript
复制
20:07:53.517 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
20:07:53.558 [main] INFO reactor.Flux.Map.1 - onSubscribe(FluxMap.MapSubscriber)
20:07:53.560 [main] INFO reactor.Flux.Map.1 - request(unbounded)
concat: boundedElastic-1
stringToUpperCase: boundedElastic-1
20:07:53.564 [boundedElastic-1] INFO reactor.Flux.Map.1 - onNext(NEW YORK CITY CITY)
20:07:53.565 [boundedElastic-1] INFO reactor.Flux.Map.1 - cancel()
NEW YORK CITY CITY

可以看到 main 线程开始订阅,但是被切换成 boundedElastic-1 线程。我们提供了一个 Scheduler (Schedulers.boundedElastic()),然后这个线程池中的一个线程被选中来替换 main 线程。

publishOn() 方法

publishOn() 方法跟 subscribeOn() 很类似,但是有一个主要区别。

来看个例子:

代码语言:javascript
复制
class ReactiveJavaTutorial {

  public static void main(String[] args) {

    Flux.just("New York", "London", "Paris", "Amsterdam")
            .map(ReactiveJavaTutorial::stringToUpperCase)
            .publishOn(Schedulers.boundedElastic())
            .map(ReactiveJavaTutorial::concat)
            .subscribe();
  }

  private static String stringToUpperCase(String name) {
    System.out.println("stringToUpperCase: " + Thread.currentThread().getName());
    return name.toUpperCase();
  }

  private static String concat(String name) {
    System.out.println("concat: " + Thread.currentThread().getName());
    return name.concat(" City");
  }
}

这里,我们在两个 map 操作中放一个 publishOn()。我们来看输出:

代码语言:javascript
复制
stringToUpperCase: main
stringToUpperCase: main
stringToUpperCase: main
concat: boundedElastic-1
concat: boundedElastic-1
concat: boundedElastic-1

可以看到,所有在 publishOn 操作之前的都是 main 线程执行,所有 publishOn 之后的都是 boundedElastic-1 执行。这是因为 publishOn 充当任何其他操作符。它从上游接收信号,并在关联的 Scheduler 上对一个 worker 执行回调时向下游重播。

这就是 publishOnsubscribeOn() 的主要区别。无论我们把 subscribeOn() 放在哪里,它提供的 Scheduler 都会应用到整条响应链。

subscribeOn and publishOn operators in Project Reactor

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022-08-24,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • subscribeOn() 方法
  • publishOn() 方法
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档