前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >[享学Netflix] 二十、Hystrix跨线程传递数据解决方案:HystrixRequestContext

[享学Netflix] 二十、Hystrix跨线程传递数据解决方案:HystrixRequestContext

作者头像
YourBatman
发布2020-03-18 19:21:09
4.8K0
发布2020-03-18 19:21:09
举报
文章被收录于专栏:BAT的乌托邦

不要老想着做不顺就放弃,哪个团队都有问题,哪个团队都有优点 代码下载地址:https://github.com/f641385712/netflix-learning

目录
  • 前言
  • 正文
    • HystrixRequestContext
      • 使用示例一:
        • 提出疑问
      • HystrixRequestVariable
        • HystrixRequestVariableDefault
    • HystrixContextRunnable
      • 使用示例二:
    • HystrixConcurrencyStrategy通用方案解决跨线程传值
  • 总结
    • 声明

前言

说到线程间的数据传递,你肯定会联想到ThreadLocal。但若是跨线程传递数据呢?阅读本文之前,我个人建议你已经能够非常熟练的使用ThreadLocal并了解其基本原理,至少看过下面两篇文章表述的内容:

Hystrix里,它支持两种隔离模式:线程池隔离和信号量隔离。前者是默认选项:每个命令都在线程池里隔离执行,因此必然会涉及到存在跨线程传递数据的问题,这是Hystrix需要解决的(信号量隔离不存在此问题~)。


正文

关于线程内、线程间传递数据的进阶三部曲:ThreadLocal -> InheritableThreadLocal -> TransmittableThreadLocal,提供的能力越来越强。强两者由JDK源生提供,最多能支持到父线程向子线程传递数据,但无法解决线程池执行问题。

TransmittableThreadLocal是阿里巴巴开源的TTL工具,专用于解决线程间数据传递问题,支持线程池。但是很显然,Hystrix不可能依赖于阿里巴巴的实现(阿里的年纪比它还小呢),所以它拥有自己的实现方式,核心API为:HystrixRequestContext/HystrixRequestVariableDefault/HystrixContextRunnable


HystrixRequestContext

它是请求级别的上下文,也就是说同一个请求内,前面放进去的内容,只要在请求生命周期内,任何地方均可以获取到。

说明:这里所指的请求级别,你可以理解为一个Http请求一样,只是针对一个请求:请求内的任务可用多个线程或者使用线程池去处理,所以需要解决线程池内的数据获取问题

代码语言:javascript
复制
public class HystrixRequestContext implements Closeable {

	// 利用ThreadLocal, 每个线程各有一份HystrixRequestContext
	// 当然,前提是调用了initializeContext()进行初始化
	private static ThreadLocal<HystrixRequestContext> requestVariables = new ThreadLocal<>();
    public static HystrixRequestContext initializeContext() {
        HystrixRequestContext state = new HystrixRequestContext();
        requestVariables.set(state); // 每个线程都持有一份当前上下文state
        return state;
    }
    // 当然喽,它也允许你自己重新设置绑定
    // 为当前线程设置一个已存在的HystrixRequestContext
    public static void setContextOnCurrentThread(HystrixRequestContext state) {
        requestVariables.set(state);
    }


	// 得到当前线程的Hystrix请求上下文  从ThreadLocal里拿
    public static HystrixRequestContext getContextForCurrentThread() {
        HystrixRequestContext context = requestVariables.get();
        if (context != null && context.state != null) {
            return context;
        } else {
            return null;
        }
    }
	// 判断当前线程的上下文是否已经初始化~~~~ 就是判断get() == null而已嘛
    public static boolean isCurrentThreadInitialized() {
        HystrixRequestContext context = requestVariables.get();
        return context != null && context.state != null;
    }
}

这段代码看似做的不多:给当前线程绑定一个HystrixRequestContext上下文对象,当然喽,这个上下文对象可以init初始化,也可来自于一个已经存在的上下文

如何理解已经存在的上下文,比如父线程已经存在一个上下文,子线程想绑定时,是可以使用父上下文的,这样父子不就共用一个上下文了麽?这样数据就打通了嘛~


使用示例一:
代码语言:javascript
复制
@Test
public void fun1() {
    HystrixRequestContext.initializeContext();

	... // 处理你的业务逻辑

    HystrixRequestContext contextForCurrentThread = HystrixRequestContext.getContextForCurrentThread();

    System.out.println(contextForCurrentThread.getClass());
    // contextForCurrentThread.close();
    contextForCurrentThread.shutdown();
}

运行程序控制台打印:

