前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >dubbo中使用hystrix遇到ThreadLocal变量的上下文传递时问题分析

dubbo中使用hystrix遇到ThreadLocal变量的上下文传递时问题分析

作者头像
山行AI
发布2020-01-02 11:54:23
3.1K0
发布2020-01-02 11:54:23
举报
文章被收录于专栏:山行AI山行AI

项目中用到了 dubbo,由于需要满足进行日志分析和国际化的需求,要在 dubbo 的 consumer 和 provider 之间进行透传 traceId 和国际化变量标识,常用的方法是利用 dubbo 的 filter 这一 spi 拓展,在 filter 中利用 attachment 将变量在消费者和提供者上下文进行传递,正常的情况下这样处理是能满足需求的,但是当同时使用 hystrix 时情况就变得不一样了。

起初的配置

consumer 和 provider 的 META-INFO/dubbo 目录下的 com.alibaba.dubbo.rpc.Filter 文件中的配置为:

代码语言:javascript
复制
MDCFilter=com.xx.common.filter.MDCFilter
HystrixFilter=com.xx.common.filter.HystrixFilter
DubboExceptionFilter=com.xx.common.filter.DubboExceptionFilter

配置文件

consumer 端的为:

代码语言:javascript
复制
<dubbo:consumer filter="HystrixFilter,MDCFilter,DubboExceptionFilter,-exception" timeout="10000"
                    check="false"/>

provider 的为:

代码语言:javascript
复制
<dubbo:provider filter="MDCFilter,DubboExceptionFilter,-exception" delay="-1" timeout="10000" retries="0" />

filter 代码

MDCFilter

代码语言:javascript
复制
@Activate(group = {Constants.CONSUMER, Constants.PROVIDER})
public class MDCFilter implements Filter {

    private static final Logger LOG = LoggerFactory.getLogger(MDCFilter.class);

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        try {
            if (!invocation.getMethodName().startsWith("$")) {
                try {
                    String reqId = invocation.getAttachment(BaseGlobalConstants.TRACE_ID, "");
                    String envType = invocation.getAttachment(BaseGlobalConstants.ENV_TYPE, "");
                    if (StringUtils.isNotEmpty(envType)) {
                        MDC.put(BaseGlobalConstants.ENV_TYPE, envType);
                    } else {
                        envType = MDC.get(BaseGlobalConstants.ENV_TYPE);
                        if (envType == null){
                            //非浏览器或手机移动端的请求时
                            envType = EnvTypeEnums.CN_SIMPLE.getCode() + "";
                        }
                        if (invocation instanceof RpcInvocation) {
                            ((RpcInvocation) invocation).setAttachment(BaseGlobalConstants.ENV_TYPE, envType);
                        }
                    }
                    if (StringUtils.isNotEmpty(reqId)) {
                        MDC.put(BaseGlobalConstants.TRACE_ID, reqId);
                    } else {
                        reqId = MDC.get(BaseGlobalConstants.TRACE_ID);
                        if (invocation instanceof RpcInvocation) {
                            ((RpcInvocation) invocation).setAttachment(BaseGlobalConstants.TRACE_ID, reqId);
                        }
                    }
                } catch (RpcException e) {
                    throw e;
                } catch (Exception e) {
                    throw new RpcException(e.getMessage(), e);
                }
            }
            return invoker.invoke(invocation);
        } finally {
            //清理
            MDC.remove(BaseGlobalConstants.TRACE_ID);
            MDC.remove(BaseGlobalConstants.ENV_TYPE);
            RpcContext.getContext().clearAttachments();
        }
    }
}

在这里大家应该明白一点就是 MDC 是基于 ThreadLocal 的实现

HystrixFilter

代码语言:javascript
复制
@Activate(group = Constants.CONSUMER)
public class HystrixFilter implements Filter {

    @Override
    public Result invoke(Invoker invoker, Invocation invocation) throws RpcException {
        DubboHystrixCommand command = new DubboHystrixCommand(invoker, invocation);
        return command.execute();
    }

}

其中 DubboHystrixCommand 的代码如下:

代码语言:javascript
复制
public class DubboHystrixCommand extends HystrixCommand<Result> {

    private static final Logger LOGGER = LoggerFactory.getLogger(DubboHystrixCommand.class);

    private static final int DEFAULT_THREADPOOL_CORE_SIZE = 30;

    private Invoker invoker;

    private Invocation invocation;

