前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊reactor异步线程的变量传递

聊聊reactor异步线程的变量传递

作者头像
code4it
发布2018-09-17 16:00:51
3K0
发布2018-09-17 16:00:51
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要研究下reactor异步线程的变量传递

threadlocal的问题

在传统的请求/应答同步模式中,使用threadlocal来传递上下文变量是非常方便的,可以省得在每个方法参数添加公用的变量,比如当前登录用户。但是业务方法可能使用了async或者在其他线程池中异步执行,这个时候threadlocal的作用就失效了。

这个时候的解决办法就是采取propagation模式,即在同步线程与异步线程衔接处传播这个变量。

TaskDecorator

比如spring就提供了TaskDecorator,通过实现这个接口,可以自己控制传播那些变量。例如:

代码语言:javascript
复制
class MdcTaskDecorator implements TaskDecorator {

  @Override
  public Runnable decorate(Runnable runnable) {
    // Right now: Web thread context !
    // (Grab the current thread MDC data)
    Map<String, String> contextMap = MDC.getCopyOfContextMap();
    return () -> {
      try {
        // Right now: @Async thread context !
        // (Restore the Web thread context's MDC data)
        MDC.setContextMap(contextMap);
        runnable.run();
      } finally {
        MDC.clear();
      }
    };
  }
}

这里注意在finally里头clear

配置这个taskDecorator

代码语言:javascript
复制
@EnableAsync
@Configuration
public class AsyncConfig implements AsyncConfigurer {

  @Override
  public Executor getAsyncExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setTaskDecorator(new MdcTaskDecorator());
    executor.initialize();
    return executor;
  }

}

完整实例详见Spring 4.3: Using a TaskDecorator to copy MDC data to @Async threads

reactor Context

spring5引入webflux,其底层是基于reactor,那么reactor如何进行上下文变量的传播呢?官方提供了Context对象来替代threadlocal。

其特性如下:

  • 类似map的kv操作,比如put(Object key, Object value),putAll(Context), hasKey(Object key)
  • immutable,即同一个key,后面put不会覆盖
  • 提供getOrDefault,getOrEmpty方法
  • Context与作用链上的每个Subscriber绑定
  • 通过subscriberContext(Context)来访问
  • Context的作用是自底向上

实例

设置及读取

代码语言:javascript
复制
    @Test
    public void testSubscriberContext(){
        String key = "message";
        Mono<String> r = Mono.just("Hello")
                .flatMap( s -> Mono.subscriberContext()
                        .map( ctx -> s + " " + ctx.get(key)))
                .subscriberContext(ctx -> ctx.put(key, "World"));

        StepVerifier.create(r)
                .expectNext("Hello World")
                .verifyComplete();
    }

这里从最底部的subscriberContext设置message值为World,然后flatMap里头通过subscriberContext来访问。

自底向上

代码语言:javascript
复制
    @Test
    public void testContextSequence(){
        String key = "message";
        Mono<String> r = Mono.just("Hello")
                //NOTE 这个subscriberContext设置的太高了
                .subscriberContext(ctx -> ctx.put(key, "World"))
                .flatMap( s -> Mono.subscriberContext()
                        .map( ctx -> s + " " + ctx.getOrDefault(key, "Stranger")));

        StepVerifier.create(r)
                .expectNext("Hello Stranger")
                .verifyComplete();
    }

由于这个例子的subscriberContext设置的太高了,不能作用在flatMap里头的Mono.subscriberContext()

不可变

代码语言:javascript
复制
    @Test
    public void testContextImmutable(){
        String key = "message";

        Mono<String> r = Mono.subscriberContext()
                .map( ctx -> ctx.put(key, "Hello"))
                //这里返回了一个新的,因此上面的设置失效了
                .flatMap( ctx -> Mono.subscriberContext())
                .map( ctx -> ctx.getOrDefault(key,"Default"));

        StepVerifier.create(r)
                .expectNext("Default")
                .verifyComplete();
    }

subscriberContext永远返回一个新的

多个连续的subscriberContext

代码语言:javascript
复制
    @Test
    public void testReadOrder(){
        String key = "message";
        Mono<String> r = Mono.just("Hello")
                .flatMap( s -> Mono.subscriberContext()
                        .map( ctx -> s + " " + ctx.get(key)))
                .subscriberContext(ctx -> ctx.put(key, "Reactor"))
                .subscriberContext(ctx -> ctx.put(key, "World"));

        StepVerifier.create(r)
                .expectNext("Hello Reactor")
                .verifyComplete();
    }

operator只会读取离它最近的一个context

flatMap间的subscriberContext

代码语言:javascript
复制
    @Test
    public void testContextBetweenFlatMap(){
        String key = "message";
        Mono<String> r = Mono.just("Hello")
                .flatMap( s -> Mono.subscriberContext()
                        .map( ctx -> s + " " + ctx.get(key)))
                .subscriberContext(ctx -> ctx.put(key, "Reactor"))
                .flatMap( s -> Mono.subscriberContext()
                        .map( ctx -> s + " " + ctx.get(key)))
                .subscriberContext(ctx -> ctx.put(key, "World"));

        StepVerifier.create(r)
                .expectNext("Hello Reactor World")
                .verifyComplete();
    }

flatMap读取离它最近的context

flatMap中的subscriberContext

代码语言:javascript
复制
    @Test
    public void testContextInFlatMap(){
        String key = "message";
        Mono<String> r =
                Mono.just("Hello")
                        .flatMap( s -> Mono.subscriberContext()
                                .map( ctx -> s + " " + ctx.get(key))
                        )
                        .flatMap( s -> Mono.subscriberContext()
                                .map( ctx -> s + " " + ctx.get(key))
                                .subscriberContext(ctx -> ctx.put(key, "Reactor"))
                        )
                        .subscriberContext(ctx -> ctx.put(key, "World"));

        StepVerifier.create(r)
                .expectNext("Hello World Reactor")
                .verifyComplete();
    }

这里第一个flatMap无法读取第二个flatMap内部的context

小结

reactor通过提供Context来实现了类似同步线程threadlocal的功能,非常强大,值得好好琢磨。

doc

  • TaskDecorator
  • Spring 4.3: Using a TaskDecorator to copy MDC data to @Async threads
  • HOW TO PASS CONTEXT IN STANDARD WAY - WITHOUT THREADLOCAL
  • Spring Security Context Propagation with @Async
  • 如何在async线程中访问RequestContextHolder
  • Context Aware Java Executor and Spring’s @Async
  • 8.8.1. The Context API
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2018-03-16,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码匠的流水账 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • threadlocal的问题
  • TaskDecorator
  • reactor Context
  • 实例
    • 设置及读取
      • 自底向上
        • 不可变
          • 多个连续的subscriberContext
            • flatMap间的subscriberContext
              • flatMap中的subscriberContext
              • 小结
              • doc
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档