首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

EventBus源码分析之发布流程

继上篇文章EventBus源码分析之订阅流程之后,继续介绍EventBus的发布,事件发送完,EventBus如何做到调用之前注册的方法。

发布者发布事件,事件如何到订阅方法的

其实看完上面的代码,应该有个大体思路了,东西都保存在了EventBus中,发布者发完事件,EventBus根据事件去找到所有订阅方法,然后反射调用就OK了,下面我们将实践看一下,是不是这么一个步骤。

EventBus.post()

一切从发布者的post()方法说起,源码如下:

代码语言:javascript
复制
private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
        @Override
        protected PostingThreadState initialValue() {
            return new PostingThreadState();
        }
    };

    final static class PostingThreadState {
        final List<Object> eventQueue = new ArrayList<>();
        boolean isPosting;
        boolean isMainThread;
        Subscription subscription;
        Object event;
        boolean canceled;
    }

    public void post(Object event) {
        //每个线程创建一个PostingThreadState,将事件加入队列
        PostingThreadState postingState = currentPostingThreadState.get();
        List<Object> eventQueue = postingState.eventQueue;
        eventQueue.add(event);


        if (!postingState.isPosting) {
            //是否在主线程发步的
            postingState.isMainThread = isMainThread();
            //置标志位
            postingState.isPosting = true;
            if (postingState.canceled) {
                throw new EventBusException("Internal error. Abort state was not reset");
            }
            try {
                //将队列中所有事件逐个发布出去
                while (!eventQueue.isEmpty()) {
                    postSingleEvent(eventQueue.remove(0), postingState);
                }
            } finally {
                postingState.isPosting = false;
                postingState.isMainThread = false;
            }
        }
    }

上面涉及ThreadLocal,不了解的朋友可以参考ThreadLocal源码分析。PostThreadState会为每个线程创建一份,然后共用。同一个线程中发布的事件都会存到它的List中。 postSingleEvent()就是发送单个事件的方法,

代码语言:javascript
复制
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
        Class<?> eventClass = event.getClass();
        boolean subscriptionFound = false;
        //case:事件继承,默认false,需要找出该事件的继承结构
        if (eventInheritance) {
            List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
            int countTypes = eventTypes.size();
            for (int h = 0; h < countTypes; h++) {
                Class<?> clazz = eventTypes.get(h);
                subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
            }
        } else {
            subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
        }
        //如果没有找到订阅方法消费
        if (!subscriptionFound) {
            if (logNoSubscriberMessages) {
                logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
            }
            if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
                    eventClass != SubscriberExceptionEvent.class) {
                post(new NoSubscriberEvent(this, event));
            }
        }
    }

从上面可以看到,核心代码是调用了postSingleEventForEventType()方法,该方法的代码如下:

代码语言:javascript
复制
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
        CopyOnWriteArrayList<Subscription> subscriptions;
        //加锁,因为注册线程会向该Map中插入数据
        synchronized (this) {
            subscriptions = subscriptionsByEventType.get(eventClass);
        }
        //如果有该事件类型的订阅元组
        if (subscriptions != null && !subscriptions.isEmpty()) {
            //遍历元组,由于存放的时候是按优先级的,所以这里也按优先级进行消费
            for (Subscription subscription : subscriptions) {
                //置状态
                postingState.event = event;
                postingState.subscription = subscription;
                boolean aborted = false;
                try {
                    //发布到单个订阅元组
                    postToSubscription(subscription, event, postingState.isMainThread);
                    //是否被中途取消了
                    aborted = postingState.canceled;
                } finally {
                    postingState.event = null;
                    postingState.subscription = null;
                    postingState.canceled = false;
                }
                //如果中断了,那么退出
                if (aborted) {
                    break;
                }
            }
            return true;
        }
        return false;
    }

可以看到最终调用了postToSubscription()方法将事件发布给订阅者,该方法在上面已经碰到了,最终是通过反射进行调用的;这样依次将每一个事件完成了发布。

取消事件分发

上面涉及到一个参数,PostingThreadState的canceled参数,该参数会在取消事件时标志。取消事件分发是在事件消费方法中调用cancelEventDelivery()方法,该方法的限制场景是ThreadMode是POSTING的情景下,下面会说明原因。该方法的代码如下:

代码语言:javascript
复制
public void cancelEventDelivery(Object event) {
        //获取当前线程的PostingThreadState
        PostingThreadState postingState = currentPostingThreadState.get();
        //情况判断,当前不在分发事件,抛出异常
        if (!postingState.isPosting) {
            throw new EventBusException(
                    "This method may only be called from inside event handling methods on the posting thread");
        } else if (event == null) {
            throw new EventBusException("Event may not be null");
        } else if (postingState.event != event) {
            throw new EventBusException("Only the currently handled event may be aborted");
        } else if (postingState.subscription.subscriberMethod.threadMode != ThreadMode.POSTING) {
            throw new EventBusException(" event handlers may only abort the incoming event");
        }

        //设置标志位,后续事件将在分发中终止
        postingState.canceled = true;
    }

上面说过,每个线程有一个PostingThreadState,而分发方法和订阅方法都会获取PostingThreadState进行标志位的设置来达到消息通信的,这要求必须得是同一个对象,也就是说发布和订阅是在同一个线程中,而ThreadMode为POSTING的情况下,发布和事件消费是在同一个线程中,这儿也能想象该类为啥叫PostingThreadState了。

总结

经过上面的源码分析,可以理解事件中心是如何保存订阅者的,订阅者为啥只需调用register()方法,其他就可以什么都不管了,因此事件中心会利用反射找出@Subscribe注解了的方法,然后保存起来;发布者为啥只要post()出事件,剩下的就不要管了,因为事件中心会去寻找出之前保存的订阅者以及订阅方法,然后通过反射进行调用。

下一篇
举报
领券