代码语言:javascript
复制
class com.netflix.hystrix.strategy.concurrency.HystrixRequestContext

是的,单独使用HystrixRequestContext的话,你就只能这么简单用啦。若配上Servlet的Filter,会更有意义些:

代码语言:javascript
复制
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
    HystrixRequestContext context = HystrixRequestContext.initializeContext();
    try {
         chain.doFilter(request, response);
    } finally {
         context.shutdown();
    }
}

官方特别强调:如果你调用了initializeContext()方法,请务必确保shutdown()方法也会被调用,否则会造成可能的内存泄漏。

这样子在Http请求的任意地方,便可轻松的拿到当前线程的HystrixRequestContext对象,从而获取相应数据。

但是,请继续看如下示例:

代码语言:javascript
复制
@Test
public void fun2() throws InterruptedException {
    HystrixRequestContext.initializeContext();


    // 启动子线程完成具体业务逻辑
    new Thread(() -> {
        // 子线程里需要拿到请求上下文处理逻辑
        HystrixRequestContext contextForCurrentThread = HystrixRequestContext.getContextForCurrentThread();
        // ... // 处理业务逻辑
        System.out.println("当前Hystrix请求上下文是:" + contextForCurrentThread);
    }).start();

    HystrixRequestContext.getContextForCurrentThread().close();
    TimeUnit.SECONDS.sleep(1);
}

控制台打印:

代码语言:javascript
复制
当前Hystrix请求上下文是:null

我相信你在看了文首推荐的两篇文章后,对这个结果并不会感到惊讶。有的人会说使用InheritableThreadLocal能解决向子线程传递数据的问题,那么问题是如果异步任务交给线程池执行呢?难道你还得借助阿里巴巴的TTL来实现吗?


提出疑问

我们知道Hystrix默认的隔离方式是线程池:每个command/方法均会在线程池内执行,所以传递数据使用InheritableThreadLocal是不能从根本上解决问题的,当然若你说借助阿里巴巴的TTL是可以解决问题,但让堂堂Hystrix依赖阿里巴巴的库,是不是也太不实际了呢?况且Hystrix的年龄可比TTL大多了~

因此,对于以上示例至少可以提出以下疑问:

  1. 光秃秃的在线程内传递一个HystrixRequestContext貌似没有任何意义,上下文里面可以装数据吗?
  2. 对于子线程、线程池内获取上下文HystrixRequestContext,Hystrix是如何解决的???

HystrixRequestContext的Javadoc里有说,它还负责一件事:管理着HystrixRequestVariable的生命周期(存储和销毁),而一个HystrixRequestVariable接口便代表着请求变量:请求本地变量。


HystrixRequestVariable

它是一个接口,用于存储一个变量,和上下文相关。

第一个问题:HystrixRequestContext能装东西吗?那是必然呢,不然怎么好意思叫上下文呢?

代码语言:javascript
复制
HystrixRequestContext:

	// 上下文所有的数据都使用这个Map来存储
	// state的访问权限是default级别:只能同包访问
	ConcurrentHashMap<HystrixRequestVariableDefault<?>, HystrixRequestVariableDefault.LazyInitializer<?>> state = new ConcurrentHashMap<>();

这是实际的存储结构。每个线程绑定了一个HystrixRequestContext,而每个HystrixRequestContext有个Map结构存储数据,key就HystrixRequestVariableDefault

HystrixRequestContext并没有提供直接访问state的相关方法,因此字段并非private,它建议外部直接使用字段进行访问(添加数据、移除数据等)。而state字段的唯一访问处是HystrixRequestVariableDefault

代码语言:javascript
复制
// 它的效果特别像ThreadLocal:用于传递请求级别的数据
// HystrixRequestVariableLifecycle方法:initialValue/shutdown(value)
public interface HystrixRequestVariable<T> extends HystrixRequestVariableLifecycle<T> {

	// 获取**当前请求**的值
	public T get();
}

它有一个默认实现HystrixRequestVariableDefault(可以认为是唯一实现)。


HystrixRequestVariableDefault

它用于储存用户请求级别的变量数据,而实际存储依赖于HystrixRequestContext.state,和请求上下文实例绑定,从而和线程绑定。

代码语言:javascript
复制
public class HystrixRequestVariableDefault<T> implements HystrixRequestVariable<T> {
	
