专栏首页aoho求索详解Hystrix资源隔离

详解Hystrix资源隔离

在货船中,为了防止漏水和火灾的扩散,一般会将货仓进行分割,避免了一个货仓出事导致整艘船沉没的悲剧。同样的,在Hystrix中,也采用了这样的舱壁模式,将系统中的服务提供者隔离起来,一个服务提供者延迟升高或者失败,并不会导致整个系统的失败,同时也能够控制调用这些服务的并发度。

线程与线程池

Hystrix中通过将调用服务线程与服务访问的执行线程分隔开来,调用线程能够空出来去做其他的工作而不至于被服务调用的执行的阻塞过长的时间。

在Hystrix中使用独立的线程池对应每一个服务提供者,来隔离和限制这些服务,于是,某个服务提供者的高延迟或者饱和资源受限只会发生在该服务提供者对用的线程池中。

如上图中,Dependency I的调用失败或者高延迟仅会影响自身对应的线程池中的5个线程的阻塞并不会影响其他服务提供者的线程池状况。系统完全与服务提供者请求隔离开来,即使服务提供者对应的线程完全耗尽,并不会影响系统中的其他请求。

注意在对应服务提供者的线程池被占满时,Hystrix会进入了fallback逻辑,快速失败,保护服务调用者的资源稳定。

信号量

除了线程池外,Hystrix还可以通过信号量(计数器)来限制单个服务提供者的并发量。如果通过信号量来控制系统负载,将不再允许设置超时和异步化,这就表示在服务提供者出现高延迟,其调用线程将会被阻塞,直至服务提供者的网络请求超时,如果对服务提供者有足够的信息,可以通过信号量来控制系统的负载。

Hystrix执行流程

简单的流程的序号介绍如下

  1. 构建HystrixCommand或者HystrixObservableCommand对象
  2. 执行命令
  3. 是否有Response缓存
  4. 是否断路器打开
  5. 是否线程池或者队列或者信号量被消耗完
  6. HystrixObservableCommand.construct() or HystrixCommand.run()
  7. 计算链路的健康情况
  8. 获取fallback逻辑
  9. 返回成功的Response

资源隔离实现

Hystrix在判断完断路器关行后(执行流程的第4步),将会尝试获取信号量(AbstractCommand#applyHystrixSemantics())中,在Hystrix中,主要有两种方式进行资源隔离操作,一种是通过信号量的隔离策略(ExecutionIsolationStrategy.SEMAPHORE),另一种是线程隔离的策略(ExecutionIsolationStrategy.THREAD),我们下面来关注一下相关的实现。

信号量隔离策略

信号量隔离主要通过TryableSemaphore接口实现:

 1interface TryableSemaphore {
 2
 3     // 尝试获取信号量
 4    public abstract boolean tryAcquire();
 5    // 释放信号量    
 6      public abstract void release();
 7    // 
 8      public abstract int getNumberOfPermitsUsed();
 9
10}

它的主要实现类主要有TryableSemaphoreNoOp,顾名思义,不进行信号量隔离,当采取线程隔离策略的时候将会注入该实现到HystrixCommand中,如果采用信号量的隔离策略时,将会注入TryableSemaphoreActual,但此时无法超时和异步化,因为信号量隔离资源的策略无法指定命令的在特定的线程执行,从而无法控制线程的执行结果。

TryableSemaphoreActual实现相当简单,通过AtomicInteger记录当前请求的信号量的线程数(原子操作保证数据的一致性),与初始化设置的允许最大信号量数进行比较numberOfPermits(可以动态调整),从而判断是否允许获取信号量,轻量级的实现,保证TryableSemaphoreActual无阻塞的操作方式。

 1static class TryableSemaphoreActual implements TryableSemaphore {
 2    protected final HystrixProperty<Integer> numberOfPermits;
 3       private final AtomicInteger count = new AtomicInteger(0);
 4
 5       public TryableSemaphoreActual(HystrixProperty<Integer> numberOfPermits) {
 6           this.numberOfPermits = numberOfPermits;
 7      }
 8
 9      @Override
10       public boolean tryAcquire() {
11          int currentCount = count.incrementAndGet();
12          if (currentCount > numberOfPermits.get()) {
13              count.decrementAndGet();
14             return false;
15          } else {
16            return true;
17          }
18    }
19
20       @Override
21       public void release() {
22          count.decrementAndGet();
23       }
24
25       @Override
26       public int getNumberOfPermitsUsed() {
27           return count.get();
28      }
29}

需要注意的是每一个TryableSemaphore通过CommandKeyHystrixCommand一一绑定,在AbstractCommand#getExecutionSemaphore()有体现:

 1protected TryableSemaphore getExecutionSemaphore() {
 2    if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.SEMAPHORE) {
 3           if (executionSemaphoreOverride == null) {
 4           TryableSemaphore _s = executionSemaphorePerCircuit.get(commandKey.name());
 5          if (_s == null) {
 6              executionSemaphorePerCircuit.putIfAbsent(commandKey.name(), new TryableSemaphoreActual(properties.executionIsolationSemaphoreMaxConcurrentRequests()));
 7                 return executionSemaphorePerCircuit.get(commandKey.name());
 8            } else {
 9              return _s;
10             }
11          } else {
12               return executionSemaphoreOverride;
13         }
14       } else {
15         return TryableSemaphoreNoOp.DEFAULT;
16       }
17}

