首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊HystrixEventNotifier

聊聊HystrixEventNotifier

作者头像
code4it
发布2018-09-17 17:01:26
6220
发布2018-09-17 17:01:26
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要研究一下HystrixEventNotifier

HystrixEventNotifier

/**
 * Abstract EventNotifier that allows receiving notifications for different events with default implementations.
 * <p>
 * See {@link HystrixPlugins} or the Hystrix GitHub Wiki for information on configuring plugins: <a
 * href="https://github.com/Netflix/Hystrix/wiki/Plugins">https://github.com/Netflix/Hystrix/wiki/Plugins</a>.
 * <p>
 * <b>Note on thread-safety and performance</b>
 * <p>
 * A single implementation of this class will be used globally so methods on this class will be invoked concurrently from multiple threads so all functionality must be thread-safe.
 * <p>
 * Methods are also invoked synchronously and will add to execution time of the commands so all behavior should be fast. If anything time-consuming is to be done it should be spawned asynchronously
 * onto separate worker threads.
 */
public abstract class HystrixEventNotifier {

    /**
     * Called for every event fired.
     * <p>
     * <b>Default Implementation: </b> Does nothing
     * 
     * @param eventType event type
     * @param key event key
     */
    public void markEvent(HystrixEventType eventType, HystrixCommandKey key) {
        // do nothing
    }

    /**
     * Called after a command is executed using thread isolation.
     * <p>
     * Will not get called if a command is rejected, short-circuited etc.
     * <p>
     * <b>Default Implementation: </b> Does nothing
     * 
     * @param key
     *            {@link HystrixCommandKey} of command instance.
     * @param isolationStrategy
     *            {@link ExecutionIsolationStrategy} the isolation strategy used by the command when executed
     * @param duration
     *            time in milliseconds of executing <code>run()</code> method
     * @param eventsDuringExecution
     *            {@code List<HystrixEventType>} of events occurred during execution.
     */
    public void markCommandExecution(HystrixCommandKey key, ExecutionIsolationStrategy isolationStrategy, int duration, List<HystrixEventType> eventsDuringExecution) {
        // do nothing
    }

}
  • 这个notifier是同步调用的,因此里头方法的实现不能太耗时,不然则会阻塞,如果方法太耗时则需要考虑异步到其他线程
  • markEvent,在每个事件触发的时候调用,markCommandExecution是在使用线程隔离方式的时候会调用

HystrixEventNotifierDefault

hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/strategy/eventnotifier/HystrixEventNotifierDefault.java

/**
 * Default implementations of {@link HystrixEventNotifier} that does nothing.
 * 
 * @ExcludeFromJavadoc
 */
public class HystrixEventNotifierDefault extends HystrixEventNotifier {

    private static HystrixEventNotifierDefault INSTANCE = new HystrixEventNotifierDefault();

    private HystrixEventNotifierDefault() {

    }

    public static HystrixEventNotifier getInstance() {
        return INSTANCE;
    }
}

默认实现不做任何操作

HystrixEventType

hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/HystrixEventType.java

/**
 * Various states/events that execution can result in or have tracked.
 * <p>
 * These are most often accessed via {@link HystrixRequestLog} or {@link HystrixCommand#getExecutionEvents()}.
 */
public enum HystrixEventType {
    EMIT(false),
    SUCCESS(true),
    FAILURE(false),
    TIMEOUT(false),
    BAD_REQUEST(true),
    SHORT_CIRCUITED(false),
    THREAD_POOL_REJECTED(false),
    SEMAPHORE_REJECTED(false),
    FALLBACK_EMIT(false),
    FALLBACK_SUCCESS(true),
    FALLBACK_FAILURE(true),
    FALLBACK_REJECTION(true),
    FALLBACK_MISSING(true),
    EXCEPTION_THROWN(false),
    RESPONSE_FROM_CACHE(true),
    CANCELLED(true),
    COLLAPSED(false),
    COMMAND_MAX_ACTIVE(false);

    private final boolean isTerminal;

    HystrixEventType(boolean isTerminal) {
        this.isTerminal = isTerminal;
    }

    public boolean isTerminal() {
        return isTerminal;
    }
    //......

    /**
     * List of events that throw an Exception to the caller
     */
    public final static List<HystrixEventType> EXCEPTION_PRODUCING_EVENT_TYPES = new ArrayList<HystrixEventType>();

