A look at Hystrix's timeout handling

This article looks at how Hystrix handles command timeouts.

HystrixObservableTimeoutOperator

hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/AbstractCommand.java

    private static class HystrixObservableTimeoutOperator<R> implements Operator<R, R> {

        final AbstractCommand<R> originalCommand;

        public HystrixObservableTimeoutOperator(final AbstractCommand<R> originalCommand) {
            this.originalCommand = originalCommand;
        }

        @Override
        public Subscriber<? super R> call(final Subscriber<? super R> child) {
            final CompositeSubscription s = new CompositeSubscription();
            // if the child unsubscribes we unsubscribe our parent as well
            child.add(s);

            //capture the HystrixRequestContext upfront so that we can use it in the timeout thread later
            final HystrixRequestContext hystrixRequestContext = HystrixRequestContext.getContextForCurrentThread();

            TimerListener listener = new TimerListener() {

                @Override
                public void tick() {
                    // if we can go from NOT_EXECUTED to TIMED_OUT then we do the timeout codepath
                    // otherwise it means we lost a race and the run() execution completed or did not start
                    if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
                        // report timeout failure
                        originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey);

                        // shut down the original request
                        s.unsubscribe();

                        final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, hystrixRequestContext, new Runnable() {

                            @Override
                            public void run() {
                                child.onError(new HystrixTimeoutException());
                            }
                        });

                        timeoutRunnable.run();
                        //if it did not start, then we need to mark a command start for concurrency metrics, and then issue the timeout
                    }
                }

                @Override
                public int getIntervalTimeInMilliseconds() {
                    return originalCommand.properties.executionTimeoutInMilliseconds().get();
                }
            };

            final Reference<TimerListener> tl = HystrixTimer.getInstance().addTimerListener(listener);

            // set externally so execute/queue can see this
            originalCommand.timeoutTimer.set(tl);

            /**
             * If this subscriber receives values it means the parent succeeded/completed
             */
            Subscriber<R> parent = new Subscriber<R>() {

                @Override
                public void onCompleted() {
                    if (isNotTimedOut()) {
                        // stop timer and pass notification through
                        tl.clear();
                        child.onCompleted();
                    }
                }

                @Override
                public void onError(Throwable e) {
                    if (isNotTimedOut()) {
                        // stop timer and pass notification through
                        tl.clear();
                        child.onError(e);
                    }
                }

                @Override
                public void onNext(R v) {
                    if (isNotTimedOut()) {
                        child.onNext(v);
                    }
                }

                private boolean isNotTimedOut() {
                    // if already marked COMPLETED (by onNext) or succeeds in setting to COMPLETED
                    return originalCommand.isCommandTimedOut.get() == TimedOutStatus.COMPLETED ||
                            originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED);
                }

            };

            // if s is unsubscribed we want to unsubscribe the parent
            s.add(parent);

            return parent;
        }

    }

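The operator registers a TimerListener whose tick() uses compareAndSet to try to move isCommandTimedOut from TimedOutStatus.NOT_EXECUTED to TimedOutStatus.TIMED_OUT. The parent subscriber attempts the competing transition, NOT_EXECUTED to COMPLETED, in isNotTimedOut(), so whichever side wins the CAS decides whether the command times out or completes normally.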
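The race between the timer and the command can be looked at in isolation. The following is a minimal sketch, not Hystrix code: the class TimeoutRaceSketch and the method tryTimeout are hypothetical names, and only the enum values and the two compareAndSet transitions are taken from the source above.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the isCommandTimedOut field of AbstractCommand.
class TimeoutRaceSketch {
    enum TimedOutStatus { NOT_EXECUTED, COMPLETED, TIMED_OUT }

    final AtomicReference<TimedOutStatus> status =
            new AtomicReference<>(TimedOutStatus.NOT_EXECUTED);

    // Mirrors the check in tick(): only the first transition out of NOT_EXECUTED wins.
    boolean tryTimeout() {
        return status.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT);
    }

    // Mirrors isNotTimedOut(): true if already COMPLETED, or if this call wins the CAS.
    boolean isNotTimedOut() {
        return status.get() == TimedOutStatus.COMPLETED
                || status.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED);
    }

    public static void main(String[] args) {
        TimeoutRaceSketch timerWins = new TimeoutRaceSketch();
        System.out.println(timerWins.tryTimeout());      // true: the timer fired first
        System.out.println(timerWins.isNotTimedOut());   // false: the late result is dropped

        TimeoutRaceSketch commandWins = new TimeoutRaceSketch();
        System.out.println(commandWins.isNotTimedOut()); // true: the result arrived first
        System.out.println(commandWins.tryTimeout());    // false: the late tick is a no-op
    }
}
```

Because both sides start from NOT_EXECUTED, exactly one of them can succeed, which is why no lock is needed around the notification to the child subscriber.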

timeoutRunnable

    final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, hystrixRequestContext, new Runnable() {

        @Override
        public void run() {
            child.onError(new HystrixTimeoutException());
        }
    });

    timeoutRunnable.run();

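If the CAS to TIMED_OUT succeeds, the listener unsubscribes the original request and then calls child.onError with a HystrixTimeoutException. The call is wrapped in a HystrixContextRunnable so that it runs with the HystrixRequestContext captured up front, even though tick() executes on the timer thread.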
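The idea of capturing a context on one thread and installing it around a callback on another can be sketched with a plain ThreadLocal. This is a simplified illustration under stated assumptions, not the HystrixContextRunnable implementation: ContextRunnableSketch, CONTEXT, and observedOnTimerThread are hypothetical names, and a String stands in for HystrixRequestContext.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ContextRunnableSketch implements Runnable {
    // Hypothetical per-thread context, standing in for HystrixRequestContext.
    static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    private final String captured;   // context captured on the subscribing thread
    private final Runnable actual;   // callback to run with that context

    ContextRunnableSketch(String captured, Runnable actual) {
        this.captured = captured;
        this.actual = actual;
    }

    @Override
    public void run() {
        String previous = CONTEXT.get();
        CONTEXT.set(captured);       // install the captured context on the running thread
        try {
            actual.run();
        } finally {
            CONTEXT.set(previous);   // restore whatever the thread had before
        }
    }

    // Captures the context on the caller thread, runs the callback on another thread,
    // and returns the context value the callback observed there.
    static String observedOnTimerThread(String requestContext) {
        CONTEXT.set(requestContext);
        final String[] seen = new String[1];
        Runnable wrapped = new ContextRunnableSketch(CONTEXT.get(), () -> seen[0] = CONTEXT.get());
        ExecutorService timerThread = Executors.newSingleThreadExecutor();
        try {
            timerThread.submit(wrapped).get();   // get() also makes seen[0] visible here
            return seen[0];
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            timerThread.shutdown();
            CONTEXT.remove();
        }
    }
}
```

Calling ContextRunnableSketch.observedOnTimerThread("req-1") returns "req-1": the callback sees the caller's context although it ran on a different thread.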

TimerListener

hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/util/HystrixTimer.java

    /**
     * Add a {@link TimerListener} that will be executed until it is garbage collected or removed by clearing the returned {@link Reference}.
     * <p>
     * NOTE: It is the responsibility of code that adds a listener via this method to clear this listener when completed.
     * <p>
     * <blockquote>
     * 
     * <pre> {@code
     * // add a TimerListener 
     * Reference<TimerListener> listener = HystrixTimer.getInstance().addTimerListener(listenerImpl);
     * 
     * // sometime later, often in a thread shutdown, request cleanup, servlet filter or something similar the listener must be shutdown via the clear() method
     * listener.clear();
     * }</pre>
     * </blockquote>
     * 
     * 
     * @param listener
     *            TimerListener implementation that will be triggered according to its <code>getIntervalTimeInMilliseconds()</code> method implementation.
     * @return reference to the TimerListener that allows cleanup via the <code>clear()</code> method
     */
    public Reference<TimerListener> addTimerListener(final TimerListener listener) {
        startThreadIfNeeded();
        // add the listener

        Runnable r = new Runnable() {

            @Override
            public void run() {
                try {
                    listener.tick();
                } catch (Exception e) {
                    logger.error("Failed while ticking TimerListener", e);
                }
            }
        };

        ScheduledFuture<?> f = executor.get().getThreadPool().scheduleAtFixedRate(r, listener.getIntervalTimeInMilliseconds(), listener.getIntervalTimeInMilliseconds(), TimeUnit.MILLISECONDS);
        return new TimerReference(listener, f);
    }

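The TimerListener is scheduled with scheduleAtFixedRate on a ScheduledThreadPoolExecutor, using getIntervalTimeInMilliseconds() as both the initial delay and the period. For the timeout listener that interval is executionTimeoutInMilliseconds, so tick() first runs one timeout interval after registration and would keep repeating at that rate until the returned reference is cleared.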
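That scheduleAtFixedRate produces a periodic task, not a one-shot delay, is easy to check with the JDK alone. The sketch below assumes nothing from Hystrix; FixedRateSketch and ticksRepeatedly are hypothetical names, and the latch simply waits until the task has run twice.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class FixedRateSketch {
    // Returns true if the task ran at least twice, i.e. it is periodic rather than one-shot.
    static boolean ticksRepeatedly(long intervalMs) {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
        CountDownLatch twoTicks = new CountDownLatch(2);
        try {
            // Same shape as addTimerListener: the interval is both initial delay and period.
            ScheduledFuture<?> f = pool.scheduleAtFixedRate(
                    twoTicks::countDown, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
            boolean repeated = twoTicks.await(intervalMs * 100, TimeUnit.MILLISECONDS);
            f.cancel(false);   // stop further runs, as TimerReference.clear() does
            return repeated;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow();
        }
    }
}
```

In the timeout operator a repeated tick is harmless, because the second compareAndSet from NOT_EXECUTED fails, but the task still has to be cancelled so that it does not stay on the scheduler.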

Cleaning up timeoutTimer

hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/AbstractCommand.java

    private void cleanUpAfterResponseFromCache(boolean commandExecutionStarted) {
        Reference<TimerListener> tl = timeoutTimer.get();
        if (tl != null) {
            tl.clear();
        }

        final long latency = System.currentTimeMillis() - commandStartTimestamp;
        executionResult = executionResult
                .addEvent(-1, HystrixEventType.RESPONSE_FROM_CACHE)
                .markUserThreadCompletion(latency)
                .setNotExecutedInThread();
        ExecutionResult cacheOnlyForMetrics = ExecutionResult.from(HystrixEventType.RESPONSE_FROM_CACHE)
                .markUserThreadCompletion(latency);
        metrics.markCommandDone(cacheOnlyForMetrics, commandKey, threadPoolKey, commandExecutionStarted);
        eventNotifier.markEvent(HystrixEventType.RESPONSE_FROM_CACHE, commandKey);
    }

    private void handleCommandEnd(boolean commandExecutionStarted) {
        Reference<TimerListener> tl = timeoutTimer.get();
        if (tl != null) {
            tl.clear();
        }

        long userThreadLatency = System.currentTimeMillis() - commandStartTimestamp;
        executionResult = executionResult.markUserThreadCompletion((int) userThreadLatency);
        if (executionResultAtTimeOfCancellation == null) {
            metrics.markCommandDone(executionResult, commandKey, threadPoolKey, commandExecutionStarted);
        } else {
            metrics.markCommandDone(executionResultAtTimeOfCancellation, commandKey, threadPoolKey, commandExecutionStarted);
        }

        if (endCurrentThreadExecutingCommand != null) {
            endCurrentThreadExecutingCommand.call();
        }
    }

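Both cleanUpAfterResponseFromCache and handleCommandEnd clear the timeoutTimer reference, so the timer is stopped whether the response came from the cache or the command ran to its end.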

TimerReference.clear

hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/util/HystrixTimer.java

    private static class TimerReference extends SoftReference<TimerListener> {

        private final ScheduledFuture<?> f;

        TimerReference(TimerListener referent, ScheduledFuture<?> f) {
            super(referent);
            this.f = f;
        }

        @Override
        public void clear() {
            super.clear();
            // stop this ScheduledFuture from any further executions
            f.cancel(false);
        }

    }

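Besides calling the superclass clear(), TimerReference.clear() also calls cancel(false) on the ScheduledFuture. This cancels any further scheduled runs of the task without interrupting a tick that is already in progress.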
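The same pattern can be reproduced with JDK classes only. The sketch below is an illustration under assumptions, not the Hystrix class: ClearableTimerSketch, CancellingReference, and clearAndInspect are hypothetical names. It shows that after clear() the referent is gone and the future reports itself as cancelled.

```java
import java.lang.ref.SoftReference;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class ClearableTimerSketch {
    // A reference whose clear() also cancels the periodic task tied to the referent.
    static class CancellingReference<T> extends SoftReference<T> {
        private final ScheduledFuture<?> future;

        CancellingReference(T referent, ScheduledFuture<?> future) {
            super(referent);
            this.future = future;
        }

        @Override
        public void clear() {
            super.clear();
            future.cancel(false);   // no further runs; a run in progress is not interrupted
        }
    }

    // Returns { referent was dropped, future was cancelled } after calling clear().
    static boolean[] clearAndInspect() {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
        try {
            Runnable listener = () -> { };
            ScheduledFuture<?> f = pool.scheduleAtFixedRate(listener, 1, 1, TimeUnit.HOURS);
            CancellingReference<Runnable> ref = new CancellingReference<>(listener, f);
            ref.clear();
            return new boolean[] { ref.get() == null, f.isCancelled() };
        } finally {
            pool.shutdownNow();
        }
    }
}
```

Calling clear() a second time is also safe: cancel(false) on an already cancelled future simply returns false.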

Summary

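Hystrix implements timeouts by registering a TimerListener with HystrixTimer. The listener is scheduled on a thread pool via scheduleAtFixedRate, with executionTimeoutInMilliseconds as both the initial delay and the period, and each run invokes the listener's tick() method. tick() tries to move isCommandTimedOut from TimedOutStatus.NOT_EXECUTED to TimedOutStatus.TIMED_OUT; if that succeeds, it unsubscribes the original request and runs timeoutRunnable, which delivers a HystrixTimeoutException to the child subscriber through onError.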
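Put together, the flow summarized above can be sketched end to end with JDK classes. This is a simplified model under assumptions, not how Hystrix is implemented: MiniTimeoutSketch and callWithTimeout are hypothetical names, a CompletableFuture stands in for the child subscriber, and a timeout is reported as the string "TIMEOUT" where Hystrix would signal onError with a HystrixTimeoutException.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class MiniTimeoutSketch {
    enum Status { NOT_EXECUTED, COMPLETED, TIMED_OUT }

    // Runs work on a worker thread; returns its result, or "TIMEOUT" if the timer wins the race.
    static String callWithTimeout(Callable<String> work, long timeoutMs) {
        ScheduledExecutorService timer = Executors.newScheduledThreadPool(1);
        ExecutorService worker = Executors.newSingleThreadExecutor();
        AtomicReference<Status> status = new AtomicReference<>(Status.NOT_EXECUTED);
        CompletableFuture<String> child = new CompletableFuture<>();
        try {
            // The "tick": fires after timeoutMs and tries to claim the TIMED_OUT state.
            ScheduledFuture<?> tick = timer.scheduleAtFixedRate(() -> {
                if (status.compareAndSet(Status.NOT_EXECUTED, Status.TIMED_OUT)) {
                    child.complete("TIMEOUT");
                }
            }, timeoutMs, timeoutMs, TimeUnit.MILLISECONDS);

            worker.execute(() -> {
                try {
                    String value = work.call();
                    // The "parent subscriber": passes the value on only if it wins the CAS.
                    if (status.compareAndSet(Status.NOT_EXECUTED, Status.COMPLETED)) {
                        tick.cancel(false);   // stop the timer, like tl.clear()
                        child.complete(value);
                    }
                } catch (Exception e) {
                    if (status.compareAndSet(Status.NOT_EXECUTED, Status.COMPLETED)) {
                        tick.cancel(false);
                        child.completeExceptionally(e);
                    }
                }
            });
            return child.join();
        } finally {
            timer.shutdownNow();
            worker.shutdownNow();   // interrupts work that is still running after a timeout
        }
    }
}
```

For example, callWithTimeout(() -> "ok", 2000) returns "ok", while a task that sleeps for two seconds under a 50 ms timeout returns "TIMEOUT" and its late result is discarded.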

doc

  • Hystrix/wiki/How-it-Works

Originally published on the WeChat official account 码匠的流水账 (geek_luandun)

Originally published: 2018-06-26