如果是采用信号量隔离的策略,将尝试从缓存中获取该CommandKey对应的TryableSemaphoreActual(缓存中不存在创建一个新的,并与CommandKey绑定放置到缓存中),否则返回TryableSemaphoreNoOp不进行信号量隔离。

线程隔离策略

AbstractCommand#executeCommandWithSpecifiedIsolation()的方法中,线程隔离策略与信号隔离策略的操作主要区别是将Observable的执行线程通过threadPool.getScheduler()进行了指定,我们先查看一下HystrixThreadPool的相关接口。

HystrixThreadPool是用来将HystrixCommand#run()(被HystrixCommand包装的代码)指定到隔离的线程中执行的。

 1public interface HystrixThreadPool {
 2
 3       // 获取线程池
 4   public ExecutorService getExecutor();
 5    // 获取线程调度器
 6   public Scheduler getScheduler();
 7    //
 8   public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread);
 9
10   // 标记一个命令已经开始执行 
11   public void markThreadExecution();
12
13   // 标记一个命令已经结束执行 
14   public void markThreadCompletion();
15
16   // 标记一个命令无法从线程池获取到线程
17   public void markThreadRejection();
18
19   // 线程池队列是否有空闲 
20   public boolean isQueueSpaceAvailable();
21
22 }

HystrixThreadPool是由HystrixThreadPool.Factory生成和管理的,是通过ThreadPoolKey(@HystrixCommandthreadPoolKey指定)与HystrixCommand进行绑定,它的默认实现为HystrixThreadPoolDefault,其内的线程池ThreadPoolExecutor是通过HystrixConcurrencyStrategy策略生成,生成方法如下:

 1// HystrixConcurrencyStrategy
 2public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) {
 3    final ThreadFactory threadFactory = getThreadFactory(threadPoolKey);
 4       final boolean allowMaximumSizeToDivergeFromCoreSize = threadPoolProperties.getAllowMaximumSizeToDivergeFromCoreSize().get();
 5       final int dynamicCoreSize = threadPoolProperties.coreSize().get();
 6       final int keepAliveTime = threadPoolProperties.keepAliveTimeMinutes().get();
 7      final int maxQueueSize = threadPoolProperties.maxQueueSize().get();
 8       final BlockingQueue<Runnable> workQueue = getBlockingQueue(maxQueueSize);
 9
10       if (allowMaximumSizeToDivergeFromCoreSize) {
11           final int dynamicMaximumSize = threadPoolProperties.maximumSize().get();
12         if (dynamicCoreSize > dynamicMaximumSize) {
13              return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
14          } else {
15            return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
16          }
17       } else {
18        return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
19       }
20}

如果允许配置的maximumSize生效的话(allowMaximumSizeToDivergeFromCoreSize为true),在coreSize小于maximumSize时,会创建一个线程最大值为maximumSize的线程池,但会在相对不活动期间返回多余的线程到系统。否则就只应用coreSize来定义线程池中线程的数量。dynamic**前缀说明这些配置都可以在运行时动态修改,如通过配置中心的方式。

接着我们重点关注HystrixThreadPoolDefault#getScheduler()方法,这是给rx的Observable进行线程绑定的提供调度器的核心方法:

 1@Override
 2public Scheduler getScheduler() {
 3    //默认在超时可中断线程
 4       return getScheduler(new Func0<Boolean>() {
 5           @Override
 6          public Boolean call() {
 7           return true;
 8          }
 9      });
10}
11@Override
12public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread) {
13    touchConfig();
14      return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread);
15}

touchConfig()的方法中可以动态调整线程池线程大小、线程存活时间等线程池的关键配置,在配置中心存在的情况下可以动态设置。

HystrixContextScheduler是Hystrix对rx中Scheduler调度器的重写,主要为了实现在Observable未被订阅时,不获取线程执行命令,以及支持在命令执行过程中能够打断运行。