	// 获取和当前请求(注意:不是当前线程哦,因为自己可能是子线程或者)
	public T get() {
		... // 必须确保HystrixRequestContext#init工作已做 否则抛出异常
		ConcurrentHashMap<HystrixRequestVariableDefault<?>, LazyInitializer<?>> variableMap = HystrixRequestContext.getContextForCurrentThread().state;
		LazyInitializer<?> v = variableMap.get(this);
        if (v != null) {
            return (T) v.get();
        }
		...
	}
	// 初始值默认是null
    public T initialValue() {
        return null;
    }

	// 设置数据:直接调用put方法即可。使用new一个新的LazyInitializer装起来
    public void set(T value) {
        HystrixRequestContext.getContextForCurrentThread().state.put(this, new LazyInitializer<T>(this, value));
    }
    public void remove() { ... }
	...
}

它看起来和ThreadLocal效果类似,但是他俩有如下明显区别:

  1. 使用它之前,必须调用HystrixRequestContext#initializeContext完成初始化才行(原因是HystrixRequestContext.state才是底层存储)
  2. 它的清除动作是交给HystrixRequestContext#shutdown完成,所以请求结束后请你务必调用此方法

另外,它也说了,要想达到向子线程、线程池都可以传递数据的效果,你得使用Hystrix包装后的HystrixContextCallable/HystrixContextRunnable去初始化任务,才能达到预期的传播效果。


HystrixContextRunnable

Hystrix 的思路是包装Runnable,在执行实际的任务之前,先拿当前线程HystrixRequestContext初始化实际执行任务的线程HystrixRequestContext,这样达到了同一个HystrixRequestContext向下传递的目的了。

代码语言:javascript
复制
public class HystrixContextRunnable implements Runnable {

    private final Callable<Void> actual;
    // 父线程的上下文
    private final HystrixRequestContext parentThreadState;

    public HystrixContextRunnable(Runnable actual) {
        this(HystrixPlugins.getInstance().getConcurrencyStrategy(), actual);
    }
	// 这里使用了HystrixConcurrencyStrategy,它是一个SPI哦
	// 你还可以通过该SPI,自定义你的执行机制,非常靠谱有木有
    public HystrixContextRunnable(HystrixConcurrencyStrategy concurrencyStrategy, final Runnable actual) {
        this.actual = concurrencyStrategy.wrapCallable(() -> {
        	actual.run(); // 实际执行的任务
        	return null;
        });
        // 父线程奶你**构造时**所处在的线程
        this.parentThreadState = HystrixRequestContext.getContextForCurrentThread();
    }


    @Override
    public void run() {
        HystrixRequestContext existingState = HystrixRequestContext.getContextForCurrentThread();
        try {
            HystrixRequestContext.setContextOnCurrentThread(parentThreadState);
            
            try {
                actual.call();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
            
        } finally {
            HystrixRequestContext.setContextOnCurrentThread(existingState);
        }
    }
}

处理思想和阿里巴巴的TTL如出一辙:执行前把父上下文设置进去,目标任务执行完成后,释放父上下文

另外:它还有个HystrixContextCallable用于包装Callable,逻辑一毛一样。


使用示例二:
代码语言:javascript
复制
@Test
public void fun3() throws InterruptedException {
    HystrixRequestContext.initializeContext();
    HystrixRequestContext mainContext = HystrixRequestContext.getContextForCurrentThread();
    // 设置变量:让其支持传递到子线程 or 线程池
    NAME_VARIABLE.set("YoutBatman");

    // 子线程的Runnable任务,必须使用`HystrixContextRunnable`才能得到上面设置的值哦
    new Thread(new HystrixContextRunnable(() -> {
        HystrixRequestContext contextForCurrentThread = HystrixRequestContext.getContextForCurrentThread();
        System.out.println(contextForCurrentThread == mainContext);
        System.out.println("当前线程绑定的变量值是:" + NAME_VARIABLE.get());
    })).start();

	TimeUnit.SECONDS.sleep(1);
    HystrixRequestContext.getContextForCurrentThread().close();
    
}

运行程序,控制台输出:

代码语言:javascript
复制
true
当前线程绑定的变量值是:YoutBatman

请注意:这块代码并没有显示的将 YourBatman 从 main线程传递到子线程,也没有利用InheritableThreadLocal哦。它的执行步骤可描述如下:

  1. main初始化HystrixRequestContext,并且和此Context上下文完成绑定
  2. NAME_VARIABLE.set("YoutBatman")设置变量值,请注意:此变量值是处在HystrixRequestContext这个上下文里的哦,属于main线程的内容
  3. main线程初始化任务:使用的HystrixContextRunnable,所以该任务是和main线程的上下文绑定的
  4. 执行任务时,先用main线程的Context来初始化上下文(所以它绑定的上下文和main线程的是同一个上下文
  5. 任务里使用NAME_VARIABLE.get()实际上是从main线程的上下文里拿数据,那必然可以取到呀

这就能解释了:为何子线程(甚至是线程池里的线程)都能拿到父线程里设置的变量了,因为是共用的同一个context上下文嘛。


HystrixConcurrencyStrategy通用方案解决跨线程传值

代码语言:javascript
复制
HystrixConcurrencyStrategy:

	// 给我们一个机会:装饰callable执行
    public <T> Callable<T> wrapCallable(Callable<T> callable) {
        return callable;
    }

也就是说我们可以自定义该插件的实现,可以在目标任务执行前后加入自己的逻辑。下面举一个常见例子,获取是你可能遇到的坑

代码语言:javascript
复制
DemoController:

	@GetMapping("/demo")
	public String getDemo() {
		// 简单的说:就是向Request域里放一个值
	    RequestContextHolder.currentRequestAttributes().setAttribute("name", "YourBatman", SCOPE_REQUEST);
	    return demoService.getInfo();
	}

DemoService:
	
	public String getInfo() {
		Srting name = RequestContextHolder.currentRequestAttributes().getAttribute("name", SCOPE_REQUEST);
		return name;
	}

本来这一切是可以正常work的,但是如果现在突然说DemoService#getInfo()要加入Hystrix熔断降级:

代码语言:javascript
复制
DemoService:
	
	@HystrixCommand( ... )
	public String getInfo() {
		Srting name = RequestContextHolder.currentRequestAttributes().getAttribute("name", SCOPE_REQUEST);
		return name;
	}

再次运行,what a fuck,返回null,直接导致逻辑错误,这非常致命。如果只有一个接口到时简单,你可以采用方法参数传递方式fix,但若有多个呢???下面接针对性的介绍一种通用解决方案:自定义HystrixConcurrencyStrategy

代码语言:javascript
复制
/**
 * 此类能够保证:RequestContext请求上下文,也就是RequestAttributes能够在线程池里自动有效
 */
public class RequestContextHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {

    // 这个时候还在主线程了,所以通过RequestContextHolder.getRequestAttributes()是能拿到上下文的
    // 拿到后hold住,等到run执行的时候再绑定即可
    @Override
    public <T> Callable<T> wrapCallable(Callable<T> callable) {
        return new RequestAttributeAwareCallable<>(callable, RequestContextHolder.getRequestAttributes());
    }

    static class RequestAttributeAwareCallable<T> implements Callable<T> {

        private final Callable<T> delegate;
        private final RequestAttributes requestAttributes;

        public RequestAttributeAwareCallable(Callable<T> callable, RequestAttributes requestAttributes) {
            this.delegate = callable;
            this.requestAttributes = requestAttributes;
        }

        // 执行之前绑定上下文,执行完成后释放
        @Override
        public T call() throws Exception {
            try {
                RequestContextHolder.setRequestAttributes(requestAttributes);
                return delegate.call();
            } finally {
                RequestContextHolder.resetRequestAttributes();
            }
        }
    }
}

使用API方式注册此插件:

代码语言:javascript
复制
HystrixPlugins.getInstance().registerConcurrencyStrategy(new RequestContextHystrixConcurrencyStrategy());

你也可以采用配置的方式,写在plugin.properties/config.properties里:

代码语言:javascript
复制
hystrix.plugin.HystrixConcurrencyStrategy.implementation=com.yourbatman.hystrix.RequestContextHystrixConcurrencyStrategy

这样做后,妈妈就再也不用担心你获取不到请求上下文啦。另外本处只是以RequestContext为例,当然还有像MDC传值MDC.setContextMap(contextMap)等等方案都是一样的做。


总结

本文介绍了Netflix Hystrix它自己的解决跨线程传递数据的结局方式,并且也介绍了HystrixConcurrencyStrategy的扩展使用方式,该接口的扩展使用在Spring Cloud里也会有所体现。

总的来说,这篇文章内容和ThreadLocal的思想是相同的,或者说和阿里巴巴的TTL更是相似。Hystrix的隔离方式默认是线程池方式,所以加熔断后是有何能影响你的正常逻辑的,请务必小心谨慎使用,特别在全链路追踪时,很可能某些链路就失效了,影响你的全链路压测逻辑

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 目录
  • 前言
  • 正文
    • HystrixRequestContext
      • 使用示例一:
      • HystrixRequestVariable
    • HystrixContextRunnable
      • 使用示例二:
    • HystrixConcurrencyStrategy通用方案解决跨线程传值
    • 总结
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档