    public DubboHystrixCommand(Invoker invoker, Invocation invocation) {
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(invoker.getInterface().getName()))
                .andCommandKey(HystrixCommandKey.Factory.asKey(String.format("%s_%d", invocation.getMethodName(),
                        invocation.getArguments() == null ? 0 : invocation.getArguments().length)))
                .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
                        .withCircuitBreakerRequestVolumeThreshold(300).withCircuitBreakerSleepWindowInMilliseconds(1000)// 熔断器中断请求1秒后会进入半打开状态,放部分流量过去重试
                        .withCircuitBreakerErrorThresholdPercentage(50)// 错误率达到50开启熔断保护
                        .withExecutionTimeoutEnabled(false))// 使用dubbo的超时,禁用这里的超时
                .andThreadPoolPropertiesDefaults(
                        HystrixThreadPoolProperties.Setter().withCoreSize(getThreadPoolCoreSize(invoker.getUrl()))
                                .withMaxQueueSize(1000).withQueueSizeRejectionThreshold(700)));// 线程池为30
        this.invoker = invoker;
        this.invocation = invocation;
    }

    /**
     * 获取线程池大小
     *
     * @param url
     * @return
     */
    private static int getThreadPoolCoreSize(URL url) {
        if (url != null) {
            int size = url.getParameter("ThreadPoolCoreSize", DEFAULT_THREADPOOL_CORE_SIZE);
            LOGGER.debug("======ThreadPoolCoreSize is:" + size);
            return size;
        }
        return DEFAULT_THREADPOOL_CORE_SIZE;
    }

    @Override
    protected Result run() throws Exception {
        return invoker.invoke(invocation);
    }

在这里大家应该明白一点就是这里使用的是 Hystrix 的线程池隔离策略,也就是使用一个独立的线程池来处理 dubbo rpc 调用,从而与其他的操作从线程上隔离起来,达到熔断降级的效果。

问题

在实际运行时,大多数情况下会出现 consumer 端放入的 traceId 和国际化变量在 provider 端取不到的情况。而且具体分析时发现,consumer 端放入 MDC 的环境变量在 consumer 端的 filter 中从 MDC 去取时都会有取不到的情况。将 HystrixFilter,MDCFilter 的顺序调换成 MDCFilter,HystrixFilter 后在 consumer 端的 filter 中去 MDC 中是能取到正确的值的,但是在 provider 端是取不到的。

分析

看了前面几篇关于 ThreadLocal 的文章后就可以发现,导致这个问题的原因也很简单,就是 Hystrix 的用于隔离的线程池引起的 ThreadLocal 变量传递异常。

关于解决方法,首先想到的是在 MDC 上处理,因为 TransmitThreadLocal 提供了针对 MDC 的一个依赖:

代码语言:javascript
复制
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>log4j2-ttl-thread-context-map</artifactId>
    <scope>runtime</scope>
</dependency>

实际上仔细研究了一下发现这个依赖主要是为了解决在执行前后的变量设置和清除工作做了一个增强,对当前的问题还是不能很好地解决,详情见:https://github.com/alibaba/transmittable-thread-local/issues/49

解决方式

既然 Hystrix 是采用的线程池隔离,那么我们就可以将线程包装成 TtlRunnable 或 TtlCallable 或者直接用 TtlExectors 来包装线程池来实现线程池条件下的 ThreadLocal 变量传递问题。为了达成这一点,我们就从源码上来探寻一下解决方案。

我们先来看一下 AbstractCommand 的构造方法:

代码语言:javascript
复制
protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,
            HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
            HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,
            HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {
        ...................
        this.threadPoolKey = initThreadPoolKey(threadPoolKey, this.commandGroup, this.properties.executionIsolationThreadPoolKeyOverride().get());
        this.metrics = initMetrics(metrics, this.commandGroup, this.threadPoolKey, this.commandKey, this.properties);
        this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker, this.commandGroup, this.commandKey, this.properties, this.metrics);
        this.threadPool = initThreadPool(threadPool, this.threadPoolKey, threadPoolPropertiesDefaults);

        //Strategies from plugins
        this.eventNotifier = HystrixPlugins.getInstance().getEventNotifier();
        this.concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
        HystrixMetricsPublisherFactory.createOrRetrievePublisherForCommand(this.commandKey, this.commandGroup, this.metrics, this.circuitBreaker, this.properties);
        this.executionHook = initExecutionHook(executionHook);

        this.requestCache = HystrixRequestCache.getInstance(this.commandKey, this.concurrencyStrategy);
        this.currentRequestLog = initRequestLog(this.properties.requestLogEnabled().get(), this.concurrencyStrategy);

        /* fallback semaphore override if applicable */
        this.fallbackSemaphoreOverride = fallbackSemaphore;

        /* execution semaphore override if applicable */
        this.executionSemaphoreOverride = executionSemaphore;
    }