    /**
     * List of events that are terminal
     */
    public final static List<HystrixEventType> TERMINAL_EVENT_TYPES = new ArrayList<HystrixEventType>();

    static {
        EXCEPTION_PRODUCING_EVENT_TYPES.add(BAD_REQUEST);
        EXCEPTION_PRODUCING_EVENT_TYPES.add(FALLBACK_FAILURE);
        EXCEPTION_PRODUCING_EVENT_TYPES.add(FALLBACK_MISSING);
        EXCEPTION_PRODUCING_EVENT_TYPES.add(FALLBACK_REJECTION);

        for (HystrixEventType eventType: HystrixEventType.values()) {
            if (eventType.isTerminal()) {
                TERMINAL_EVENT_TYPES.add(eventType);
            }
        }
    }
    //......
}
  • 这里定了HystrixEvent的枚举,然后还对这些事件进行了分类,分为EXCEPTION_PRODUCING_EVENT_TYPES以及EXCEPTION_PRODUCING_EVENT_TYPES两类
  • EXCEPTION_PRODUCING_EVENT_TYPES包括BAD_REQUEST、FALLBACK_FAILURE、FALLBACK_MISSING、FALLBACK_REJECTION
  • TERMINAL_EVENT_TYPES主要是根据event枚举的isTerminal属性来,包括SUCCESS、BAD_REQUEST、FALLBACK_SUCCESS、FALLBACK_FAILURE、FALLBACK_REJECTION、FALLBACK_MISSING、RESPONSE_FROM_CACHE、CANCELLED

markCommandExecution

hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/AbstractCommand.java

    /**
     * This decorates "Hystrix" functionality around the run() Observable.
     *
     * @return R
     */
    private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
        final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();

        final Action1<R> markEmits = new Action1<R>() {
            @Override
            public void call(R r) {
                if (shouldOutputOnNextEvents()) {
                    executionResult = executionResult.addEvent(HystrixEventType.EMIT);
                    eventNotifier.markEvent(HystrixEventType.EMIT, commandKey);
                }
                if (commandIsScalar()) {
                    long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
                    eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
                    executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
                    eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
                    circuitBreaker.markSuccess();
                }
            }
        };

        final Action0 markOnCompleted = new Action0() {
            @Override
            public void call() {
                if (!commandIsScalar()) {
                    long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
                    eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
                    executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
                    eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
                    circuitBreaker.markSuccess();
                }
            }
        };

        final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
            @Override
            public Observable<R> call(Throwable t) {
                circuitBreaker.markNonSuccess();
                Exception e = getExceptionFromThrowable(t);
                executionResult = executionResult.setExecutionException(e);
                if (e instanceof RejectedExecutionException) {
                    return handleThreadPoolRejectionViaFallback(e);
                } else if (t instanceof HystrixTimeoutException) {
                    return handleTimeoutViaFallback();
                } else if (t instanceof HystrixBadRequestException) {
                    return handleBadRequestByEmittingError(e);
                } else {
                    /*
                     * Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
                     */
                    if (e instanceof HystrixBadRequestException) {
                        eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
                        return Observable.error(e);
                    }

                    return handleFailureViaFallback(e);
                }
            }
        };

        final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() {
            @Override
            public void call(Notification<? super R> rNotification) {
                setRequestContextIfNeeded(currentRequestContext);
            }
        };

        Observable<R> execution;
        if (properties.executionTimeoutEnabled().get()) {
            execution = executeCommandWithSpecifiedIsolation(_cmd)
                    .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
        } else {
            execution = executeCommandWithSpecifiedIsolation(_cmd);
        }

        return execution.doOnNext(markEmits)
                .doOnCompleted(markOnCompleted)
                .onErrorResumeNext(handleFallback)
                .doOnEach(setRequestContext);
    }
  • 这里在markEmits的action里头如果是scalar command,会调用markCommandExecution
  • 这里在markOnCompleted的action里头,如果不是scalar command,则会调用markCommandExecution

小结

在HystrixCommand以及HystrixObservableCommand调用的时候,都会调用HystrixEventNotifier来发布事件,提供给开发者自定义实现,来做指标收集及监控报警。

doc

  • Event Notifier
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2018-06-29,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码匠的流水账 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • HystrixEventNotifier
  • HystrixEventNotifierDefault
  • HystrixEventType
  • markCommandExecution
  • 小结
  • doc
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档