A look at resilience4j's CircuitBreakerStateMachine

This article takes a look at resilience4j's CircuitBreakerStateMachine.

CircuitBreakerStateMachine

resilience4j-circuitbreaker-0.13.0-sources.jar!/io/github/resilience4j/circuitbreaker/internal/CircuitBreakerStateMachine.java

/**
 * A CircuitBreaker finite state machine.
 */
public final class CircuitBreakerStateMachine implements CircuitBreaker {

    private static final Logger LOG = LoggerFactory.getLogger(CircuitBreakerStateMachine.class);

    private final String name;
    private final AtomicReference<CircuitBreakerState> stateReference;
    private final CircuitBreakerConfig circuitBreakerConfig;
    private final CircuitBreakerEventProcessor eventProcessor;

    /**
     * Creates a circuitBreaker.
     *
     * @param name                 the name of the CircuitBreaker
     * @param circuitBreakerConfig The CircuitBreaker configuration.
     */
    public CircuitBreakerStateMachine(String name, CircuitBreakerConfig circuitBreakerConfig) {
        this.name = name;
        this.circuitBreakerConfig = circuitBreakerConfig;
        this.stateReference = new AtomicReference<>(new ClosedState(this));
        this.eventProcessor = new CircuitBreakerEventProcessor();
    }

    /**
     * Creates a circuitBreaker with default config.
     *
     * @param name the name of the CircuitBreaker
     */
    public CircuitBreakerStateMachine(String name) {
        this(name, CircuitBreakerConfig.ofDefaults());
    }

    /**
     * Creates a circuitBreaker.
     *
     * @param name                 the name of the CircuitBreaker
     * @param circuitBreakerConfig The CircuitBreaker configuration supplier.
     */
    public CircuitBreakerStateMachine(String name, Supplier<CircuitBreakerConfig> circuitBreakerConfig) {
        this(name, circuitBreakerConfig.get());
    }

    /**
     * Requests permission to call this backend.
     *
     * @return true, if the call is allowed.
     */
    @Override
    public boolean isCallPermitted() {
        boolean callPermitted = stateReference.get().isCallPermitted();
        if (!callPermitted) {
            publishCallNotPermittedEvent();
        }
        return callPermitted;
    }

    @Override
    public void onError(long durationInNanos, Throwable throwable) {
        if (circuitBreakerConfig.getRecordFailurePredicate().test(throwable)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug(String.format("CircuitBreaker '%s' recorded a failure:", name), throwable);
            }
            publishCircuitErrorEvent(name, durationInNanos, throwable);
            stateReference.get().onError(throwable);
        } else {
            publishCircuitIgnoredErrorEvent(name, durationInNanos, throwable);
        }
    }

    @Override
    public void onSuccess(long durationInNanos) {
        publishSuccessEvent(durationInNanos);
        stateReference.get().onSuccess();
    }

    /**
     * Get the state of this CircuitBreaker.
     *
     * @return the the state of this CircuitBreaker
     */
    @Override
    public State getState() {
        return this.stateReference.get().getState();
    }

    /**
     * Get the name of this CircuitBreaker.
     *
     * @return the the name of this CircuitBreaker
     */
    @Override
    public String getName() {
        return this.name;
    }

    /**
     * Get the config of this CircuitBreaker.
     *
     * @return the config of this CircuitBreaker
     */
    @Override
    public CircuitBreakerConfig getCircuitBreakerConfig() {
        return circuitBreakerConfig;
    }

    @Override
    public Metrics getMetrics() {
        return this.stateReference.get().getMetrics();
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public String toString() {
        return String.format("CircuitBreaker '%s'", this.name);
    }

    @Override
    public void reset() {
        CircuitBreakerState previousState = stateReference.getAndUpdate(currentState -> new ClosedState(this));
        if (previousState.getState() != CLOSED) {
            publishStateTransitionEvent(StateTransition.transitionBetween(previousState.getState(), CLOSED));
        }
        publishResetEvent();
    }

    private void stateTransition(State newState, Function<CircuitBreakerState, CircuitBreakerState> newStateGenerator) {
        CircuitBreakerState previousState = stateReference.getAndUpdate(currentState -> {
            if (currentState.getState() == newState) {
                return currentState;
            }
            return newStateGenerator.apply(currentState);
        });
        if (previousState.getState() != newState) {
            publishStateTransitionEvent(StateTransition.transitionBetween(previousState.getState(), newState));
        }
    }

    @Override
    public void transitionToDisabledState() {
        stateTransition(DISABLED, currentState -> new DisabledState(this));
    }

    @Override
    public void transitionToForcedOpenState() {
        stateTransition(FORCED_OPEN, currentState -> new ForcedOpenState(this));
    }

    @Override
    public void transitionToClosedState() {
        stateTransition(CLOSED, currentState -> new ClosedState(this, currentState.getMetrics()));
    }

    @Override
    public void transitionToOpenState() {
        stateTransition(OPEN, currentState -> new OpenState(this, currentState.getMetrics()));
    }

    @Override
    public void transitionToHalfOpenState() {
        stateTransition(HALF_OPEN, currentState -> new HalfOpenState(this));
    }
    //......
}
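  • CircuitBreakerStateMachine implements the CircuitBreaker interface
  • An AtomicReference holds the circuit breaker's current state
  • The state-transition methods (the transitionTo... methods) all delegate to stateTransition, which updates the AtomicReference and publishes an event when the state actually changes
  • Both stateTransition and reset call StateTransition.transitionBetween to build the transition that is then published as an event
  • CircuitBreakerEventProcessor handles the events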
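
The getAndUpdate idiom used by stateTransition can be tried out in isolation. The following is a minimal sketch, not the library code: MiniStateMachine, StateHolder and the publishedTransitions list are hypothetical names introduced for illustration, and a plain list of strings stands in for the event processor.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

// Hypothetical, simplified stand-in for the state machine; not the library class.
final class MiniStateMachine {

    enum State { CLOSED, OPEN }

    // Minimal state object; the real state classes also carry metrics.
    static final class StateHolder {
        final State state;
        StateHolder(State state) { this.state = state; }
    }

    private final AtomicReference<StateHolder> stateReference =
            new AtomicReference<>(new StateHolder(State.CLOSED));

    // Recorded strings stand in for published state-transition events.
    private final List<String> publishedTransitions = new CopyOnWriteArrayList<>();

    private void stateTransition(State newState, Function<StateHolder, StateHolder> generator) {
        // getAndUpdate returns the value held before the update. The update
        // function may be retried under contention, so it must be side-effect free.
        StateHolder previous = stateReference.getAndUpdate(current ->
                current.state == newState ? current : generator.apply(current));
        // Publish only if this call actually changed the state.
        if (previous.state != newState) {
            publishedTransitions.add(previous.state + " -> " + newState);
        }
    }

    void transitionToOpenState() {
        stateTransition(State.OPEN, current -> new StateHolder(State.OPEN));
    }

    void transitionToClosedState() {
        stateTransition(State.CLOSED, current -> new StateHolder(State.CLOSED));
    }

    State getState() { return stateReference.get().state; }

    List<String> publishedTransitions() { return publishedTransitions; }
}
```

Calling transitionToOpenState twice in a row records a single "CLOSED -> OPEN" entry in this sketch, because the second call finds the state already OPEN and keeps the existing state object.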

CircuitBreakerState

resilience4j-circuitbreaker-0.13.0-sources.jar!/io/github/resilience4j/circuitbreaker/internal/CircuitBreakerState.java

/**
 * Abstract state of the CircuitBreaker state machine.
 */
abstract class CircuitBreakerState{

    CircuitBreakerStateMachine stateMachine;

    CircuitBreakerState(CircuitBreakerStateMachine stateMachine) {
        this.stateMachine = stateMachine;
    }

    abstract boolean isCallPermitted();

    abstract void onError(Throwable throwable);

    abstract void onSuccess();

    abstract CircuitBreaker.State getState();

    abstract CircuitBreakerMetrics getMetrics();

    /**
     * Should the CircuitBreaker in this state publish events
     * @return a boolean signaling if the events should be published
     */
    boolean shouldPublishEvents(CircuitBreakerEvent event){
        return event.getEventType().forcePublish || getState().allowPublish;
    }
}
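  • CircuitBreakerState and CircuitBreakerStateMachine hold references to each other
  • The subclasses of CircuitBreakerState are OpenState, HalfOpenState, ForcedOpenState, ClosedState and DisabledState
  • Each subclass decides in its own onError and onSuccess methods whether a state transition is needed; if so, it calls one of the transitionTo... methods on CircuitBreakerStateMachine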
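
A rough sketch of this division of labour follows. It is only an illustration under simplifying assumptions: SketchBreaker, BreakerState, Closed, Open and the maxFailures counter are hypothetical, and the counter is a stand-in rule chosen for brevity rather than the criterion the library's ClosedState applies.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of the state pattern; names and the threshold rule are illustrative only.
final class SketchBreaker {

    enum State { CLOSED, OPEN }

    abstract static class BreakerState {
        final SketchBreaker machine; // back-reference to the owning state machine
        BreakerState(SketchBreaker machine) { this.machine = machine; }
        abstract boolean isCallPermitted();
        abstract void onError();
        abstract State getState();
    }

    static final class Closed extends BreakerState {
        private final AtomicInteger failures = new AtomicInteger();
        Closed(SketchBreaker machine) { super(machine); }
        @Override boolean isCallPermitted() { return true; }
        @Override void onError() {
            // The state itself decides when to leave CLOSED (assumed rule: a simple count).
            if (failures.incrementAndGet() >= machine.maxFailures) {
                machine.transitionToOpenState();
            }
        }
        @Override State getState() { return State.CLOSED; }
    }

    static final class Open extends BreakerState {
        Open(SketchBreaker machine) { super(machine); }
        @Override boolean isCallPermitted() { return false; }
        @Override void onError() { /* already open: nothing to decide */ }
        @Override State getState() { return State.OPEN; }
    }

    private final int maxFailures;
    private final AtomicReference<BreakerState> stateReference;

    SketchBreaker(int maxFailures) {
        this.maxFailures = maxFailures;
        this.stateReference = new AtomicReference<>(new Closed(this));
    }

    // The machine only delegates; it contains no transition logic of its own.
    boolean isCallPermitted() { return stateReference.get().isCallPermitted(); }
    void onError() { stateReference.get().onError(); }
    State getState() { return stateReference.get().getState(); }

    void transitionToOpenState() { stateReference.set(new Open(this)); }
}
```

With new SketchBreaker(2), the first onError call leaves the breaker CLOSED and the second switches it to OPEN, after which isCallPermitted returns false.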

StateTransition.transitionBetween

resilience4j-circuitbreaker-0.13.0-sources.jar!/io/github/resilience4j/circuitbreaker/CircuitBreaker.java

    /**
     * State transitions of the CircuitBreaker state machine.
     */
    enum StateTransition {
        CLOSED_TO_OPEN(State.CLOSED, State.OPEN),
        CLOSED_TO_DISABLED(State.CLOSED, State.DISABLED),
        CLOSED_TO_FORCED_OPEN(State.CLOSED, State.FORCED_OPEN),
        HALF_OPEN_TO_CLOSED(State.HALF_OPEN, State.CLOSED),
        HALF_OPEN_TO_OPEN(State.HALF_OPEN, State.OPEN),
        HALF_OPEN_TO_DISABLED(State.HALF_OPEN, State.DISABLED),
        HALF_OPEN_TO_FORCED_OPEN(State.HALF_OPEN, State.FORCED_OPEN),
        OPEN_TO_CLOSED(State.OPEN, State.CLOSED),
        OPEN_TO_HALF_OPEN(State.OPEN, State.HALF_OPEN),
        OPEN_TO_DISABLED(State.OPEN, State.DISABLED),
        OPEN_TO_FORCED_OPEN(State.OPEN, State.FORCED_OPEN),
        FORCED_OPEN_TO_CLOSED(State.FORCED_OPEN, State.CLOSED),
        FORCED_OPEN_TO_OPEN(State.FORCED_OPEN, State.OPEN),
        FORCED_OPEN_TO_DISABLED(State.FORCED_OPEN, State.DISABLED),
        FORCED_OPEN_TO_HALF_OPEN(State.FORCED_OPEN, State.HALF_OPEN),
        DISABLED_TO_CLOSED(State.DISABLED, State.CLOSED),
        DISABLED_TO_OPEN(State.DISABLED, State.OPEN),
        DISABLED_TO_FORCED_OPEN(State.DISABLED, State.FORCED_OPEN),
        DISABLED_TO_HALF_OPEN(State.DISABLED, State.HALF_OPEN);

        private final State fromState;

        private final State toState;

        private static final Map<Tuple2<State, State>, StateTransition> STATE_TRANSITION_MAP =
                Arrays
                        .stream(StateTransition.values())
                        .collect(Collectors.toMap(v -> Tuple.of(v.fromState, v.toState), Function.identity()));

        private boolean matches(State fromState, State toState) {
            return this.fromState == fromState && this.toState == toState;
        }

        public static StateTransition transitionBetween(State fromState, State toState){
            final StateTransition stateTransition = STATE_TRANSITION_MAP.get(Tuple.of(fromState, toState));
            if(stateTransition == null) {
                throw new IllegalStateException(
                        String.format("Illegal state transition from %s to %s", fromState.toString(), toState.toString()));
            }
            return stateTransition;
        }

        StateTransition(State fromState, State toState) {
            this.fromState = fromState;
            this.toState = toState;
        }

        public State getFromState() {
            return fromState;
        }

        public State getToState() {
            return toState;
        }

        @Override
        public String toString(){
            return String.format("State transition from %s to %s", fromState, toState);
        }
    }
  • StateTransition enumerates the allowed state transition paths
  • StateTransition.transitionBetween validates a requested transition against them and throws IllegalStateException for one that is not defined
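
The lookup-and-validate idea can be reproduced with a much smaller enum. This is a sketch only: TransitionSketch, Phase and Transition are hypothetical names, only four transitions are modelled, and a string key replaces the Tuple2 key used in the listing above.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical, reduced model of an enum-backed transition table.
final class TransitionSketch {

    enum Phase { CLOSED, OPEN, HALF_OPEN }

    enum Transition {
        CLOSED_TO_OPEN(Phase.CLOSED, Phase.OPEN),
        OPEN_TO_HALF_OPEN(Phase.OPEN, Phase.HALF_OPEN),
        HALF_OPEN_TO_CLOSED(Phase.HALF_OPEN, Phase.CLOSED),
        HALF_OPEN_TO_OPEN(Phase.HALF_OPEN, Phase.OPEN);

        private final Phase from;
        private final Phase to;

        // Built once when the enum is initialized; the constants above already exist by then.
        private static final Map<String, Transition> LOOKUP =
                Arrays.stream(values())
                        .collect(Collectors.toMap(t -> key(t.from, t.to), Function.identity()));

        Transition(Phase from, Phase to) {
            this.from = from;
            this.to = to;
        }

        // String key used here instead of a tuple type to stay within the JDK.
        private static String key(Phase from, Phase to) {
            return from + "->" + to;
        }

        static Transition transitionBetween(Phase from, Phase to) {
            Transition transition = LOOKUP.get(key(from, to));
            if (transition == null) {
                throw new IllegalStateException(
                        String.format("Illegal state transition from %s to %s", from, to));
            }
            return transition;
        }
    }
}
```

In this sketch transitionBetween(CLOSED, HALF_OPEN) throws, since no such constant exists. The enum in the listing above likewise defines no CLOSED_TO_HALF_OPEN entry.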

CircuitBreakerEventProcessor

resilience4j-circuitbreaker-0.13.0-sources.jar!/io/github/resilience4j/circuitbreaker/internal/CircuitBreakerStateMachine.java

    private class CircuitBreakerEventProcessor extends EventProcessor<CircuitBreakerEvent> implements EventConsumer<CircuitBreakerEvent>, EventPublisher {
        @Override
        public EventPublisher onSuccess(EventConsumer<CircuitBreakerOnSuccessEvent> onSuccessEventConsumer) {
            registerConsumer(CircuitBreakerOnSuccessEvent.class, onSuccessEventConsumer);
            return this;
        }

        @Override
        public EventPublisher onError(EventConsumer<CircuitBreakerOnErrorEvent> onErrorEventConsumer) {
            registerConsumer(CircuitBreakerOnErrorEvent.class, onErrorEventConsumer);
            return this;
        }

        @Override
        public EventPublisher onStateTransition(EventConsumer<CircuitBreakerOnStateTransitionEvent> onStateTransitionEventConsumer) {
            registerConsumer(CircuitBreakerOnStateTransitionEvent.class, onStateTransitionEventConsumer);
            return this;
        }

        @Override
        public EventPublisher onReset(EventConsumer<CircuitBreakerOnResetEvent> onResetEventConsumer) {
            registerConsumer(CircuitBreakerOnResetEvent.class, onResetEventConsumer);
            return this;
        }

        @Override
        public EventPublisher onIgnoredError(EventConsumer<CircuitBreakerOnIgnoredErrorEvent> onIgnoredErrorEventConsumer) {
            registerConsumer(CircuitBreakerOnIgnoredErrorEvent.class, onIgnoredErrorEventConsumer);
            return this;
        }

        @Override
        public EventPublisher onCallNotPermitted(EventConsumer<CircuitBreakerOnCallNotPermittedEvent> onCallNotPermittedEventConsumer) {
            registerConsumer(CircuitBreakerOnCallNotPermittedEvent.class, onCallNotPermittedEventConsumer);
            return this;
        }

        @Override
        public void consumeEvent(CircuitBreakerEvent event) {
            super.processEvent(event);
        }
    }
  • CircuitBreakerEventProcessor extends EventProcessor and processes CircuitBreakerEvent events
  • CircuitBreakerEventProcessor also implements the EventConsumer and EventPublisher interfaces
  • onSuccess, onError, onStateTransition, onReset, onIgnoredError and onCallNotPermitted each call registerConsumer
  • consumeEvent in turn calls processEvent

EventProcessor

resilience4j-core-0.13.0-sources.jar!/io/github/resilience4j/core/EventProcessor.java

public class EventProcessor<T> implements EventPublisher<T> {

    protected volatile boolean consumerRegistered;
    private volatile EventConsumer<T> onEventConsumer;
    private ConcurrentMap<Class<? extends T>, EventConsumer<Object>> eventConsumers = new ConcurrentHashMap<>();

    public boolean hasConsumers(){
        return consumerRegistered;
    }

    @SuppressWarnings("unchecked")
    public <E extends T> void registerConsumer(Class<? extends E> eventType, EventConsumer<E> eventConsumer){
        consumerRegistered = true;
        eventConsumers.put(eventType, (EventConsumer<Object>) eventConsumer);
    }

    @SuppressWarnings("unchecked")
    public <E extends T> boolean processEvent(E event) {
        boolean consumed = false;
        if(onEventConsumer != null){
            onEventConsumer.consumeEvent(event);
            consumed = true;
        }
        if(!eventConsumers.isEmpty()){
            EventConsumer<T> eventConsumer = (EventConsumer<T>) eventConsumers.get(event.getClass());
            if(eventConsumer != null){
                eventConsumer.consumeEvent(event);
                consumed = true;
            }
        }
        return consumed;
    }

    @Override
    public void onEvent(EventConsumer<T> onEventConsumer) {
        consumerRegistered = true;
        this.onEventConsumer = onEventConsumer;
    }
}
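  • registerConsumer stores the consumer for a given event type in eventConsumers
  • processEvent first hands the event to the general onEventConsumer (if one is set), then looks up the consumer registered for the event's class and passes the event to it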
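
One consequence of looking consumers up with event.getClass() is that matching is by exact class, so a consumer registered for a supertype is not found for a subtype event. A minimal sketch of that behaviour, using hypothetical names (DispatchSketch, BaseEvent, SuccessEvent, ErrorEvent) and java.util.function.Consumer in place of EventConsumer, and leaving out the catch-all onEvent consumer:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;

// Hypothetical, reduced model of class-keyed event dispatch.
final class DispatchSketch {

    static class BaseEvent {}
    static final class SuccessEvent extends BaseEvent {}
    static final class ErrorEvent extends BaseEvent {}

    private final ConcurrentMap<Class<?>, Consumer<Object>> consumers = new ConcurrentHashMap<>();

    @SuppressWarnings("unchecked")
    <E extends BaseEvent> void registerConsumer(Class<E> eventType, Consumer<E> consumer) {
        // Unchecked cast: safe here because dispatch only passes events of the registered class.
        consumers.put(eventType, (Consumer<Object>) (Consumer<?>) consumer);
    }

    // Returns true if a consumer was registered for the event's exact runtime class.
    boolean processEvent(BaseEvent event) {
        Consumer<Object> consumer = consumers.get(event.getClass());
        if (consumer == null) {
            return false;
        }
        consumer.accept(event);
        return true;
    }
}
```

Registering a consumer only for BaseEvent.class and then processing an ErrorEvent returns false in this sketch, because the map holds no entry for ErrorEvent.class.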

EventPublisher

resilience4j-circuitbreaker-0.13.0-sources.jar!/io/github/resilience4j/circuitbreaker/CircuitBreaker.java

    /**
     * An EventPublisher can be used to register event consumers.
     */
    interface EventPublisher extends io.github.resilience4j.core.EventPublisher<CircuitBreakerEvent> {

        EventPublisher onSuccess(EventConsumer<CircuitBreakerOnSuccessEvent> eventConsumer);

        EventPublisher onError(EventConsumer<CircuitBreakerOnErrorEvent> eventConsumer);

        EventPublisher onStateTransition(EventConsumer<CircuitBreakerOnStateTransitionEvent> eventConsumer);

        EventPublisher onReset(EventConsumer<CircuitBreakerOnResetEvent> eventConsumer);

        EventPublisher onIgnoredError(EventConsumer<CircuitBreakerOnIgnoredErrorEvent> eventConsumer);

        EventPublisher onCallNotPermitted(EventConsumer<CircuitBreakerOnCallNotPermittedEvent> eventConsumer);
    }
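  • This EventPublisher interface extends io.github.resilience4j.core.EventPublisher; having two interfaces with the same simple name is somewhat confusing
  • The on... methods register consumers for the CircuitBreakerOnSuccessEvent, CircuitBreakerOnErrorEvent, CircuitBreakerOnStateTransitionEvent, CircuitBreakerOnResetEvent, CircuitBreakerOnIgnoredErrorEvent and CircuitBreakerOnCallNotPermittedEvent events respectively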

CircuitBreakerEvent

resilience4j-circuitbreaker-0.13.0-sources.jar!/io/github/resilience4j/circuitbreaker/event/CircuitBreakerEvent.java

/**
 * An event which is created by a CircuitBreaker.
 */
public interface CircuitBreakerEvent {

    /**
     * Returns the name of the CircuitBreaker which has created the event.
     *
     * @return the name of the CircuitBreaker which has created the event
     */
    String getCircuitBreakerName();

    /**
     * Returns the type of the CircuitBreaker event.
     *
     * @return the type of the CircuitBreaker event
     */
    Type getEventType();

    /**
     * Returns the creation time of CircuitBreaker event.
     *
     * @return the creation time of CircuitBreaker event
     */
    ZonedDateTime getCreationTime();

    /**
     * Event types which are created by a CircuitBreaker.
     */
    enum Type {
        /** A CircuitBreakerEvent which informs that an error has been recorded */
        ERROR(false),
        /** A CircuitBreakerEvent which informs that an error has been ignored */
        IGNORED_ERROR(false),
        /** A CircuitBreakerEvent which informs that a success has been recorded */
        SUCCESS(false),
        /** A CircuitBreakerEvent which informs that a call was not permitted because the CircuitBreaker state is OPEN */
        NOT_PERMITTED(false),
        /** A CircuitBreakerEvent which informs the state of the CircuitBreaker has been changed */
        STATE_TRANSITION(true),
        /** A CircuitBreakerEvent which informs the CircuitBreaker has been reset */
        RESET(true),
        /** A CircuitBreakerEvent which informs the CircuitBreaker has been forced open */
        FORCED_OPEN(false),
        /** A CircuitBreakerEvent which informs the CircuitBreaker has been disabled */
        DISABLED(false);

        public final boolean forcePublish;

        Type(boolean forcePublish) {
            this.forcePublish = forcePublish;
        }
    }
}
  • The CircuitBreakerEvent interface defines the Type enum of event types
  • It declares the getCircuitBreakerName, getEventType and getCreationTime methods
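
The forcePublish flag on Type is what CircuitBreakerState.shouldPublishEvents (listed earlier) combines with the state's allowPublish flag. The rule can be sketched as follows. PublishGateSketch and its enums are hypothetical, and the allowPublish values are assumptions made for illustration, since the State enum is not shown in this article.

```java
// Hypothetical, reduced model of the publish gate: forcePublish || allowPublish.
final class PublishGateSketch {

    enum EventType {
        SUCCESS(false),
        ERROR(false),
        STATE_TRANSITION(true),
        RESET(true);

        final boolean forcePublish;

        EventType(boolean forcePublish) {
            this.forcePublish = forcePublish;
        }
    }

    // The allowPublish values below are assumed for this sketch only.
    enum State {
        CLOSED(true),
        DISABLED(false);

        final boolean allowPublish;

        State(boolean allowPublish) {
            this.allowPublish = allowPublish;
        }
    }

    // An event goes out if its type forces publication or the current state allows it.
    static boolean shouldPublish(State state, EventType type) {
        return type.forcePublish || state.allowPublish;
    }
}
```

Under these assumed values, a SUCCESS event is suppressed while the breaker is DISABLED, whereas a STATE_TRANSITION event is still published.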

AbstractCircuitBreakerEvent

resilience4j-circuitbreaker-0.13.0-sources.jar!/io/github/resilience4j/circuitbreaker/event/AbstractCircuitBreakerEvent.java

abstract class AbstractCircuitBreakerEvent implements CircuitBreakerEvent {

    private final String circuitBreakerName;
    private final ZonedDateTime creationTime;

    AbstractCircuitBreakerEvent(String circuitBreakerName) {
        this.circuitBreakerName = circuitBreakerName;
        this.creationTime = ZonedDateTime.now();
    }

    @Override
    public String getCircuitBreakerName() {
        return circuitBreakerName;
    }

    @Override
    public ZonedDateTime getCreationTime() {
        return creationTime;
    }
}
  • Defines the circuitBreakerName and creationTime fields
  • Overrides the getCircuitBreakerName and getCreationTime methods
  • CircuitBreakerOnSuccessEvent, CircuitBreakerOnErrorEvent, CircuitBreakerOnStateTransitionEvent, CircuitBreakerOnResetEvent, CircuitBreakerOnIgnoredErrorEvent and CircuitBreakerOnCallNotPermittedEvent all extend AbstractCircuitBreakerEvent; some add fields of their own, and they mainly override getEventType and toString

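Summary

  • CircuitBreakerStateMachine maintains an AtomicReference; its onError and onSuccess methods delegate to the onError and onSuccess methods of the state object that the reference currently points to
  • The state subclasses decide in their own onError and onSuccess methods whether a transition is needed and which state to move to, then call the corresponding transitionTo... method on CircuitBreakerStateMachine to change the value of the AtomicReference, publishing events along the way. Spreading the decisions across the subclasses keeps the complex transition logic manageable.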

doc

  • Resilience4j is a fault tolerance library designed for Java8 and functional programming

This article was shared from the WeChat public account 码匠的流水账 (geek_luandun).


Originally published: 2018-07-12