这里我们关注下 initThreadPool 方法:

代码语言:javascript
复制
private static HystrixThreadPool initThreadPool(HystrixThreadPool fromConstructor, HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults) {
        if (fromConstructor == null) {
            // get the default implementation of HystrixThreadPool
            //threadPoolKey 是线程池的标识,一般是用于标识对哪个业务进行的隔离
            return HystrixThreadPool.Factory.getInstance(threadPoolKey, threadPoolPropertiesDefaults);
        } else {
            return fromConstructor;
        }
    }

继续往下看 HystrixThreadPool.Factory.getInstance 方法:

代码语言:javascript
复制
/* package */static HystrixThreadPool getInstance(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesBuilder) {
            // get the key to use instead of using the object itself so that if people forget to implement equals/hashcode things will still work
            String key = threadPoolKey.name();

            // this should find it for all but the first time
            HystrixThreadPool previouslyCached = threadPools.get(key);
            if (previouslyCached != null) {
                return previouslyCached;
            }

            // if we get here this is the first time so we need to initialize
            synchronized (HystrixThreadPool.class) {
                if (!threadPools.containsKey(key)) {
                    threadPools.put(key, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder));
                }
            }
            return threadPools.get(key);
        }

这里的 threadPools 是一个 ConcurrentHashMap 结构,是以 HystrixThreadPoolKey 的 name 为键,以 HystrixThreadPoolDefault 为 value 的 map。

我们着重看一下 HystrixThreadPoolDefault 的构造:

代码语言:javascript
复制
public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults) {
            this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults);
            HystrixConcurrencyStrategy concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
            this.queueSize = properties.maxQueueSize().get();
            this.queue = concurrencyStrategy.getBlockingQueue(queueSize);

            if (properties.getAllowMaximumSizeToDivergeFromCoreSize()) {
                this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey,
                        concurrencyStrategy.getThreadPool(threadPoolKey, properties.coreSize(), properties.maximumSize(), properties.keepAliveTimeMinutes(), TimeUnit.MINUTES, queue),
                        properties);
                this.threadPool = this.metrics.getThreadPool();
            } else {
                this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey,
                        concurrencyStrategy.getThreadPool(threadPoolKey, properties.coreSize(), properties.coreSize(), properties.keepAliveTimeMinutes(), TimeUnit.MINUTES, queue),
                        properties);
                this.threadPool = this.metrics.getThreadPool();
            }

            /* strategy: HystrixMetricsPublisherThreadPool */
            HystrixMetricsPublisherFactory.createOrRetrievePublisherForThreadPool(threadPoolKey, this.metrics, this.properties);
        }

这里我们主要关心线程池的构造部分主要在下面这段代码:

代码语言:javascript
复制
concurrencyStrategy.getThreadPool(threadPoolKey, properties.coreSize(), properties.maximumSize(), properties.keepAliveTimeMinutes(), TimeUnit.MINUTES, queue)

这个是根据 concurrencyStrategy 来获取实际的线程池的部分的代码,如下:

代码语言:javascript
复制
public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixProperty<Integer> corePoolSize, HystrixProperty<Integer> maximumPoolSize, HystrixProperty<Integer> keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        ThreadFactory threadFactory = null;
        if (!PlatformSpecific.isAppEngineStandardEnvironment()) {
            threadFactory = new ThreadFactory() {
                protected final AtomicInteger threadNumber = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r, "hystrix-" + threadPoolKey.name() + "-" + threadNumber.incrementAndGet());
                    thread.setDaemon(true);
                    return thread;
                }

            };
        } else {
            threadFactory = PlatformSpecific.getAppEngineThreadFactory();
        }

        final int dynamicCoreSize = corePoolSize.get();
        final int dynamicMaximumSize = maximumPoolSize.get();

        if (dynamicCoreSize > dynamicMaximumSize) {
            logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " +
                    dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ".  Maximum size will be set to " +
                    dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value");
            return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime.get(), unit, workQueue, threadFactory);
        } else {
            return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime.get(), unit, workQueue, threadFactory);
        }
    }

