不要老想着做不顺就放弃,哪个团队都有问题,哪个团队都有优点 代码下载地址:https://github.com/f641385712/netflix-learning
说到线程间的数据传递,你肯定会联想到ThreadLocal
。但若是跨线程传递数据呢?阅读本文之前,我个人建议你已经能够非常熟练的使用ThreadLocal
并了解其基本原理,至少看过下面两篇文章表述的内容:
在Hystrix
里,它支持两种隔离模式:线程池隔离和信号量隔离。前者是默认选项:每个命令都在线程池里隔离执行,因此必然会涉及到存在跨线程传递数据的问题,这是Hystrix需要解决的(信号量隔离不存在此问题~)。
关于线程内、线程间传递数据的进阶三部曲:ThreadLocal -> InheritableThreadLocal -> TransmittableThreadLocal
,提供的能力越来越强。强两者由JDK源生提供,最多能支持到父线程向子线程传递数据,但无法解决线程池执行问题。
TransmittableThreadLocal
是阿里巴巴开源的TTL工具,专用于解决线程间数据传递问题,支持线程池。但是很显然,Hystrix不可能依赖于阿里巴巴的实现(阿里的年纪比它还小呢),所以它拥有自己的实现方式,核心API为:HystrixRequestContext/HystrixRequestVariableDefault/HystrixContextRunnable
。
它是请求级别的上下文,也就是说同一个请求内,前面放进去的内容,只要在请求生命周期内,任何地方均可以获取到。
说明:这里所指的请求级别,你可以理解为一个Http请求一样,只是针对一个请求:请求内的任务可用多个线程或者使用线程池去处理,所以需要解决线程池内的数据获取问题
public class HystrixRequestContext implements Closeable {
// 利用ThreadLocal, 每个线程各有一份HystrixRequestContext
// 当然,前提是调用了initializeContext()进行初始化
private static ThreadLocal<HystrixRequestContext> requestVariables = new ThreadLocal<>();
public static HystrixRequestContext initializeContext() {
HystrixRequestContext state = new HystrixRequestContext();
requestVariables.set(state); // 每个线程都持有一份当前上下文state
return state;
}
// 当然喽,它也允许你自己重新设置绑定
// 为当前线程设置一个已存在的HystrixRequestContext
public static void setContextOnCurrentThread(HystrixRequestContext state) {
requestVariables.set(state);
}
// 得到当前线程的Hystrix请求上下文 从ThreadLocal里拿
public static HystrixRequestContext getContextForCurrentThread() {
HystrixRequestContext context = requestVariables.get();
if (context != null && context.state != null) {
return context;
} else {
return null;
}
}
// 判断当前线程的上下文是否已经初始化~~~~ 就是判断get() == null而已嘛
public static boolean isCurrentThreadInitialized() {
HystrixRequestContext context = requestVariables.get();
return context != null && context.state != null;
}
}
这段代码看似做的不多:给当前线程绑定一个HystrixRequestContext
上下文对象,当然喽,这个上下文对象可以init初始化,也可来自于一个已经存在的上下文。
如何理解已经存在的上下文,比如父线程已经存在一个上下文,子线程想绑定时,是可以使用父上下文的,这样父子不就共用一个上下文了麽?这样数据就打通了嘛~
@Test
public void fun1() {
HystrixRequestContext.initializeContext();
... // 处理你的业务逻辑
HystrixRequestContext contextForCurrentThread = HystrixRequestContext.getContextForCurrentThread();
System.out.println(contextForCurrentThread.getClass());
// contextForCurrentThread.close();
contextForCurrentThread.shutdown();
}
运行程序控制台打印:
class com.netflix.hystrix.strategy.concurrency.HystrixRequestContext
是的,单独使用HystrixRequestContext
的话,你就只能这么简单用啦。若配上Servlet的Filter,会更有意义些:
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
HystrixRequestContext context = HystrixRequestContext.initializeContext();
try {
chain.doFilter(request, response);
} finally {
context.shutdown();
}
}
官方特别强调:如果你调用了
initializeContext()
方法,请务必确保shutdown()
方法也会被调用,否则会造成可能的内存泄漏。
这样子在Http请求的任意地方,便可轻松的拿到当前线程的HystrixRequestContext
对象,从而获取相应数据。
但是,请继续看如下示例:
@Test
public void fun2() throws InterruptedException {
HystrixRequestContext.initializeContext();
// 启动子线程完成具体业务逻辑
new Thread(() -> {
// 子线程里需要拿到请求上下文处理逻辑
HystrixRequestContext contextForCurrentThread = HystrixRequestContext.getContextForCurrentThread();
// ... // 处理业务逻辑
System.out.println("当前Hystrix请求上下文是:" + contextForCurrentThread);
}).start();
HystrixRequestContext.getContextForCurrentThread().close();
TimeUnit.SECONDS.sleep(1);
}
控制台打印:
当前Hystrix请求上下文是:null
我相信你在看了文首推荐的两篇文章后,对这个结果并不会感到惊讶。有的人会说使用InheritableThreadLocal
能解决向子线程传递数据的问题,那么问题是如果异步任务交给线程池执行呢?难道你还得借助阿里巴巴
的TTL来实现吗?
我们知道Hystrix
默认的隔离方式是线程池:每个command/方法均会在线程池内执行,所以传递数据使用InheritableThreadLocal
是不能从根本上解决问题的,当然若你说借助阿里巴巴
的TTL是可以解决问题,但让堂堂Hystrix依赖阿里巴巴的库,是不是也太不实际了呢?况且Hystrix的年龄可比TTL大多了~
因此,对于以上示例至少可以提出以下疑问:
HystrixRequestContext
貌似没有任何意义,上下文里面可以装数据吗?HystrixRequestContext
,Hystrix是如何解决的???HystrixRequestContext
的Javadoc里有说,它还负责一件事:管理着HystrixRequestVariable
的生命周期(存储和销毁),而一个HystrixRequestVariable
接口便代表着请求变量:请求本地变量。
它是一个接口,用于存储一个变量,和上下文相关。
第一个问题:HystrixRequestContext
能装东西吗?那是必然呢,不然怎么好意思叫上下文呢?
HystrixRequestContext:
// 上下文所有的数据都使用这个Map来存储
// state的访问权限是default级别:只能同包访问
ConcurrentHashMap<HystrixRequestVariableDefault<?>, HystrixRequestVariableDefault.LazyInitializer<?>> state = new ConcurrentHashMap<>();
这是实际的存储结构。每个线程绑定了一个HystrixRequestContext
,而每个HystrixRequestContext
有个Map结构存储数据,key就HystrixRequestVariableDefault
。
HystrixRequestContext
并没有提供直接访问state
的相关方法,因此字段并非private,它建议外部直接使用字段进行访问(添加数据、移除数据等)。而state
字段的唯一访问处是HystrixRequestVariableDefault
。
// 它的效果特别像ThreadLocal:用于传递请求级别的数据
// HystrixRequestVariableLifecycle方法:initialValue/shutdown(value)
public interface HystrixRequestVariable<T> extends HystrixRequestVariableLifecycle<T> {
// 获取**当前请求**的值
public T get();
}
它有一个默认实现HystrixRequestVariableDefault
(可以认为是唯一实现)。
它用于储存用户请求级别的变量数据,而实际存储依赖于HystrixRequestContext.state
,和请求上下文实例绑定,从而和线程绑定。
public class HystrixRequestVariableDefault<T> implements HystrixRequestVariable<T> {
// 获取和当前请求(注意:不是当前线程哦,因为自己可能是子线程或者)
public T get() {
... // 必须确保HystrixRequestContext#init工作已做 否则抛出异常
ConcurrentHashMap<HystrixRequestVariableDefault<?>, LazyInitializer<?>> variableMap = HystrixRequestContext.getContextForCurrentThread().state;
LazyInitializer<?> v = variableMap.get(this);
if (v != null) {
return (T) v.get();
}
...
}
// 初始值默认是null
public T initialValue() {
return null;
}
// 设置数据:直接调用put方法即可。使用new一个新的LazyInitializer装起来
public void set(T value) {
HystrixRequestContext.getContextForCurrentThread().state.put(this, new LazyInitializer<T>(this, value));
}
public void remove() { ... }
...
}
它看起来和ThreadLocal
效果类似,但是他俩有如下明显区别:
HystrixRequestContext#initializeContext
完成初始化才行(原因是HystrixRequestContext.state
才是底层存储)HystrixRequestContext#shutdown
完成,所以请求结束后请你务必调用此方法另外,它也说了,要想达到向子线程、线程池都可以传递数据的效果,你得使用Hystrix包装后的HystrixContextCallable/HystrixContextRunnable
去初始化任务,才能达到预期的传播效果。
Hystrix 的思路是包装Runnable,在执行实际的任务之前,先拿当前线程的HystrixRequestContext
初始化实际执行任务的线程的HystrixRequestContext
,这样达到了同一个HystrixRequestContext
向下传递的目的了。
public class HystrixContextRunnable implements Runnable {
private final Callable<Void> actual;
// 父线程的上下文
private final HystrixRequestContext parentThreadState;
public HystrixContextRunnable(Runnable actual) {
this(HystrixPlugins.getInstance().getConcurrencyStrategy(), actual);
}
// 这里使用了HystrixConcurrencyStrategy,它是一个SPI哦
// 你还可以通过该SPI,自定义你的执行机制,非常靠谱有木有
public HystrixContextRunnable(HystrixConcurrencyStrategy concurrencyStrategy, final Runnable actual) {
this.actual = concurrencyStrategy.wrapCallable(() -> {
actual.run(); // 实际执行的任务
return null;
});
// 父线程奶你**构造时**所处在的线程
this.parentThreadState = HystrixRequestContext.getContextForCurrentThread();
}
@Override
public void run() {
HystrixRequestContext existingState = HystrixRequestContext.getContextForCurrentThread();
try {
HystrixRequestContext.setContextOnCurrentThread(parentThreadState);
try {
actual.call();
} catch (Exception e) {
throw new RuntimeException(e);
}
} finally {
HystrixRequestContext.setContextOnCurrentThread(existingState);
}
}
}
处理思想和阿里巴巴的TTL如出一辙:执行前把父上下文设置进去,目标任务执行完成后,释放父上下文。
另外:它还有个
HystrixContextCallable
用于包装Callable,逻辑一毛一样。
@Test
public void fun3() throws InterruptedException {
HystrixRequestContext.initializeContext();
HystrixRequestContext mainContext = HystrixRequestContext.getContextForCurrentThread();
// 设置变量:让其支持传递到子线程 or 线程池
NAME_VARIABLE.set("YoutBatman");
// 子线程的Runnable任务,必须使用`HystrixContextRunnable`才能得到上面设置的值哦
new Thread(new HystrixContextRunnable(() -> {
HystrixRequestContext contextForCurrentThread = HystrixRequestContext.getContextForCurrentThread();
System.out.println(contextForCurrentThread == mainContext);
System.out.println("当前线程绑定的变量值是:" + NAME_VARIABLE.get());
})).start();
TimeUnit.SECONDS.sleep(1);
HystrixRequestContext.getContextForCurrentThread().close();
}
运行程序,控制台输出:
true
当前线程绑定的变量值是:YoutBatman
请注意:这块代码并没有显示的将 YourBatman 从 main线程传递到子线程,也没有利用InheritableThreadLocal
哦。它的执行步骤可描述如下:
HystrixRequestContext
,并且和此Context上下文完成绑定NAME_VARIABLE.set("YoutBatman")
设置变量值,请注意:此变量值是处在HystrixRequestContext
这个上下文里的哦,属于main线程的内容HystrixContextRunnable
,所以该任务是和main线程的上下文绑定的NAME_VARIABLE.get()
实际上是从main线程的上下文里拿数据,那必然可以取到呀这就能解释了:为何子线程(甚至是线程池里的线程)都能拿到父线程里设置的变量了,因为是共用的同一个context上下文嘛。
HystrixConcurrencyStrategy:
// 给我们一个机会:装饰callable执行
public <T> Callable<T> wrapCallable(Callable<T> callable) {
return callable;
}
也就是说我们可以自定义该插件的实现,可以在目标任务执行前后加入自己的逻辑。下面举一个常见例子,获取是你可能遇到的坑
DemoController:
@GetMapping("/demo")
public String getDemo() {
// 简单的说:就是向Request域里放一个值
RequestContextHolder.currentRequestAttributes().setAttribute("name", "YourBatman", SCOPE_REQUEST);
return demoService.getInfo();
}
DemoService:
public String getInfo() {
Srting name = RequestContextHolder.currentRequestAttributes().getAttribute("name", SCOPE_REQUEST);
return name;
}
本来这一切是可以正常work的,但是如果现在突然说DemoService#getInfo()
要加入Hystrix熔断降级:
DemoService:
@HystrixCommand( ... )
public String getInfo() {
Srting name = RequestContextHolder.currentRequestAttributes().getAttribute("name", SCOPE_REQUEST);
return name;
}
再次运行,what a fuck,返回null,直接导致逻辑错误,这非常致命。如果只有一个接口到时简单,你可以采用方法参数传递方式fix,但若有多个呢???下面接针对性的介绍一种通用解决方案:自定义HystrixConcurrencyStrategy
/**
* 此类能够保证:RequestContext请求上下文,也就是RequestAttributes能够在线程池里自动有效
*/
public class RequestContextHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {
// 这个时候还在主线程了,所以通过RequestContextHolder.getRequestAttributes()是能拿到上下文的
// 拿到后hold住,等到run执行的时候再绑定即可
@Override
public <T> Callable<T> wrapCallable(Callable<T> callable) {
return new RequestAttributeAwareCallable<>(callable, RequestContextHolder.getRequestAttributes());
}
static class RequestAttributeAwareCallable<T> implements Callable<T> {
private final Callable<T> delegate;
private final RequestAttributes requestAttributes;
public RequestAttributeAwareCallable(Callable<T> callable, RequestAttributes requestAttributes) {
this.delegate = callable;
this.requestAttributes = requestAttributes;
}
// 执行之前绑定上下文,执行完成后释放
@Override
public T call() throws Exception {
try {
RequestContextHolder.setRequestAttributes(requestAttributes);
return delegate.call();
} finally {
RequestContextHolder.resetRequestAttributes();
}
}
}
}
使用API方式注册此插件:
HystrixPlugins.getInstance().registerConcurrencyStrategy(new RequestContextHystrixConcurrencyStrategy());
你也可以采用配置的方式,写在plugin.properties/config.properties
里:
hystrix.plugin.HystrixConcurrencyStrategy.implementation=com.yourbatman.hystrix.RequestContextHystrixConcurrencyStrategy
这样做后,妈妈就再也不用担心你获取不到请求上下文啦。另外本处只是以RequestContext
为例,当然还有像MDC传值MDC.setContextMap(contextMap)
等等方案都是一样的做。
本文介绍了Netflix Hystrix
它自己的解决跨线程传递数据的结局方式,并且也介绍了HystrixConcurrencyStrategy
的扩展使用方式,该接口的扩展使用在Spring Cloud里也会有所体现。
总的来说,这篇文章内容和ThreadLocal
的思想是相同的,或者说和阿里巴巴的TTL更是相似。Hystrix
的隔离方式默认是线程池方式,所以加熔断后是有何能影响你的正常逻辑的,请务必小心谨慎使用,特别在全链路追踪时,很可能某些链路就失效了,影响你的全链路压测逻辑…