首先关注一下Scheduler中的相关类图:

在rx中,Scheduler将生成对应的WorkerObservable用于执行命令,由Worker具体负责相关执行线程的调度,ThreadPoolWorker是Hystrix自行实现的Worker,持有调度的核心方法:

 1@Override
 2public Subscription schedule(final Action0 action) {
 3    if (subscription.isUnsubscribed()) {
 4         return Subscriptions.unsubscribed();
 5       }
 6      ScheduledAction sa = new ScheduledAction(action);
 7       subscription.add(sa);
 8      sa.addParent(subscription);
 9      ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();
10       FutureTask<?> f = (FutureTask<?>) executor.submit(sa);
11      sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor));
12      return sa;
13}

在上述代码中,如果Observable没有订阅,那么将取消执行,此时还没有分配线程;如果已经被订阅,将会分配线程提交任务,此时如果线程池中的线程已被占满,就可能抛出RejectedExecutionException的异常,拒绝任务,引发失败回滚逻辑。同时添加一个FutureCompleterWithConfigurableInterrupt用于在任务已经提交的情况下取消任务时释放线程。

 1// FutureCompleterWithConfigurableInterrupt
 2@Override
 3public void unsubscribe() {
 4    executor.remove(f);
 5      if (shouldInterruptThread.call()) {
 6           f.cancel(true);
 7      } else {
 8         f.cancel(false);
 9      }
10}

取消任务的时候将从线程池中移除任务,释放线程,同时根据配置是否强制中断任务的执行。

通过线程隔离的方式,可以将调用线程与执行命令的线程分隔开来,避免了调用线程被阻塞,同时通过线程池的方式对每种Command并发线程数量的控制也避免了一种Command的阻塞影响到了系统的其他请求的情况,很好的保护了调用方的线程资源。

本文分享自微信公众号 - aoho求索(aohoBlog),作者:CANGWU

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2018-03-19

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 如何在 Spring 异步调用中传递上下文什么是异步调用?

    异步调用是相对于同步调用而言的,同步调用是指程序按预定顺序一步步执行,每一步必须等到上一步执行完后才能执行,异步调用则无需等待上一步程序执行完即可执行。异步调用...

    aoho求索
  • 抖音、腾讯、阿里、美团春招服务端开发岗位硬核面试(二)

    在上一篇 文章中,我们分享了几大互联网公司面试的题目,本文就来详细分析面试题答案以及复习参考和整理的面试资料,小民同学的私藏珍品?。

    aoho求索
  • 深入RxJava2 源码解析(二)

    前一篇文章我们讲述到RxJava2 的内部设计模式与原理机制,包括观察者模式和装饰者模式,其本质上都是RxJava2的事件驱动,那么本篇文章将会讲到RxJava...

    aoho求索
  • 【小家Java】一次Java线程池误用(newFixedThreadPool)引发的线上血案和总结

    自从最近的某年某月某天起,线上服务开始变得不那么稳定(软病)。在高峰期,时常有几台机器的内存持续飙升,并且无法回收,导致服务不可用。

    YourBatman
  • 多线程实现方式 转

    进程是程序在处理机中的一次运行。一个进程既包括其所要执行的指令,也包括了执行指令所需的系统资源,不同进程所占用的系统资源相对独立。所以进程是重量级的任务,它们之...

    南郭先生
  • java 线程 Thread 使用介绍,包含wait(),notifyAll() 等函数使用介绍

    (原创,转载请说明出处!谢谢--https://cloud.tencent.com/developer/user/1148436/activities)  此文...

    林冠宏-指尖下的幽灵
  • 深读 JDK 源码丨Java Thread

    线程是系统资源分配的最小单位,它被包含在进程之中,是进程中的实际运作单位。JVM 允许应用程序同时运行、执行多个线程,每个线程都有优先权,具有较高优先级的线程优...

    码脑
  • BATJ面试必会之并发篇

    调用 Thread.sleep() 方法使线程进入限期等待状态时,常常用“使一个线程睡眠”进行描述。

    乔戈里
  • 深入理解多线程

    多线程是java中比较重要的一部分内容,使用多线程有许多的优点: - 提高应用程序的响应。对图形化界面更有意义,可增强用户体验。 - 程序需要实现一些需...

    栋先生
  • java基础thread——多线程的纷争(循序渐进)

    正在运行的程序,是系统进行资源分配和调用的独立单位。 每一个进程都有它自己的内存空间和系统资源。

    100000860378

扫码关注云+社区

领取腾讯云代金券