A Look at resilience4j's CircuitBreakerStateMachine

Author: code4it
Published: 2018-09-17 17:04:01
Column: 码匠的流水账

This article takes a look at resilience4j's CircuitBreakerStateMachine.

CircuitBreakerStateMachine

resilience4j-circuitbreaker-0.13.0-sources.jar!/io/github/resilience4j/circuitbreaker/internal/CircuitBreakerStateMachine.java

/**
 * A CircuitBreaker finite state machine.
 */
public final class CircuitBreakerStateMachine implements CircuitBreaker {

    private static final Logger LOG = LoggerFactory.getLogger(CircuitBreakerStateMachine.class);

    private final String name;
    private final AtomicReference<CircuitBreakerState> stateReference;
    private final CircuitBreakerConfig circuitBreakerConfig;
    private final CircuitBreakerEventProcessor eventProcessor;

    /**
     * Creates a circuitBreaker.
     *
     * @param name                 the name of the CircuitBreaker
     * @param circuitBreakerConfig The CircuitBreaker configuration.
     */
    public CircuitBreakerStateMachine(String name, CircuitBreakerConfig circuitBreakerConfig) {
        this.name = name;
        this.circuitBreakerConfig = circuitBreakerConfig;
        this.stateReference = new AtomicReference<>(new ClosedState(this));
        this.eventProcessor = new CircuitBreakerEventProcessor();
    }

    /**
     * Creates a circuitBreaker with default config.
     *
     * @param name the name of the CircuitBreaker
     */
    public CircuitBreakerStateMachine(String name) {
        this(name, CircuitBreakerConfig.ofDefaults());
    }

    /**
     * Creates a circuitBreaker.
     *
     * @param name                 the name of the CircuitBreaker
     * @param circuitBreakerConfig The CircuitBreaker configuration supplier.
     */
    public CircuitBreakerStateMachine(String name, Supplier<CircuitBreakerConfig> circuitBreakerConfig) {
        this(name, circuitBreakerConfig.get());
    }

    /**
     * Requests permission to call this backend.
     *
     * @return true, if the call is allowed.
     */
    @Override
    public boolean isCallPermitted() {
        boolean callPermitted = stateReference.get().isCallPermitted();
        if (!callPermitted) {
            publishCallNotPermittedEvent();
        }
        return callPermitted;
    }

    @Override
    public void onError(long durationInNanos, Throwable throwable) {
        if (circuitBreakerConfig.getRecordFailurePredicate().test(throwable)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug(String.format("CircuitBreaker '%s' recorded a failure:", name), throwable);
            }
            publishCircuitErrorEvent(name, durationInNanos, throwable);
            stateReference.get().onError(throwable);
        } else {
            publishCircuitIgnoredErrorEvent(name, durationInNanos, throwable);
        }
    }

    @Override
    public void onSuccess(long durationInNanos) {
        publishSuccessEvent(durationInNanos);
        stateReference.get().onSuccess();
    }

    /**
     * Get the state of this CircuitBreaker.
     *
     * @return the the state of this CircuitBreaker
     */
    @Override
    public State getState() {
        return this.stateReference.get().getState();
    }

    /**
     * Get the name of this CircuitBreaker.
     *
     * @return the the name of this CircuitBreaker
     */
    @Override
    public String getName() {
        return this.name;
    }

    /**
     * Get the config of this CircuitBreaker.
     *
     * @return the config of this CircuitBreaker
     */
    @Override
    public CircuitBreakerConfig getCircuitBreakerConfig() {
        return circuitBreakerConfig;
    }

    @Override
    public Metrics getMetrics() {
        return this.stateReference.get().getMetrics();
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public String toString() {
        return String.format("CircuitBreaker '%s'", this.name);
    }

    @Override
    public void reset() {
        CircuitBreakerState previousState = stateReference.getAndUpdate(currentState -> new ClosedState(this));
        if (previousState.getState() != CLOSED) {
            publishStateTransitionEvent(StateTransition.transitionBetween(previousState.getState(), CLOSED));
        }
        publishResetEvent();
    }

    private void stateTransition(State newState, Function<CircuitBreakerState, CircuitBreakerState> newStateGenerator) {
        CircuitBreakerState previousState = stateReference.getAndUpdate(currentState -> {
            if (currentState.getState() == newState) {
                return currentState;
            }
            return newStateGenerator.apply(currentState);
        });
        if (previousState.getState() != newState) {
            publishStateTransitionEvent(StateTransition.transitionBetween(previousState.getState(), newState));
        }
    }

    @Override
    public void transitionToDisabledState() {
        stateTransition(DISABLED, currentState -> new DisabledState(this));
    }

    @Override
    public void transitionToForcedOpenState() {
        stateTransition(FORCED_OPEN, currentState -> new ForcedOpenState(this));
    }

    @Override
    public void transitionToClosedState() {
        stateTransition(CLOSED, currentState -> new ClosedState(this, currentState.getMetrics()));
    }

    @Override
    public void transitionToOpenState() {
        stateTransition(OPEN, currentState -> new OpenState(this, currentState.getMetrics()));
    }

    @Override
    public void transitionToHalfOpenState() {
        stateTransition(HALF_OPEN, currentState -> new HalfOpenState(this));
    }
    //......
}
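  • CircuitBreakerStateMachine implements the CircuitBreaker interface
  • An AtomicReference holds the circuit breaker's current state
  • The transitionTo... methods all call stateTransition, which updates the AtomicReference and publishes a state transition event, but only when the state actually changed
  • Both stateTransition and reset call StateTransition.transitionBetween to build the transition that gets published
  • CircuitBreakerEventProcessor handles the events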
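
The stateTransition method above leans on AtomicReference.getAndUpdate, which returns the value held before the update. Here is a minimal sketch of that idea, assuming a simplified machine whose states are plain enum values; TransitionSketch, transitionTo and the published list are hypothetical names for illustration, not part of resilience4j.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified stand-in for the real state machine.
class TransitionSketch {

    enum State { CLOSED, OPEN, HALF_OPEN }

    private final AtomicReference<State> stateReference = new AtomicReference<>(State.CLOSED);

    // Collects "published" transitions; the real class hands events to an event processor.
    final List<String> published = new CopyOnWriteArrayList<>();

    void transitionTo(State newState) {
        // getAndUpdate returns the previous value, so we can tell whether anything changed.
        // The update function has no side effects, so a retry under contention is harmless.
        State previous = stateReference.getAndUpdate(
                current -> current == newState ? current : newState);
        if (previous != newState) {
            published.add(previous + " -> " + newState);
        }
    }

    State getState() {
        return stateReference.get();
    }

    public static void main(String[] args) {
        TransitionSketch machine = new TransitionSketch();
        machine.transitionTo(State.OPEN);
        machine.transitionTo(State.OPEN);      // already OPEN: nothing is published
        machine.transitionTo(State.HALF_OPEN);
        System.out.println(machine.published); // [CLOSED -> OPEN, OPEN -> HALF_OPEN]
    }
}
```

Requesting the state the machine is already in leaves the reference untouched and publishes nothing, which is the same guard stateTransition applies with its two `getState() == newState` checks.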

CircuitBreakerState

resilience4j-circuitbreaker-0.13.0-sources.jar!/io/github/resilience4j/circuitbreaker/internal/CircuitBreakerState.java

/**
 * Abstract state of the CircuitBreaker state machine.
 */
abstract class CircuitBreakerState{

    CircuitBreakerStateMachine stateMachine;

    CircuitBreakerState(CircuitBreakerStateMachine stateMachine) {
        this.stateMachine = stateMachine;
    }

    abstract boolean isCallPermitted();

    abstract void onError(Throwable throwable);

    abstract void onSuccess();

    abstract CircuitBreaker.State getState();

    abstract CircuitBreakerMetrics getMetrics();

    /**
     * Should the CircuitBreaker in this state publish events
     * @return a boolean signaling if the events should be published
     */
    boolean shouldPublishEvents(CircuitBreakerEvent event){
        return event.getEventType().forcePublish || getState().allowPublish;
    }
}
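  • CircuitBreakerState and CircuitBreakerStateMachine reference each other: each state holds the state machine, and the state machine holds the current state
  • The subclasses of CircuitBreakerState are OpenState, HalfOpenState, ForcedOpenState, ClosedState and DisabledState
  • Each subclass's onError and onSuccess methods decide for themselves whether to change state; if so, they call one of the state machine's transitionTo... methods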
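
To make the division of labour concrete, here is a rough sketch of the same state pattern in miniature. Everything in it is hypothetical: the class names, the two states, and in particular the "open after N consecutive errors" rule, which is an assumption chosen for brevity and not the failure-rate logic resilience4j's ClosedState uses.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical miniature of the state pattern described above.
class StatePatternSketch {

    interface BreakerState {
        boolean isCallPermitted();
        void onError();
        String name();
    }

    static class Machine {
        private final AtomicReference<BreakerState> stateReference = new AtomicReference<>();
        private final int maxFailures;

        Machine(int maxFailures) {
            this.maxFailures = maxFailures;
            this.stateReference.set(new Closed(this));
        }

        // The machine only delegates; it holds no transition rules of its own.
        boolean isCallPermitted() { return stateReference.get().isCallPermitted(); }
        void onError() { stateReference.get().onError(); }
        String stateName() { return stateReference.get().name(); }

        void transitionToOpenState() { stateReference.set(new Open()); }
    }

    static class Closed implements BreakerState {
        private final Machine machine;
        private final AtomicInteger failures = new AtomicInteger();

        Closed(Machine machine) { this.machine = machine; }

        public boolean isCallPermitted() { return true; }
        public String name() { return "CLOSED"; }

        // The state itself decides when to leave, then calls back into the machine.
        public void onError() {
            if (failures.incrementAndGet() >= machine.maxFailures) {
                machine.transitionToOpenState();
            }
        }
    }

    static class Open implements BreakerState {
        public boolean isCallPermitted() { return false; }
        public String name() { return "OPEN"; }
        public void onError() { /* already open: nothing to decide */ }
    }

    public static void main(String[] args) {
        Machine machine = new Machine(2);
        machine.onError();
        System.out.println(machine.stateName() + " " + machine.isCallPermitted()); // CLOSED true
        machine.onError();
        System.out.println(machine.stateName() + " " + machine.isCallPermitted()); // OPEN false
    }
}
```

Adding a new state in this layout means adding a class, and the machine's delegating methods stay as they are.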

StateTransition.transitionBetween

resilience4j-circuitbreaker-0.13.0-sources.jar!/io/github/resilience4j/circuitbreaker/CircuitBreaker.java

    /**
     * State transitions of the CircuitBreaker state machine.
     */
    enum StateTransition {
        CLOSED_TO_OPEN(State.CLOSED, State.OPEN),
        CLOSED_TO_DISABLED(State.CLOSED, State.DISABLED),
        CLOSED_TO_FORCED_OPEN(State.CLOSED, State.FORCED_OPEN),
        HALF_OPEN_TO_CLOSED(State.HALF_OPEN, State.CLOSED),
        HALF_OPEN_TO_OPEN(State.HALF_OPEN, State.OPEN),
        HALF_OPEN_TO_DISABLED(State.HALF_OPEN, State.DISABLED),
        HALF_OPEN_TO_FORCED_OPEN(State.HALF_OPEN, State.FORCED_OPEN),
        OPEN_TO_CLOSED(State.OPEN, State.CLOSED),
        OPEN_TO_HALF_OPEN(State.OPEN, State.HALF_OPEN),
        OPEN_TO_DISABLED(State.OPEN, State.DISABLED),
        OPEN_TO_FORCED_OPEN(State.OPEN, State.FORCED_OPEN),
        FORCED_OPEN_TO_CLOSED(State.FORCED_OPEN, State.CLOSED),
        FORCED_OPEN_TO_OPEN(State.FORCED_OPEN, State.OPEN),
        FORCED_OPEN_TO_DISABLED(State.FORCED_OPEN, State.DISABLED),
        FORCED_OPEN_TO_HALF_OPEN(State.FORCED_OPEN, State.HALF_OPEN),
        DISABLED_TO_CLOSED(State.DISABLED, State.CLOSED),
        DISABLED_TO_OPEN(State.DISABLED, State.OPEN),
        DISABLED_TO_FORCED_OPEN(State.DISABLED, State.FORCED_OPEN),
        DISABLED_TO_HALF_OPEN(State.DISABLED, State.HALF_OPEN);

        private final State fromState;

        private final State toState;

        private static final Map<Tuple2<State, State>, StateTransition> STATE_TRANSITION_MAP =
                Arrays
                        .stream(StateTransition.values())
                        .collect(Collectors.toMap(v -> Tuple.of(v.fromState, v.toState), Function.identity()));

        private boolean matches(State fromState, State toState) {
            return this.fromState == fromState && this.toState == toState;
        }

        public static StateTransition transitionBetween(State fromState, State toState){
            final StateTransition stateTransition = STATE_TRANSITION_MAP.get(Tuple.of(fromState, toState));
            if(stateTransition == null) {
                throw new IllegalStateException(
                        String.format("Illegal state transition from %s to %s", fromState.toString(), toState.toString()));
            }
            return stateTransition;
        }

        StateTransition(State fromState, State toState) {
            this.fromState = fromState;
            this.toState = toState;
        }

        public State getFromState() {
            return fromState;
        }

        public State getToState() {
            return toState;
        }

        @Override
        public String toString(){
            return String.format("State transition from %s to %s", fromState, toState);
        }
    }
  • StateTransition enumerates the allowed state transition paths
  • StateTransition.transitionBetween validates a requested transition against them and throws IllegalStateException for one that is not listed
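
The lookup technique can be reproduced with the JDK alone. The sketch below is an illustration under assumptions: it swaps the Tuple2 key for java.util.AbstractMap.SimpleImmutableEntry and uses a reduced, hypothetical set of states and transitions. As in the enum above, there is no entry for CLOSED to HALF_OPEN, so asking for it fails.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Arrays;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical reduced version of the transition table.
class TransitionLookupSketch {

    enum State { CLOSED, OPEN, HALF_OPEN }

    enum Transition {
        CLOSED_TO_OPEN(State.CLOSED, State.OPEN),
        OPEN_TO_HALF_OPEN(State.OPEN, State.HALF_OPEN),
        HALF_OPEN_TO_CLOSED(State.HALF_OPEN, State.CLOSED),
        HALF_OPEN_TO_OPEN(State.HALF_OPEN, State.OPEN);

        // Built once, after the constants exist: (from, to) pair -> enum constant.
        private static final Map<SimpleImmutableEntry<State, State>, Transition> LOOKUP =
                Arrays.stream(Transition.values())
                      .collect(Collectors.toMap(t -> key(t.from, t.to), Function.identity()));

        private final State from;
        private final State to;

        Transition(State from, State to) {
            this.from = from;
            this.to = to;
        }

        private static SimpleImmutableEntry<State, State> key(State from, State to) {
            return new SimpleImmutableEntry<>(from, to);
        }

        static Transition between(State from, State to) {
            Transition transition = LOOKUP.get(key(from, to));
            if (transition == null) {
                throw new IllegalStateException(
                        String.format("Illegal state transition from %s to %s", from, to));
            }
            return transition;
        }
    }

    public static void main(String[] args) {
        System.out.println(Transition.between(State.CLOSED, State.OPEN)); // CLOSED_TO_OPEN
        try {
            Transition.between(State.CLOSED, State.HALF_OPEN);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

One thing worth noticing in the original stateTransition method: transitionBetween is called after getAndUpdate has already swapped the state, so an unlisted pair surfaces as an exception while the event is being built, not as a check before the swap.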

CircuitBreakerEventProcessor

resilience4j-circuitbreaker-0.13.0-sources.jar!/io/github/resilience4j/circuitbreaker/internal/CircuitBreakerStateMachine.java

    private class CircuitBreakerEventProcessor extends EventProcessor<CircuitBreakerEvent> implements EventConsumer<CircuitBreakerEvent>, EventPublisher {
        @Override
        public EventPublisher onSuccess(EventConsumer<CircuitBreakerOnSuccessEvent> onSuccessEventConsumer) {
            registerConsumer(CircuitBreakerOnSuccessEvent.class, onSuccessEventConsumer);
            return this;
        }

        @Override
        public EventPublisher onError(EventConsumer<CircuitBreakerOnErrorEvent> onErrorEventConsumer) {
            registerConsumer(CircuitBreakerOnErrorEvent.class, onErrorEventConsumer);
            return this;
        }

        @Override
        public EventPublisher onStateTransition(EventConsumer<CircuitBreakerOnStateTransitionEvent> onStateTransitionEventConsumer) {
            registerConsumer(CircuitBreakerOnStateTransitionEvent.class, onStateTransitionEventConsumer);
            return this;
        }

        @Override
        public EventPublisher onReset(EventConsumer<CircuitBreakerOnResetEvent> onResetEventConsumer) {
            registerConsumer(CircuitBreakerOnResetEvent.class, onResetEventConsumer);
            return this;
        }

        @Override
        public EventPublisher onIgnoredError(EventConsumer<CircuitBreakerOnIgnoredErrorEvent> onIgnoredErrorEventConsumer) {
            registerConsumer(CircuitBreakerOnIgnoredErrorEvent.class, onIgnoredErrorEventConsumer);
            return this;
        }

        @Override
        public EventPublisher onCallNotPermitted(EventConsumer<CircuitBreakerOnCallNotPermittedEvent> onCallNotPermittedEventConsumer) {
            registerConsumer(CircuitBreakerOnCallNotPermittedEvent.class, onCallNotPermittedEventConsumer);
            return this;
        }

        @Override
        public void consumeEvent(CircuitBreakerEvent event) {
            super.processEvent(event);
        }
    }
  • CircuitBreakerEventProcessor extends EventProcessor and handles CircuitBreakerEvent events
  • It also implements the EventConsumer and EventPublisher interfaces
  • onSuccess, onError, onStateTransition, onReset, onIgnoredError and onCallNotPermitted each call registerConsumer
  • consumeEvent calls processEvent

EventProcessor

resilience4j-core-0.13.0-sources.jar!/io/github/resilience4j/core/EventProcessor.java

public class EventProcessor<T> implements EventPublisher<T> {

    protected volatile boolean consumerRegistered;
    private volatile EventConsumer<T> onEventConsumer;
    private ConcurrentMap<Class<? extends T>, EventConsumer<Object>> eventConsumers = new ConcurrentHashMap<>();

    public boolean hasConsumers(){
        return consumerRegistered;
    }

    @SuppressWarnings("unchecked")
    public <E extends T> void registerConsumer(Class<? extends E> eventType, EventConsumer<E> eventConsumer){
        consumerRegistered = true;
        eventConsumers.put(eventType, (EventConsumer<Object>) eventConsumer);
    }

    @SuppressWarnings("unchecked")
    public <E extends T> boolean processEvent(E event) {
        boolean consumed = false;
        if(onEventConsumer != null){
            onEventConsumer.consumeEvent(event);
            consumed = true;
        }
        if(!eventConsumers.isEmpty()){
            EventConsumer<T> eventConsumer = (EventConsumer<T>) eventConsumers.get(event.getClass());
            if(eventConsumer != null){
                eventConsumer.consumeEvent(event);
                consumed = true;
            }
        }
        return consumed;
    }

    @Override
    public void onEvent(EventConsumer<T> onEventConsumer) {
        consumerRegistered = true;
        this.onEventConsumer = onEventConsumer;
    }
}
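  • registerConsumer stores a consumer in eventConsumers, keyed by event type
  • processEvent passes the event to onEventConsumer, if one is set, and to the consumer registered for the event's class, if any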
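
The class-keyed dispatch can be sketched with java.util.function.Consumer in place of EventConsumer. The names below (EventDispatchSketch, Processor, SuccessEvent, ErrorEvent) are hypothetical, and the sketch leaves out the catch-all onEvent consumer to stay short.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;

// Hypothetical miniature of a class-keyed event dispatcher.
class EventDispatchSketch {

    interface Event { }
    static class SuccessEvent implements Event { }
    static class ErrorEvent implements Event { }

    static class Processor {
        private final ConcurrentMap<Class<? extends Event>, Consumer<Event>> consumers =
                new ConcurrentHashMap<>();

        // Wrapping the consumer in a lambda with Class.cast avoids an unchecked cast.
        <E extends Event> void registerConsumer(Class<E> type, Consumer<? super E> consumer) {
            consumers.put(type, event -> consumer.accept(type.cast(event)));
        }

        // Looks the consumer up by the event's exact runtime class.
        boolean processEvent(Event event) {
            Consumer<Event> consumer = consumers.get(event.getClass());
            if (consumer == null) {
                return false;
            }
            consumer.accept(event);
            return true;
        }
    }

    public static void main(String[] args) {
        Processor processor = new Processor();
        processor.registerConsumer(SuccessEvent.class,
                event -> System.out.println("got a success event"));
        System.out.println(processor.processEvent(new SuccessEvent())); // message, then true
        System.out.println(processor.processEvent(new ErrorEvent()));   // false
    }
}
```

Because the map is keyed on `event.getClass()`, here and in EventProcessor alike, a consumer only fires for events of exactly the registered class; registering for a supertype would not match its subclasses.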

EventPublisher

resilience4j-circuitbreaker-0.13.0-sources.jar!/io/github/resilience4j/circuitbreaker/CircuitBreaker.java

    /**
     * An EventPublisher can be used to register event consumers.
     */
    interface EventPublisher extends io.github.resilience4j.core.EventPublisher<CircuitBreakerEvent> {

        EventPublisher onSuccess(EventConsumer<CircuitBreakerOnSuccessEvent> eventConsumer);

        EventPublisher onError(EventConsumer<CircuitBreakerOnErrorEvent> eventConsumer);

        EventPublisher onStateTransition(EventConsumer<CircuitBreakerOnStateTransitionEvent> eventConsumer);

        EventPublisher onReset(EventConsumer<CircuitBreakerOnResetEvent> eventConsumer);

        EventPublisher onIgnoredError(EventConsumer<CircuitBreakerOnIgnoredErrorEvent> eventConsumer);

        EventPublisher onCallNotPermitted(EventConsumer<CircuitBreakerOnCallNotPermittedEvent> eventConsumer);
    }
  • This EventPublisher interface extends io.github.resilience4j.core.EventPublisher; two interfaces sharing the same simple name is a little confusing
  • The on... methods register consumers for CircuitBreakerOnSuccessEvent, CircuitBreakerOnErrorEvent, CircuitBreakerOnStateTransitionEvent, CircuitBreakerOnResetEvent, CircuitBreakerOnIgnoredErrorEvent and CircuitBreakerOnCallNotPermittedEvent respectively

CircuitBreakerEvent

resilience4j-circuitbreaker-0.13.0-sources.jar!/io/github/resilience4j/circuitbreaker/event/CircuitBreakerEvent.java

/**
 * An event which is created by a CircuitBreaker.
 */
public interface CircuitBreakerEvent {

    /**
     * Returns the name of the CircuitBreaker which has created the event.
     *
     * @return the name of the CircuitBreaker which has created the event
     */
    String getCircuitBreakerName();

    /**
     * Returns the type of the CircuitBreaker event.
     *
     * @return the type of the CircuitBreaker event
     */
    Type getEventType();

    /**
     * Returns the creation time of CircuitBreaker event.
     *
     * @return the creation time of CircuitBreaker event
     */
    ZonedDateTime getCreationTime();

    /**
     * Event types which are created by a CircuitBreaker.
     */
    enum Type {
        /** A CircuitBreakerEvent which informs that an error has been recorded */
        ERROR(false),
        /** A CircuitBreakerEvent which informs that an error has been ignored */
        IGNORED_ERROR(false),
        /** A CircuitBreakerEvent which informs that a success has been recorded */
        SUCCESS(false),
        /** A CircuitBreakerEvent which informs that a call was not permitted because the CircuitBreaker state is OPEN */
        NOT_PERMITTED(false),
        /** A CircuitBreakerEvent which informs the state of the CircuitBreaker has been changed */
        STATE_TRANSITION(true),
        /** A CircuitBreakerEvent which informs the CircuitBreaker has been reset */
        RESET(true),
        /** A CircuitBreakerEvent which informs the CircuitBreaker has been forced open */
        FORCED_OPEN(false),
        /** A CircuitBreakerEvent which informs the CircuitBreaker has been disabled */
        DISABLED(false);

        public final boolean forcePublish;

        Type(boolean forcePublish) {
            this.forcePublish = forcePublish;
        }
    }
}
  • The CircuitBreakerEvent interface defines the Type enum of event types
  • It declares the getCircuitBreakerName, getEventType and getCreationTime methods
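
The forcePublish flag on Type is read by CircuitBreakerState.shouldPublishEvents, shown earlier, which combines it with an allowPublish flag on the current state. The sketch below only illustrates that boolean rule. The State enum is not listed in this article, so the allowPublish values assigned here are assumptions, and PublishRuleSketch is a hypothetical name.

```java
// Hypothetical illustration of the "forcePublish || allowPublish" rule.
class PublishRuleSketch {

    // Assumption: which states suppress publishing is chosen here for illustration only.
    enum State {
        CLOSED(true), OPEN(true), HALF_OPEN(true), DISABLED(false), FORCED_OPEN(false);

        final boolean allowPublish;

        State(boolean allowPublish) {
            this.allowPublish = allowPublish;
        }
    }

    // Subset of the event types listed above, with the same forcePublish values.
    enum Type {
        ERROR(false), SUCCESS(false), STATE_TRANSITION(true), RESET(true);

        final boolean forcePublish;

        Type(boolean forcePublish) {
            this.forcePublish = forcePublish;
        }
    }

    static boolean shouldPublish(State state, Type type) {
        // A forced event type is published regardless of the state's own setting.
        return type.forcePublish || state.allowPublish;
    }

    public static void main(String[] args) {
        System.out.println(shouldPublish(State.CLOSED, Type.ERROR));              // true
        System.out.println(shouldPublish(State.DISABLED, Type.ERROR));            // false
        System.out.println(shouldPublish(State.DISABLED, Type.STATE_TRANSITION)); // true
    }
}
```

Under these assumed values, a state that suppresses ordinary events would still report STATE_TRANSITION and RESET, the two types marked forcePublish in the enum above.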

AbstractCircuitBreakerEvent

resilience4j-circuitbreaker-0.13.0-sources.jar!/io/github/resilience4j/circuitbreaker/event/AbstractCircuitBreakerEvent.java

abstract class AbstractCircuitBreakerEvent implements CircuitBreakerEvent {

    private final String circuitBreakerName;
    private final ZonedDateTime creationTime;

    AbstractCircuitBreakerEvent(String circuitBreakerName) {
        this.circuitBreakerName = circuitBreakerName;
        this.creationTime = ZonedDateTime.now();
    }

    @Override
    public String getCircuitBreakerName() {
        return circuitBreakerName;
    }

    @Override
    public ZonedDateTime getCreationTime() {
        return creationTime;
    }
}
  • Defines the circuitBreakerName and creationTime fields
  • Implements getCircuitBreakerName and getCreationTime
  • CircuitBreakerOnSuccessEvent, CircuitBreakerOnErrorEvent, CircuitBreakerOnStateTransitionEvent, CircuitBreakerOnResetEvent, CircuitBreakerOnIgnoredErrorEvent and CircuitBreakerOnCallNotPermittedEvent all extend AbstractCircuitBreakerEvent; some add fields of their own, and they mainly implement getEventType and override toString

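Summary

  • CircuitBreakerStateMachine maintains an AtomicReference; its onError and onSuccess methods delegate to the onError and onSuccess methods of the state that reference currently holds
  • Each state subclass's onError and onSuccess methods decide whether a state change is needed and which state to move to, then call the state machine's transitionTo... methods to swap the value in the AtomicReference, publishing events along the way. Pushing these rules into the subclasses keeps the state transition logic manageable.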

doc

  • Resilience4j is a fault tolerance library designed for Java8 and functional programming
Originally published on 2018-07-12 on the WeChat official account 码匠的流水账.