原本想着重写一下这个 getThreadPool 方法,但是发现返回值为 ThreadPoolExecutor,而且内部有很多平台逻辑的判断,于是放弃这个方法。同时在 HystrixConcurrencyStrategy 类中还有下面这个方法:

代码语言:javascript
复制
/**
     * Provides an opportunity to wrap/decorate a {@code Callable<T>} before execution.
     * <p>
     * This can be used to inject additional behavior such as copying of thread state (such as {@link ThreadLocal}).
     * <p>
     * <b>Default Implementation</b>
     * <p>
     * Pass-thru that does no wrapping.
     *
     * @param callable
     *            {@code Callable<T>} to be executed via a {@link ThreadPoolExecutor}
     * @return {@code Callable<T>} either as a pass-thru or wrapping the one given
     */
    public <T> Callable<T> wrapCallable(Callable<T> callable) {
        return callable;
    }

它的注释写得很清楚,就是用于装饰 Callable 的,用于注入附加的行为,比如从 ThreadLocal 中拷贝线程状态。这个就是我们所需要的。使用 idea 工具,findUsage 一下可以看到用到这个方法的主要有下面三个类:

  • HystrixContexSchedulerAction:
代码语言:javascript
复制
public HystrixContexSchedulerAction(final HystrixConcurrencyStrategy concurrencyStrategy, Action0 action) {
        this.actual = action;
        this.parentThreadState = HystrixRequestContext.getContextForCurrentThread();

        this.c = concurrencyStrategy.wrapCallable(new Callable<Void>() {

            @Override
            public Void call() throws Exception {
                HystrixRequestContext existingState = HystrixRequestContext.getContextForCurrentThread();
                try {
                    // set the state of this thread to that of its parent
                    HystrixRequestContext.setContextOnCurrentThread(parentThreadState);
                    // execute actual Action0 with the state of the parent
                    actual.call();
                    return null;
                } finally {
                    // restore this thread back to its original state
                    HystrixRequestContext.setContextOnCurrentThread(existingState);
                }
            }
        });
    }
  • HystrixContextCallable:
代码语言:javascript
复制
public HystrixContextCallable(HystrixConcurrencyStrategy concurrencyStrategy, Callable<K> actual) {
        this.actual = concurrencyStrategy.wrapCallable(actual);
        this.parentThreadState = HystrixRequestContext.getContextForCurrentThread();
    }
  • HystrixContextRunnable:
代码语言:javascript
复制
public HystrixContextRunnable(HystrixConcurrencyStrategy concurrencyStrategy, final Runnable actual) {
        this.actual = concurrencyStrategy.wrapCallable(new Callable<Void>() {

            @Override
            public Void call() throws Exception {
                actual.run();
                return null;
            }

        });
        this.parentThreadState = HystrixRequestContext.getContextForCurrentThread();
    }

对于三者的用处暂时不去深究,我们接着来看在 HystrixThreadPoolDefault 内部的如下几个方法:

代码语言:javascript
复制
@Override
    public ThreadPoolExecutor getExecutor() {
        touchConfig();
        return threadPool;
    }

    @Override
    public Scheduler getScheduler() {
        //by default, interrupt underlying threads on timeout
        return getScheduler(new Func0<Boolean>() {
            @Override
            public Boolean call() {
                return true;
            }
        });
    }

    @Override
    public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread) {
        touchConfig();
        return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread);
    }

查看 getExecutor()的使用地方发现主要在 ThreadPoolWorker 中:

代码语言:javascript
复制
private static class ThreadPoolWorker extends Worker {

        private final HystrixThreadPool threadPool;
        private final CompositeSubscription subscription = new CompositeSubscription();
        private final Func0<Boolean> shouldInterruptThread;

        public ThreadPoolWorker(HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) {
            this.threadPool = threadPool;
            this.shouldInterruptThread = shouldInterruptThread;
        }

        @Override
        public void unsubscribe() {
            subscription.unsubscribe();
        }

        @Override
        public boolean isUnsubscribed() {
            return subscription.isUnsubscribed();
        }

        @Override
        public Subscription schedule(final Action0 action) {
            if (subscription.isUnsubscribed()) {
                // don't schedule, we are unsubscribed
                return Subscriptions.unsubscribed();
            }

            // This is internal RxJava API but it is too useful.
            ScheduledAction sa = new ScheduledAction(action);

            subscription.add(sa);
            sa.addParent(subscription);

            ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();
            FutureTask<?> f = (FutureTask<?>) executor.submit(sa);
            sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor));

            return sa;
        }
