
A Look at HystrixCommandExecutionHook

code4it
Published 2018-09-17 17:00:06
Part of the column: 码匠的流水账

This article takes a look at HystrixCommandExecutionHook.

HystrixCommandExecutionHook

hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/strategy/executionhook/HystrixCommandExecutionHook.java

/**
 * Abstract ExecutionHook with invocations at different lifecycle points of {@link HystrixCommand}
 * and {@link HystrixObservableCommand} execution with default no-op implementations.
 * <p>
 * See {@link HystrixPlugins} or the Hystrix GitHub Wiki for information on configuring plugins: <a
 * href="https://github.com/Netflix/Hystrix/wiki/Plugins">https://github.com/Netflix/Hystrix/wiki/Plugins</a>.
 * <p>
 * <b>Note on thread-safety and performance</b>
 * <p>
 * A single implementation of this class will be used globally so methods on this class will be invoked concurrently from multiple threads so all functionality must be thread-safe.
 * <p>
 * Methods are also invoked synchronously and will add to execution time of the commands so all behavior should be fast. If anything time-consuming is to be done it should be spawned asynchronously
 * onto separate worker threads.
 * 
 * @since 1.2
 * */
public abstract class HystrixCommandExecutionHook {

    /**
     * Invoked before {@link HystrixInvokable} begins executing.
     *
     * @param commandInstance The executing HystrixInvokable instance.
     *
     * @since 1.2
     */
    public <T> void onStart(HystrixInvokable<T> commandInstance) {
        //do nothing by default
    }

    /**
     * Invoked when {@link HystrixInvokable} emits a value.
     *
     * @param commandInstance The executing HystrixInvokable instance.
     * @param value value emitted
     *
     * @since 1.4
     */
    public <T> T onEmit(HystrixInvokable<T> commandInstance, T value) {
        return value; //by default, just pass through
    }

    /**
     * Invoked when {@link HystrixInvokable} fails with an Exception.
     *
     * @param commandInstance The executing HystrixInvokable instance.
     * @param failureType {@link FailureType} enum representing which type of error
     * @param e exception object
     *
     * @since 1.2
     */
    public <T> Exception onError(HystrixInvokable<T> commandInstance, FailureType failureType, Exception e) {
        return e; //by default, just pass through
    }

    /**
     * Invoked when {@link HystrixInvokable} finishes a successful execution.
     *
     * @param commandInstance The executing HystrixInvokable instance.
     *
     * @since 1.4
     */
    public <T> void onSuccess(HystrixInvokable<T> commandInstance) {
        //do nothing by default
    }

    /**
     * Invoked at start of thread execution when {@link HystrixCommand} is executed using {@link ExecutionIsolationStrategy#THREAD}.
     *
     * @param commandInstance The executing HystrixCommand instance.
     *
     * @since 1.2
     */
    public <T> void onThreadStart(HystrixInvokable<T> commandInstance) {
        //do nothing by default
    }

    /**
     * Invoked at completion of thread execution when {@link HystrixCommand} is executed using {@link ExecutionIsolationStrategy#THREAD}.
     * This will get invoked whenever the Hystrix thread is done executing, regardless of whether the thread finished
     * naturally, or was unsubscribed externally
     *
     * @param commandInstance The executing HystrixCommand instance.
     *
     * @since 1.2
     */
    public <T> void onThreadComplete(HystrixInvokable<T> commandInstance) {
        // do nothing by default
    }

    /**
     * Invoked when the user-defined execution method in {@link HystrixInvokable} starts.
     *
     * @param commandInstance The executing HystrixInvokable instance.
     *
     * @since 1.4
     */
    public <T> void onExecutionStart(HystrixInvokable<T> commandInstance) {
        //do nothing by default
    }

    /**
     * Invoked when the user-defined execution method in {@link HystrixInvokable} emits a value.
     *
     * @param commandInstance The executing HystrixInvokable instance.
     * @param value value emitted
     *
     * @since 1.4
     */
    public <T> T onExecutionEmit(HystrixInvokable<T> commandInstance, T value) {
        return value; //by default, just pass through
    }

    /**
     * Invoked when the user-defined execution method in {@link HystrixInvokable} fails with an Exception.
     *
     * @param commandInstance The executing HystrixInvokable instance.
     * @param e exception object
     *
     * @since 1.4
     */
    public <T> Exception onExecutionError(HystrixInvokable<T> commandInstance, Exception e) {
        return e; //by default, just pass through
    }

    /**
     * Invoked when the user-defined execution method in {@link HystrixInvokable} completes successfully.
     *
     * @param commandInstance The executing HystrixInvokable instance.
     *
     * @since 1.4
     */
    public <T> void onExecutionSuccess(HystrixInvokable<T> commandInstance) {
        //do nothing by default
    }

    /**
     * Invoked when the fallback method in {@link HystrixInvokable} starts.
     *
     * @param commandInstance The executing HystrixInvokable instance.
     *
     * @since 1.2
     */
    public <T> void onFallbackStart(HystrixInvokable<T> commandInstance) {
        //do nothing by default
    }

    /**
     * Invoked when the fallback method in {@link HystrixInvokable} emits a value.
     *
     * @param commandInstance The executing HystrixInvokable instance.
     * @param value value emitted
     *
     * @since 1.4
     */
    public <T> T onFallbackEmit(HystrixInvokable<T> commandInstance, T value) {
        return value; //by default, just pass through
    }

    /**
     * Invoked when the fallback method in {@link HystrixInvokable} fails with an Exception.
     *
     * @param commandInstance The executing HystrixInvokable instance.
     * @param e exception object
     *
     * @since 1.2
     */
    public <T> Exception onFallbackError(HystrixInvokable<T> commandInstance, Exception e) {
        //by default, just pass through
        return e;
    }

    /**
     * Invoked when the user-defined execution method in {@link HystrixInvokable} completes successfully.
     *
     * @param commandInstance The executing HystrixInvokable instance.
     *
     * @since 1.4
     */
    public <T> void onFallbackSuccess(HystrixInvokable<T> commandInstance) {
        //do nothing by default
    }

    /**
     * Invoked when the command response is found in the {@link com.netflix.hystrix.HystrixRequestCache}.
     *
     * @param commandInstance The executing HystrixCommand
     *
     * @since 1.4
     */
    public <T> void onCacheHit(HystrixInvokable<T> commandInstance) {
        //do nothing by default
    }

    /**
     * Invoked with the command is unsubscribed before a terminal state
     *
     * @param commandInstance The executing HystrixInvokable instance.
     *
     * @since 1.5.9
     */
    public <T> void onUnsubscribe(HystrixInvokable<T> commandInstance) {
        //do nothing by default
    }

}

It mainly defines the following hook methods:

  • onStart
  • onEmit
  • onError
  • onSuccess
  • onThreadStart
  • onThreadComplete
  • onExecutionStart
  • onExecutionEmit
  • onExecutionError
  • onExecutionSuccess
  • onFallbackStart
  • onFallbackEmit
  • onFallbackError
  • onFallbackSuccess
  • onCacheHit
  • onUnsubscribe
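
To make the thread-safety note in the Javadoc concrete, here is a minimal sketch of a custom hook that counts callbacks per command. It is not Hystrix code: `SketchExecutionHook` and `CountingHook` are hypothetical stand-ins that reduce the real class to three hooks and take a plain command name instead of a `HystrixInvokable<T>`, so the example compiles with the JDK alone.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for HystrixCommandExecutionHook, reduced to three hooks.
// The real class takes a HystrixInvokable<T>; a plain command name is used here.
abstract class SketchExecutionHook {
    void onStart(String commandName) { /* no-op by default */ }
    void onSuccess(String commandName) { /* no-op by default */ }
    Exception onError(String commandName, Exception e) { return e; /* pass through */ }
}

// Counts callbacks per command. A single instance is shared by all threads,
// so the state lives in a concurrent map of atomic counters.
class CountingHook extends SketchExecutionHook {
    private final ConcurrentMap<String, AtomicLong> counters = new ConcurrentHashMap<>();

    private void increment(String key) {
        counters.computeIfAbsent(key, k -> new AtomicLong()).incrementAndGet();
    }

    @Override
    void onStart(String commandName) { increment(commandName + ".start"); }

    @Override
    void onSuccess(String commandName) { increment(commandName + ".success"); }

    @Override
    Exception onError(String commandName, Exception e) {
        increment(commandName + ".error");
        return e; // keep the original exception
    }

    // Returns 0 for a key that was never incremented.
    long count(String key) {
        AtomicLong c = counters.get(key);
        return c == null ? 0L : c.get();
    }
}
```

The hook methods only bump counters, which keeps them fast; anything slower (writing to a remote metrics store, for example) would be better handed to a separate worker thread, as the Javadoc suggests.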

HystrixCommandExecutionHookDefault

hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/strategy/executionhook/HystrixCommandExecutionHookDefault.java

/**
 * Default implementations of {@link HystrixCommandExecutionHook} that does nothing.
 * 
 * @ExcludeFromJavadoc
 */
public class HystrixCommandExecutionHookDefault extends HystrixCommandExecutionHook {

    private static HystrixCommandExecutionHookDefault INSTANCE = new HystrixCommandExecutionHookDefault();

    private HystrixCommandExecutionHookDefault() {

    }

    public static HystrixCommandExecutionHook getInstance() {
        return INSTANCE;
    }

}

The default implementation does not override any of the hook methods.
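
The sketch below illustrates how a no-op singleton like this can serve as the fallback when no custom hook has been installed. It is a simplified model, not the Hystrix implementation: `HookRegistrySketch`, `Hook`, and `DefaultHook` are hypothetical names, and the register-once behavior is an assumption modeled on how `HystrixPlugins` appears to handle strategies.

```java
import java.util.concurrent.atomic.AtomicReference;

class HookRegistrySketch {
    // Hypothetical stand-in for the abstract hook class.
    static abstract class Hook {
        void onStart(String commandName) { /* no-op by default */ }
    }

    // No-op singleton, same shape as HystrixCommandExecutionHookDefault.
    static final class DefaultHook extends Hook {
        private static final DefaultHook INSTANCE = new DefaultHook();

        private DefaultHook() { }

        static Hook getInstance() { return INSTANCE; }
    }

    private final AtomicReference<Hook> registered = new AtomicReference<>();

    // Accepts a custom hook only if nothing has been installed yet.
    void register(Hook hook) {
        if (!registered.compareAndSet(null, hook)) {
            throw new IllegalStateException("Another hook was already registered.");
        }
    }

    // Installs the no-op default if nothing is registered, then returns the installed hook.
    Hook get() {
        registered.compareAndSet(null, DefaultHook.getInstance());
        return registered.get();
    }
}
```

In this model a hook has to be registered before the first `get()`, because the first lookup pins the default. With the real library, a custom hook is registered through `HystrixPlugins.getInstance().registerCommandExecutionHook(...)`; the Plugins wiki page linked in the Javadoc above is the place to check the exact rules.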

Callbacks

In hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/AbstractCommand.java, the relevant methods call back into executionHook at the matching lifecycle points, for example:

    private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
        if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
            // mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE)
            return Observable.defer(new Func0<Observable<R>>() {
                @Override
                public Observable<R> call() {
                    executionResult = executionResult.setExecutionOccurred();
                    if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
                        return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
                    }

                    metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);

                    if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
                        // the command timed out in the wrapping thread so we will return immediately
                        // and not increment any of the counters below or other such logic
                        return Observable.error(new RuntimeException("timed out before executing run()"));
                    }
                    if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {
                        //we have not been unsubscribed, so should proceed
                        HystrixCounters.incrementGlobalConcurrentThreads();
                        threadPool.markThreadExecution();
                        // store the command that is being run
                        endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
                        executionResult = executionResult.setExecutedInThread();
                        /**
                         * If any of these hooks throw an exception, then it appears as if the actual execution threw an error
                         */
                        try {
                            executionHook.onThreadStart(_cmd);
                            executionHook.onRunStart(_cmd);
                            executionHook.onExecutionStart(_cmd);
                            return getUserExecutionObservable(_cmd);
                        } catch (Throwable ex) {
                            return Observable.error(ex);
                        }
                    } else {
                        //command has already been unsubscribed, so return immediately
                        return Observable.error(new RuntimeException("unsubscribed before executing run()"));
                    }
                }
            }).doOnTerminate(new Action0() {
                @Override
                public void call() {
                    if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.TERMINAL)) {
                        handleThreadEnd(_cmd);
                    }
                    if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.TERMINAL)) {
                        //if it was never started and received terminal, then no need to clean up (I don't think this is possible)
                    }
                    //if it was unsubscribed, then other cleanup handled it
                }
            }).doOnUnsubscribe(new Action0() {
                @Override
                public void call() {
                    if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.UNSUBSCRIBED)) {
                        handleThreadEnd(_cmd);
                    }
                    if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.UNSUBSCRIBED)) {
                        //if it was never started and was cancelled, then no need to clean up
                    }
                    //if it was terminal, then other cleanup handled it
                }
            }).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
                @Override
                public Boolean call() {
                    return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
                }
            }));
        } else {
            return Observable.defer(new Func0<Observable<R>>() {
                @Override
                public Observable<R> call() {
                    executionResult = executionResult.setExecutionOccurred();
                    if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
                        return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
                    }

                    metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE);
                    // semaphore isolated
                    // store the command that is being run
                    endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
                    try {
                        executionHook.onRunStart(_cmd);
                        executionHook.onExecutionStart(_cmd);
                        return getUserExecutionObservable(_cmd);  //the getUserExecutionObservable method already wraps sync exceptions, so this shouldn't throw
                    } catch (Throwable ex) {
                        //If the above hooks throw, then use that as the result of the run method
                        return Observable.error(ex);
                    }
                }
            });
        }
    }

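This method invokes the executionHook callbacks onThreadStart, onRunStart, and onExecutionStart. onThreadStart is called only in the THREAD isolation branch; the SEMAPHORE branch calls just onRunStart and onExecutionStart. If any of these hooks throws, the exception is returned through Observable.error, so it looks as if the execution itself had failed. (onRunStart is absent from the listing above; in the Hystrix source it is a deprecated hook superseded by onExecutionStart.)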
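
The call order and the error handling in that method can be modeled without RxJava. The following is a rough sketch under stated assumptions, not the Hystrix code: `IsolationSketch`, `StartHooks`, and `Outcome` are hypothetical names, `Outcome` stands in for `Observable<R>`, and the thread pool, state checks, and metrics are left out.

```java
import java.util.function.Supplier;

class IsolationSketch {
    // Hypothetical hook with only the three "start" callbacks used in this method.
    static class StartHooks {
        void onThreadStart(String cmd) { }
        void onRunStart(String cmd) { }
        void onExecutionStart(String cmd) { }
    }

    // Stand-in for Observable<R>: holds either a value or an error.
    static final class Outcome<R> {
        final R value;
        final Throwable error;

        private Outcome(R value, Throwable error) {
            this.value = value;
            this.error = error;
        }

        static <R> Outcome<R> ok(R value) { return new Outcome<>(value, null); }

        static <R> Outcome<R> failed(Throwable error) { return new Outcome<>(null, error); }
    }

    static <R> Outcome<R> execute(String cmd, boolean threadIsolation,
                                  StartHooks hooks, Supplier<R> userCode) {
        try {
            if (threadIsolation) {
                hooks.onThreadStart(cmd); // THREAD isolation only
            }
            hooks.onRunStart(cmd);
            hooks.onExecutionStart(cmd);
            return Outcome.ok(userCode.get());
        } catch (Throwable ex) {
            // A throwing hook is reported exactly like a failing execution.
            return Outcome.failed(ex);
        }
    }
}
```

One simplification to keep in mind: here the same catch block also handles exceptions from the user code, whereas in the original the comment on getUserExecutionObservable says it already wraps synchronous exceptions itself.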

Summary

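HystrixCommandExecutionHook provides hook methods across the lifecycle of HystrixCommand and HystrixObservableCommand. Developers can supply their own implementation to do extra processing, such as logging, overriding the response, or changing thread state.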
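
Overriding the response works because onEmit and onError return a value: whatever the hook returns replaces what the command produced. Here is a small sketch of that idea, again with hypothetical stand-ins (`ResponseOverrideSketch`, `Hook`, `TrimAndWrapHook`, and the `run` helper are not Hystrix classes, and commands are identified by a plain name).

```java
import java.util.concurrent.Callable;

class ResponseOverrideSketch {
    // Hypothetical stand-in exposing the two pass-through hooks.
    static class Hook {
        <T> T onEmit(String cmd, T value) { return value; }
        Exception onError(String cmd, Exception e) { return e; }
    }

    // Runs user code and routes the result or the failure through the hook.
    static <T> T run(String cmd, Hook hook, Callable<T> userCode) throws Exception {
        T value;
        try {
            value = userCode.call();
        } catch (Exception e) {
            throw hook.onError(cmd, e); // the hook may replace the exception
        }
        return hook.onEmit(cmd, value); // the hook may replace the value
    }

    // Example hook: trims String responses and wraps failures with the command name.
    static class TrimAndWrapHook extends Hook {
        @Override
        @SuppressWarnings("unchecked")
        <T> T onEmit(String cmd, T value) {
            if (value instanceof String) {
                return (T) ((String) value).trim();
            }
            return value; // other types pass through unchanged
        }

        @Override
        Exception onError(String cmd, Exception e) {
            return new RuntimeException(cmd + " failed", e);
        }
    }
}
```

Because a single hook instance sees every command, a real implementation would normally check which command it is handling before rewriting a value or an exception.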

doc

  • Command Execution Hook
Originally published on 2018-07-03 on the 码匠的流水账 WeChat official account.
