
A Look at Hystrix's Timeout Handling

This article examines how Hystrix handles command timeouts.

HystrixObservableTimeoutOperator

hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/AbstractCommand.java

    private static class HystrixObservableTimeoutOperator<R> implements Operator<R, R> {

        final AbstractCommand<R> originalCommand;

        public HystrixObservableTimeoutOperator(final AbstractCommand<R> originalCommand) {
            this.originalCommand = originalCommand;
        }

        @Override
        public Subscriber<? super R> call(final Subscriber<? super R> child) {
            final CompositeSubscription s = new CompositeSubscription();
            // if the child unsubscribes we unsubscribe our parent as well
            child.add(s);

            //capture the HystrixRequestContext upfront so that we can use it in the timeout thread later
            final HystrixRequestContext hystrixRequestContext = HystrixRequestContext.getContextForCurrentThread();

            TimerListener listener = new TimerListener() {

                @Override
                public void tick() {
                    // if we can go from NOT_EXECUTED to TIMED_OUT then we do the timeout codepath
                    // otherwise it means we lost a race and the run() execution completed or did not start
                    if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
                        // report timeout failure
                        originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey);

                        // shut down the original request
                        s.unsubscribe();

                        final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, hystrixRequestContext, new Runnable() {

                            @Override
                            public void run() {
                                child.onError(new HystrixTimeoutException());
                            }
                        });

                        timeoutRunnable.run();
                        //if it did not start, then we need to mark a command start for concurrency metrics, and then issue the timeout
                    }
                }

                @Override
                public int getIntervalTimeInMilliseconds() {
                    return originalCommand.properties.executionTimeoutInMilliseconds().get();
                }
            };

            final Reference<TimerListener> tl = HystrixTimer.getInstance().addTimerListener(listener);

            // set externally so execute/queue can see this
            originalCommand.timeoutTimer.set(tl);

            /**
             * If this subscriber receives values it means the parent succeeded/completed
             */
            Subscriber<R> parent = new Subscriber<R>() {

                @Override
                public void onCompleted() {
                    if (isNotTimedOut()) {
                        // stop timer and pass notification through
                        tl.clear();
                        child.onCompleted();
                    }
                }

                @Override
                public void onError(Throwable e) {
                    if (isNotTimedOut()) {
                        // stop timer and pass notification through
                        tl.clear();
                        child.onError(e);
                    }
                }

                @Override
                public void onNext(R v) {
                    if (isNotTimedOut()) {
                        child.onNext(v);
                    }
                }

                private boolean isNotTimedOut() {
                    // if already marked COMPLETED (by onNext) or succeeds in setting to COMPLETED
                    return originalCommand.isCommandTimedOut.get() == TimedOutStatus.COMPLETED ||
                            originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED);
                }

            };

            // if s is unsubscribed we want to unsubscribe the parent
            s.add(parent);

            return parent;
        }

    }

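The TimerListener here uses compareAndSet to try to move the isCommandTimedOut field from TimedOutStatus.NOT_EXECUTED to TimedOutStatus.TIMED_OUT. The parent subscriber competes for the same field, trying to move it from NOT_EXECUTED to COMPLETED, so whichever side wins the compareAndSet decides whether the command counts as timed out or completed.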
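The race between the timer and the command can be reduced to a single AtomicReference. The following is a minimal sketch of that idea, not the Hystrix implementation: the class name TimeoutRaceSketch and the methods tryTimeout and tryComplete are hypothetical names introduced for illustration, and only the three status values are taken from the listing above.

```java
import java.util.concurrent.atomic.AtomicReference;

public class TimeoutRaceSketch {

    // Same three states as the TimedOutStatus used in the listing above.
    enum TimedOutStatus { NOT_EXECUTED, COMPLETED, TIMED_OUT }

    private final AtomicReference<TimedOutStatus> status =
            new AtomicReference<>(TimedOutStatus.NOT_EXECUTED);

    /** Timer side (hypothetical name): true only if the timeout won the race. */
    public boolean tryTimeout() {
        return status.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT);
    }

    /** Command side (hypothetical name): true if a result may still be delivered. */
    public boolean tryComplete() {
        return status.get() == TimedOutStatus.COMPLETED
                || status.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED);
    }

    public TimedOutStatus current() {
        return status.get();
    }

    public static void main(String[] args) {
        TimeoutRaceSketch timerWins = new TimeoutRaceSketch();
        System.out.println(timerWins.tryTimeout());    // timer fires first
        System.out.println(timerWins.tryComplete());   // late result is dropped

        TimeoutRaceSketch commandWins = new TimeoutRaceSketch();
        System.out.println(commandWins.tryComplete()); // command finishes first
        System.out.println(commandWins.tryTimeout());  // later tick is a no-op
    }
}
```

Because both sides only ever transition out of NOT_EXECUTED, exactly one of them can succeed, which is why a late result is silently dropped after a timeout and a late tick does nothing after completion.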

timeoutRunnable

    final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, hystrixRequestContext, new Runnable() {

        @Override
        public void run() {
            child.onError(new HystrixTimeoutException());
        }
    });

    timeoutRunnable.run();

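If the status is successfully set to TIMED_OUT, a HystrixTimeoutException is delivered to the downstream subscriber through child.onError. The call is wrapped in a HystrixContextRunnable so that it runs on the timer thread with the HystrixRequestContext that was captured up front on the subscribing thread.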
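The reason for wrapping the callback is that thread-local state does not follow work onto another thread by itself. The sketch below illustrates the capture-and-restore idea with a plain ThreadLocal; it is an assumption-laden stand-in, not Hystrix code. ContextRunnableSketch, CONTEXT and observeOnOtherThread are hypothetical names, and a single-thread executor plays the role of the timer thread.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ContextRunnableSketch implements Runnable {

    // Hypothetical stand-in for the thread-local request context.
    static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    private final String captured;  // context captured on the creating thread
    private final Runnable actual;  // work to run under that context

    public ContextRunnableSketch(String captured, Runnable actual) {
        this.captured = captured;
        this.actual = actual;
    }

    @Override
    public void run() {
        String previous = CONTEXT.get();
        CONTEXT.set(captured);          // install the captured context
        try {
            actual.run();
        } finally {
            CONTEXT.set(previous);      // restore whatever the thread had before
        }
    }

    /** Returns the context value seen by a task running on a different thread. */
    static String observeOnOtherThread(String callerContext) {
        CONTEXT.set(callerContext);
        String[] seen = new String[1];
        ExecutorService timerThread = Executors.newSingleThreadExecutor();
        try {
            Runnable wrapped = new ContextRunnableSketch(CONTEXT.get(), () -> seen[0] = CONTEXT.get());
            timerThread.submit(wrapped).get();  // wait so the write to seen[0] is visible
            return seen[0];
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            timerThread.shutdown();
            CONTEXT.remove();
        }
    }

    public static void main(String[] args) {
        System.out.println(observeOnOtherThread("request-42"));
    }
}
```

Without the wrapper, the task on the other thread would read an empty ThreadLocal, which is the situation the upfront capture in the operator is meant to avoid.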

TimerListener

hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/util/HystrixTimer.java

    /**
     * Add a {@link TimerListener} that will be executed until it is garbage collected or removed by clearing the returned {@link Reference}.
     * <p>
     * NOTE: It is the responsibility of code that adds a listener via this method to clear this listener when completed.
     * <p>
     * <blockquote>
     * 
     * <pre> {@code
     * // add a TimerListener 
     * Reference<TimerListener> listener = HystrixTimer.getInstance().addTimerListener(listenerImpl);
     * 
     * // sometime later, often in a thread shutdown, request cleanup, servlet filter or something similar the listener must be shutdown via the clear() method
     * listener.clear();
     * }</pre>
     * </blockquote>
     * 
     * 
     * @param listener
     *            TimerListener implementation that will be triggered according to its <code>getIntervalTimeInMilliseconds()</code> method implementation.
     * @return reference to the TimerListener that allows cleanup via the <code>clear()</code> method
     */
    public Reference<TimerListener> addTimerListener(final TimerListener listener) {
        startThreadIfNeeded();
        // add the listener

        Runnable r = new Runnable() {

            @Override
            public void run() {
                try {
                    listener.tick();
                } catch (Exception e) {
                    logger.error("Failed while ticking TimerListener", e);
                }
            }
        };

        ScheduledFuture<?> f = executor.get().getThreadPool().scheduleAtFixedRate(r, listener.getIntervalTimeInMilliseconds(), listener.getIntervalTimeInMilliseconds(), TimeUnit.MILLISECONDS);
        return new TimerReference(listener, f);
    }

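The TimerListener is scheduled through ScheduledThreadPoolExecutor's scheduleAtFixedRate, with both the initial delay and the period set to listener.getIntervalTimeInMilliseconds(). The first tick therefore fires one timeout interval after registration, and ticks keep repeating until the returned reference is cleared.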
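To make the scheduling behaviour concrete, here is a small sketch that uses only java.util.concurrent. It assumes nothing about Hystrix internals beyond the scheduleAtFixedRate call shown above; FixedRateTickSketch and runTicks are hypothetical names. It shows that a fixed-rate task keeps firing until its ScheduledFuture is cancelled.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class FixedRateTickSketch {

    /**
     * Schedules a tick with initial delay == period, waits until at least
     * expectedTicks have fired, cancels, and returns the observed tick count.
     */
    static int runTicks(long intervalMs, int expectedTicks) {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
        AtomicInteger ticks = new AtomicInteger();
        CountDownLatch latch = new CountDownLatch(expectedTicks);
        try {
            ScheduledFuture<?> future = pool.scheduleAtFixedRate(() -> {
                ticks.incrementAndGet();
                latch.countDown();
            }, intervalMs, intervalMs, TimeUnit.MILLISECONDS);

            // Generous upper bound so a slow machine does not cut the wait short.
            latch.await(intervalMs * (expectedTicks + 50L), TimeUnit.MILLISECONDS);
            future.cancel(false);  // stop further ticks, as clear() does
            return ticks.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return ticks.get();
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("ticks observed: " + runTicks(20, 2));
    }
}
```

Since the task repeats, a timeout listener that is never cleared would tick again every interval; in the operator those later ticks are harmless because the compareAndSet from NOT_EXECUTED can no longer succeed, but the scheduled task still occupies the timer pool until it is cancelled.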

Cleaning up timeoutTimer

hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/AbstractCommand.java

    private void cleanUpAfterResponseFromCache(boolean commandExecutionStarted) {
        Reference<TimerListener> tl = timeoutTimer.get();
        if (tl != null) {
            tl.clear();
        }

        final long latency = System.currentTimeMillis() - commandStartTimestamp;
        executionResult = executionResult
                .addEvent(-1, HystrixEventType.RESPONSE_FROM_CACHE)
                .markUserThreadCompletion(latency)
                .setNotExecutedInThread();
        ExecutionResult cacheOnlyForMetrics = ExecutionResult.from(HystrixEventType.RESPONSE_FROM_CACHE)
                .markUserThreadCompletion(latency);
        metrics.markCommandDone(cacheOnlyForMetrics, commandKey, threadPoolKey, commandExecutionStarted);
        eventNotifier.markEvent(HystrixEventType.RESPONSE_FROM_CACHE, commandKey);
    }

    private void handleCommandEnd(boolean commandExecutionStarted) {
        Reference<TimerListener> tl = timeoutTimer.get();
        if (tl != null) {
            tl.clear();
        }

        long userThreadLatency = System.currentTimeMillis() - commandStartTimestamp;
        executionResult = executionResult.markUserThreadCompletion((int) userThreadLatency);
        if (executionResultAtTimeOfCancellation == null) {
            metrics.markCommandDone(executionResult, commandKey, threadPoolKey, commandExecutionStarted);
        } else {
            metrics.markCommandDone(executionResultAtTimeOfCancellation, commandKey, threadPoolKey, commandExecutionStarted);
        }

        if (endCurrentThreadExecutingCommand != null) {
            endCurrentThreadExecutingCommand.call();
        }
    }

Both cleanUpAfterResponseFromCache and handleCommandEnd clear the timeoutTimer reference if one has been set.

TimerReference.clear

hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/util/HystrixTimer.java

    private static class TimerReference extends SoftReference<TimerListener> {

        private final ScheduledFuture<?> f;

        TimerReference(TimerListener referent, ScheduledFuture<?> f) {
            super(referent);
            this.f = f;
        }

        @Override
        public void clear() {
            super.clear();
            // stop this ScheduledFuture from any further executions
            f.cancel(false);
        }

    }

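In addition to calling the parent class's clear method, TimerReference.clear also calls cancel(false) on the ScheduledFuture, which stops any further scheduled executions of the tick task. Passing false means a tick that is already running is not interrupted.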
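The same pattern can be tried out with standard library classes only. This is a sketch modelled on the listing above rather than a copy of it: ClearableTimerSketch, CancellingReference and clearAndInspect are hypothetical names, and a no-op Runnable stands in for the TimerListener.

```java
import java.lang.ref.SoftReference;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ClearableTimerSketch {

    /** A reference whose clear() also cancels the scheduled task behind it. */
    static class CancellingReference<T> extends SoftReference<T> {

        private final ScheduledFuture<?> future;

        CancellingReference(T referent, ScheduledFuture<?> future) {
            super(referent);
            this.future = future;
        }

        @Override
        public void clear() {
            super.clear();
            future.cancel(false);  // no interruption, just no further runs
        }

        boolean isCancelled() {
            return future.isCancelled();
        }
    }

    /** Returns {live before clear, cleared and cancelled after clear}. */
    static boolean[] clearAndInspect() {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
        try {
            Runnable listener = () -> { };  // stand-in for a TimerListener
            ScheduledFuture<?> future =
                    pool.scheduleAtFixedRate(listener, 1, 1, TimeUnit.HOURS);
            CancellingReference<Runnable> ref = new CancellingReference<>(listener, future);

            boolean liveBefore = ref.get() != null && !ref.isCancelled();
            ref.clear();
            boolean clearedAfter = ref.get() == null && ref.isCancelled();
            return new boolean[] { liveBefore, clearedAfter };
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        boolean[] result = clearAndInspect();
        System.out.println("live before clear: " + result[0]);
        System.out.println("cleared and cancelled after clear: " + result[1]);
    }
}
```

Tying cancellation to clear() means callers only need to hold the reference: clearing it both drops the listener and stops the schedule in one step.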

Summary

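Hystrix handles timeouts by registering a TimerListener with HystrixTimer. The listener is scheduled on a thread pool via scheduleAtFixedRate, so its first run happens executionTimeoutInMilliseconds after registration, and each run invokes the listener's tick method. tick tries to change isCommandTimedOut from TimedOutStatus.NOT_EXECUTED to TimedOutStatus.TIMED_OUT; if that succeeds, it unsubscribes the original request and runs timeoutRunnable, which passes a HystrixTimeoutException to the downstream subscriber's onError. When the command ends normally, the timer reference is cleared and the scheduled task is cancelled.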

doc

  • Hystrix/wiki/How-it-Works

This article was shared from the WeChat official account 码匠的流水账 (geek_luandun). Author: 码匠乱炖


Originally published: 2018-06-26