................

主要用于将 ScheduledAction 提交到线程池中处理,那么这个 schedule 会在哪里调用呢?我们继续查找,最后在 HystrixContextSchedulerWorker 中找到了它:

代码语言:javascript
复制
private class HystrixContextSchedulerWorker extends Worker {

        private final Worker worker;

        private HystrixContextSchedulerWorker(Worker actualWorker) {
            this.worker = actualWorker;
        }

        @Override
        public void unsubscribe() {
            worker.unsubscribe();
        }

        @Override
        public boolean isUnsubscribed() {
            return worker.isUnsubscribed();
        }

        @Override
        public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
            if (threadPool != null) {
                if (!threadPool.isQueueSpaceAvailable()) {
                    throw new RejectedExecutionException("Rejected command because thread-pool queueSize is at rejection threshold.");
                }
            }
            return worker.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action), delayTime, unit);
        }

        @Override
        public Subscription schedule(Action0 action) {
            if (threadPool != null) {
                if (!threadPool.isQueueSpaceAvailable()) {
                    throw new RejectedExecutionException("Rejected command because thread-pool queueSize is at rejection threshold.");
                }
            }
            return worker.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action));
        }

    }

worker.schedule 方法传入的是 HystrixContexSchedulerAction,就是我们上面提到的使用 wrapCallable 方法包装了一下 Callable 的类之一。

这时我们再回过头来看一下 com.netflix.hystrix.AbstractCommand#executeCommandWithSpecifiedIsolation 方法:

这里 subscribeOn 返回的 Scheduler 是 HystrixContextScheduler,对应方法为 com.netflix.hystrix.HystrixThreadPool.HystrixThreadPoolDefault#getScheduler(rx.functions.Func0):

代码语言:javascript
复制
@Override
    public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread) {
        touchConfig();
        return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread);
    }

HystrixContextScheduler 的 createWorker 方法 com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler#createWorker 为:

代码语言:javascript
复制
@Override
    public Worker createWorker() {
        return new HystrixContextSchedulerWorker(actualScheduler.createWorker());
    }

到这里整条线就都连起来了,HystrixContextScheduler 是 rxjava 中的 Scheduler 的实现,它提供的是 HystrixContextSchedulerWorker 来进行工作,HystrixContextSchedulerWorker 下面又调用了 getExecutor 方法来使用线程池进行实际的工作。而传入的需要执行的 Action 为 HystrixContexSchedulerAction,在这个 Action 的内部是使 com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy#wrapCallable 方法进行包装过的。这样整个焦点就可以回归到 HystrixConcurrencyStrategy 的 wrapCallable 方法上来了,我们只需要在这里将这个 Callable 包装成 TtlCallable 就能解决问题了。

解决方案

一般 hystrix 修改策略的方式是通过配置文件来指定的,同时也可以使用硬编码的方式,配置文件的方式比较简单,这里就不再多描述了,简单介绍一下硬编码的方式。硬编码的方式主要参考:https://github.com/Netflix/Hystrix/wiki/Plugins,是通过HystrixPlugins.getInstance().registerConcurrencyStrategy(...)来进行的,但是需要注意的是如果这段代码在构造方法中初始化会报错,是因为HystrixCommand实现类中先调用了super的构造器,而且必须在第一行,在父构造器执行完成后ConcurrencyStrategy已经完成注册,而HystrixPlugins.getInstance().registerConcurrencyStrategy方法内部有null值的cas检查,要求必须是没有初始化过的才能设置,详见代码:

代码语言:javascript
复制
public void registerConcurrencyStrategy(HystrixConcurrencyStrategy impl) {
        if (!concurrencyStrategy.compareAndSet(null, impl)) {
            throw new IllegalStateException("Another strategy was already registered.");
        }
    }

所以这里需要先将已经注册过的清空掉,然后全部重新注册。而 com.netflix.hystrix.strategy.HystrixPlugins#reset 方法正是用于处理这件事情的,代码如下:

代码语言:javascript
复制
public static void reset() {
        getInstance().notifier.set(null);
        getInstance().concurrencyStrategy.set(null);
        getInstance().metricsPublisher.set(null);
        getInstance().propertiesFactory.set(null);
        getInstance().commandExecutionHook.set(null);
        HystrixMetricsPublisherFactory.reset();
    }

