前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊reactor-logback的AsyncAppender

聊聊reactor-logback的AsyncAppender

原创
作者头像
code4it
发布2023-12-18 09:36:38
1660
发布2023-12-18 09:36:38
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要研究一下reactor-logback的AsyncAppender

AsyncAppender

reactor-logback/src/main/java/reactor/logback/AsyncAppender.java

代码语言:javascript
复制
public class AsyncAppender extends ContextAwareBase
		implements Appender<ILoggingEvent>, AppenderAttachable<ILoggingEvent>,
		           CoreSubscriber<ILoggingEvent> {

	private final AppenderAttachableImpl<ILoggingEvent>    aai      =
			new AppenderAttachableImpl<ILoggingEvent>();
	private final FilterAttachableImpl<ILoggingEvent>      fai      =
			new FilterAttachableImpl<ILoggingEvent>();
	private final AtomicReference<Appender<ILoggingEvent>> delegate =
			new AtomicReference<Appender<ILoggingEvent>>();

	private String                            name;
	private WorkQueueProcessor<ILoggingEvent> processor;

	private int     backlog           = 1024 * 1024;
	private boolean includeCallerData = false;
	private boolean started           = false;

	//......
}	

AsyncAppender继承了ContextAwareBase,同时实现了Appender、AppenderAttachable、CoreSubscriber接口

CoreSubscriber

reactor/core/CoreSubscriber.java

代码语言:javascript
复制
public interface CoreSubscriber<T> extends Subscriber<T> {

	/**
	 * Request a {@link Context} from dependent components which can include downstream
	 * operators during subscribing or a terminal {@link org.reactivestreams.Subscriber}.
	 *
	 * @return a resolved context or {@link Context#empty()}
	 */
	default Context currentContext(){
		return Context.empty();
	}

	/**
	 * Implementors should initialize any state used by {@link #onNext(Object)} before
	 * calling {@link Subscription#request(long)}. Should further {@code onNext} related
	 * state modification occur, thread-safety will be required.
	 * <p>
	 *    Note that an invalid request {@code <= 0} will not produce an onError and
	 *    will simply be ignored or reported through a debug-enabled
	 *    {@link reactor.util.Logger}.
	 *
	 * {@inheritDoc}
	 */
	@Override
	void onSubscribe(Subscription s);
}

CoreSubscriber继承了Subscriber接口,Subscriber接口定义了onSubscribe(Subscription s)、onNext、onError、onComplete方法

onSubscribe

代码语言:javascript
复制
	public void onSubscribe(Subscription s) {
		try {
			doStart();
		}
		catch (Throwable t) {
			addError(t.getMessage(), t);
		}
		finally {
			started = true;
			s.request(Long.MAX_VALUE);
		}
	}

	protected void doStart() {
	}	

onSubscribe方法执行doStart,标记started为true,同时触发s.request(Long.MAX_VALUE)

onNext

代码语言:javascript
复制
	public void onNext(ILoggingEvent iLoggingEvent) {
		aai.appendLoopOnAppenders(iLoggingEvent);
	}

onNext调用AppenderAttachableImpl的appendLoopOnAppenders方法

onError

代码语言:javascript
复制
	public void onError(Throwable t) {
		addError(t.getMessage(), t);
	}

onError主要是添加错误信息到logback的status

onComplete

代码语言:javascript
复制
	public void onComplete() {
		try {
			Appender<ILoggingEvent> appender = delegate.getAndSet(null);
			if (appender != null){
				doStop();
				appender.stop();
				aai.detachAndStopAllAppenders();
			}
		}
		catch (Throwable t) {
			addError(t.getMessage(), t);
		}
		finally {
			started = false;
		}
	}

	protected void doStop() {
	}	

onComplete则执行doStop、appender.stop()、aai.detachAndStopAllAppenders(),最后标记started为false

Appender.doAppend

代码语言:javascript
复制
	public void doAppend(ILoggingEvent evt) throws LogbackException {
		if (getFilterChainDecision(evt) == FilterReply.DENY) {
			return;
		}
		evt.prepareForDeferredProcessing();
		if (includeCallerData) {
			evt.getCallerData();
		}
		try {
			queueLoggingEvent(evt);
		}
		catch (Throwable t) {
			addError(t.getMessage(), t);
		}
	}

	protected void queueLoggingEvent(ILoggingEvent evt) {
		if (null != delegate.get()) {
			processor.onNext(evt);
		}
	}	

doAppend方法先判断是否需要DENY,是则直接返回,之后主要执行queueLoggingEvent,它在delegate不为null时执行processor.onNext(evt)

LifeCycle.start

代码语言:javascript
复制
	public void start() {
		startDelegateAppender();

		processor = WorkQueueProcessor.<ILoggingEvent>builder().name("logger")
		                                                       .bufferSize(backlog)
		                                                       .autoCancel(false)
		                                                       .build();
		processor.subscribe(this);
	}

	private void startDelegateAppender() {
		Appender<ILoggingEvent> delegateAppender = delegate.get();
		if (null != delegateAppender && !delegateAppender.isStarted()) {
			delegateAppender.start();
		}
	}

	public void addAppender(Appender<ILoggingEvent> newAppender) {
		if (delegate.compareAndSet(null, newAppender)) {
			aai.addAppender(newAppender);
		}
		else {
			throw new IllegalArgumentException(delegate.get() + " already attached.");
		}
	}		

start方法执行startDelegateAppender,然后创建WorkQueueProcessor(默认bufferSize为1024 * 1024),并subscribe当前实例;addAppender方法会设置delegate,并往AppenderAttachableImpl添加appender

stop

代码语言:javascript
复制
	public void stop() {
		processor.onComplete();
	}

stop方法执行processor.onComplete()

小结

reactor-logback基于WorkQueueProcessor提供了另外一种AsyncAppender,它不是基于BlockingQueue而是基于RingBuffer来实现的。其onSubscribe方法执行doStart,标记started为true,同时触发s.request(Long.MAX_VALUE);onNext调用AppenderAttachableImpl的appendLoopOnAppenders方法;onComplete则执行doStop、appender.stop()、aai.detachAndStopAllAppenders(),最后标记started为false;doAppend方法先判断是否需要DENY,是则直接返回,之后主要执行queueLoggingEvent,它在delegate不为null时执行processor.onNext(evt)。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • AsyncAppender
  • CoreSubscriber
    • onSubscribe
      • onNext
        • onError
          • onComplete
          • Appender.doAppend
          • LifeCycle.start
            • stop
            • 小结
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档