可以看到,这里清空的不止 ConcurrencyStrategy,还有其他组件,所以我们需要在清空之前先将其他的组件都预先取出,然后再重新注册,关于这个思路来自:https://blog.csdn.net/songhaifengshuaige/article/details/80345012。

最终代码

DubboHystrixCommand

代码语言:javascript
复制
public class DubboHystrixCommand extends HystrixCommand<Result> {

    private static final Logger LOGGER = LoggerFactory.getLogger(DubboHystrixCommand.class);

    private static final int DEFAULT_THREADPOOL_CORE_SIZE = 30;

    private Invoker invoker;

    private Invocation invocation;

    public DubboHystrixCommand(Invoker invoker, Invocation invocation) {
        //https://blog.csdn.net/songhaifengshuaige/article/details/80345012
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(invoker.getInterface().getName()))
                .andCommandKey(HystrixCommandKey.Factory.asKey(String.format("%s_%d", invocation.getMethodName(),
                        invocation.getArguments() == null ? 0 : invocation.getArguments().length)))
                .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
                        .withCircuitBreakerRequestVolumeThreshold(300).withCircuitBreakerSleepWindowInMilliseconds(1000)// 熔断器中断请求1秒后会进入半打开状态,放部分流量过去重试
                        .withCircuitBreakerErrorThresholdPercentage(50)// 错误率达到50开启熔断保护
                        .withExecutionTimeoutEnabled(false))// 使用dubbo的超时,禁用这里的超时
                .andThreadPoolPropertiesDefaults(
                        HystrixThreadPoolProperties.Setter().withCoreSize(getThreadPoolCoreSize(invoker.getUrl()))
                                .withMaxQueueSize(1000).withQueueSizeRejectionThreshold(700)));// 线程池为30
        //@see https://github.com/Netflix/Hystrix/wiki/Plugins
        HystrixEventNotifier eventNotifier = HystrixPlugins.getInstance().getEventNotifier();
        //HystrixConcurrencyStrategy concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
        HystrixMetricsPublisher metricsPublisher = HystrixPlugins.getInstance().getMetricsPublisher();
        HystrixPropertiesStrategy propertiesStrategy = HystrixPlugins.getInstance().getPropertiesStrategy();
        HystrixCommandExecutionHook commandExecutionHook = HystrixPlugins.getInstance().getCommandExecutionHook();
        HystrixPlugins.reset();
        HystrixPlugins.getInstance().registerEventNotifier(eventNotifier);
        HystrixPlugins.getInstance().registerMetricsPublisher(metricsPublisher);
        HystrixPlugins.getInstance().registerPropertiesStrategy(propertiesStrategy);
        HystrixPlugins.getInstance().registerCommandExecutionHook(commandExecutionHook);
        HystrixPlugins.getInstance().registerConcurrencyStrategy(new TransmitHystrixConcurrencyStragegy());
        this.invoker = invoker;
        this.invocation = invocation;
    }
...................

TransmitHystrixConcurrencyStragegy

代码语言:javascript
复制
public class TransmitHystrixConcurrencyStragegy extends HystrixConcurrencyStrategy {

    @Override
    public <T> Callable<T> wrapCallable(Callable<T> callable) {
        Callable<T> tCallable = super.wrapCallable(callable);
        return TtlCallable.get(tCallable);
    }

另外不要忘记引入 TransmitThreadLocal 的依赖。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-12-23,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 开发架构二三事 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 起初的配置
    • consumer 和 provider 的 META-INFO/dubbo 目录下的 com.alibaba.dubbo.rpc.Filter 文件中的配置为:
      • 配置文件
        • consumer 端的为:
        • provider 的为:
      • filter 代码
        • MDCFilter
        • HystrixFilter
    • 问题
    • 分析
    • 解决方式
    • 解决方案
    • 最终代码
      • DubboHystrixCommand
        • TransmitHystrixConcurrencyStragegy
        相关产品与服务
        Elasticsearch Service
        腾讯云 Elasticsearch Service(ES)是云端全托管海量数据检索分析服务,拥有高性能自研内核,集成X-Pack。ES 支持通过自治索引、存算分离、集群巡检等特性轻松管理集群,也支持免运维、自动弹性、按需使用的 Serverless 模式。使用 ES 您可以高效构建信息检索、日志分析、运维监控等服务,它独特的向量检索还可助您构建基于语义、图像的AI深度应用。
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档