专栏首页luozhiyun的技术学习8.源码分析---从设计模式中看SOFARPC中的EventBus?

8.源码分析---从设计模式中看SOFARPC中的EventBus?

我们在前面分析客户端引用的时候会看到如下这段代码:

// 产生开始调用事件
if (EventBus.isEnable(ClientStartInvokeEvent.class)) {
    EventBus.post(new ClientStartInvokeEvent(request));
}

这里用EventBus调用了一下post方法之后就什么也没做了,就方法名来看是发送了一个post请求,也不知道发给谁,到底有什么用。

所以这一节我们来分析一下EventBus这个类的作用。

首先我们来看一下这个类的方法

从EventBus的方法中我们是不是应该想到了这是使用了什么设计模式?

没错,这里用到的是订阅发布模式(Subscribe/Publish)。订阅发布模式定义了一种一对多的依赖关系,让多个订阅者对象同时监听某一个主题对象。这个主题对象在自身状态变化时,会通知所有订阅者对象,使它们能够自动更新自己的状态。

我们先分析源码,分析完源码之后再来总结一下。

EventBus发送事件

根据上面的示例,我们先看EventBus#post是里面是怎么做的。 EventBus#post

private final static ConcurrentMap<Class<? extends Event>, CopyOnWriteArraySet<Subscriber>> SUBSCRIBER_MAP = new ConcurrentHashMap<Class<? extends Event>, CopyOnWriteArraySet<Subscriber>>();

public static void post(final Event event) {
    //是否开启总线
    if (!isEnable()) {
        return;
    }
    //根据传入得event获取到相应的Subscriber
    CopyOnWriteArraySet<Subscriber> subscribers = SUBSCRIBER_MAP.get(event.getClass());
    if (CommonUtils.isNotEmpty(subscribers)) {
        for (final Subscriber subscriber : subscribers) {
            //如果事件订阅者是同步的,那么直接调用
            if (subscriber.isSync()) {
                handleEvent(subscriber, event);
            } else { // 异步
                final RpcInternalContext context = RpcInternalContext.peekContext();
                //使用线程池启动一个线程一部执行任务
                final ThreadPoolExecutor asyncThreadPool = AsyncRuntime.getAsyncThreadPool();
                try {
                    asyncThreadPool.execute(
                            new Runnable() {
                                @Override
                                public void run() {
                                    try {
                                        RpcInternalContext.setContext(context);
                                        //调用订阅者的event事件
                                        handleEvent(subscriber, event);
                                    } finally {
                                        RpcInternalContext.removeContext();
                                    }
                                }
                            });
                } catch (RejectedExecutionException e) {
                    LOGGER
                            .warn("This queue is full when post event to async execute, queue size is " +
                                    asyncThreadPool.getQueue().size() +
                                    ", please optimize this async thread pool of eventbus.");
                }
            }
        }
    }
}

private static void handleEvent(final Subscriber subscriber, final Event event) {
    try {
        subscriber.onEvent(event);
    } catch (Throwable e) {
        if (LOGGER.isWarnEnabled()) {
            LOGGER.warn("Handle " + event.getClass() + " error", e);
        }
    }
}

这个post方法主要做了这么几件事:

  1. 根据传入的Event获取对应的订阅列表subscribers
  2. 遍历subscribers
  3. 如果订阅者是异步的,那么就使用线程池启动执行任务 4, 如果是同步的那么就调用handleEvent方法向订阅者发布消息

我们再来看看订阅者是怎样的:

Subscriber

public abstract class Subscriber {
    /**
     * 接到事件是否同步执行
     */
    protected boolean sync = true;

    /**
     * 事件订阅者
     */
    protected Subscriber() {
    }

    /**
     * 事件订阅者
     *
     * @param sync 是否同步
     */
    protected Subscriber(boolean sync) {
        this.sync = sync;
    }

    /**
     * 是否同步
     *
     * @return 是否同步
     */
    public boolean isSync() {
        return sync;
    }

    /**
     * 事件处理,请处理异常
     *
     * @param event 事件
     */
    public abstract void onEvent(Event event);

}

Subscriber是一个抽象类,默认是同步的方式进行订阅。总共有下面四个实现类: LookoutSubscriber FaultToleranceSubscriber RestTracerSubscriber SofaTracerSubscriber

这里我不打算每个都进行分析,到时候打算用到了再详细说明,这样不会那么抽象。

由于我们前面讲到了,在客户端引用的时候会发送一个产生开始调用事件给总线,那一定要有订阅者这个发送事件才有意义。所以我们接下来看看是在哪里进行事件的注册的。

订阅者注册到EventBus

通过上面的继承关系图可以看到,在ConsumerConfig是AbstractIdConfig的子类,所以在初始化ConsumerConfig的时候AbstractIdConfig静态代码块也会被初始化。

public abstract class AbstractIdConfig<S extends AbstractIdConfig> implements Serializable {

    static {
        RpcRuntimeContext.now();
    }
}

在调用RpcRuntimeContext#now方法的时候,会调用到RpcRuntimeContext的静态代码块

RpcRuntimeContext

public class RpcRuntimeContext {

    static {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Welcome! Loading SOFA RPC Framework : {}, PID is:{}", Version.BUILD_VERSION, PID);
        }
        put(RpcConstants.CONFIG_KEY_RPC_VERSION, Version.RPC_VERSION);
        // 初始化一些上下文
        initContext();
        // 初始化其它模块
        ModuleFactory.installModules();
        // 增加jvm关闭事件
        if (RpcConfigs.getOrDefaultValue(RpcOptions.JVM_SHUTDOWN_HOOK, true)) {
            Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
                @Override
                public void run() {
                    if (LOGGER.isWarnEnabled()) {
                        LOGGER.warn("SOFA RPC Framework catch JVM shutdown event, Run shutdown hook now.");
                    }
                    destroy(false);
                }
            }, "SOFA-RPC-ShutdownHook"));
        }
    }

    public static long now() {
        return System.currentTimeMillis();
    }
}

在RpcRuntimeContext静态代码块里主要做了以下几件事:

  1. 初始化一些上下文的东西,例如:应用Id,应用名称,当前所在文件夹地址等
  2. 初始化一些模块,等下分析
  3. 增加jvm关闭时的钩子

我们直接看installModules方法就好了,其他的方法和主流程无关。

ModuleFactory#installModules

public static void installModules() {
    //通过SPI加载Module模块
    ExtensionLoader<Module> loader = ExtensionLoaderFactory.getExtensionLoader(Module.class);
    //moduleLoadList 默认是 *
    String moduleLoadList = RpcConfigs.getStringValue(RpcOptions.MODULE_LOAD_LIST);
    for (Map.Entry<String, ExtensionClass<Module>> o : loader.getAllExtensions().entrySet()) {
        String moduleName = o.getKey();
        Module module = o.getValue().getExtInstance();
        // judge need load from rpc option
        if (needLoad(moduleLoadList, moduleName)) {
            // judge need load from implement
            if (module.needLoad()) {
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("Install Module: {}", moduleName);
                }
                //安装模板
                module.install();
                INSTALLED_MODULES.put(moduleName, module);
            } else {
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("The module " + moduleName + " does not need to be loaded.");
                }
            }
        } else {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("The module " + moduleName + " is not in the module load list.");
            }
        }
    }
}
  1. 这个方法里面一开始获取Module的扩展类,Module的扩展类有如下几个: FaultToleranceModule LookoutModule RestTracerModule SofaTracerModule
  2. 然后会去获取MODULE_LOAD_LIST配置类,多个配置用“;”分割。
  3. 调用loader.getAllExtensions()获取所有的扩展类。遍历扩展类。
  4. 接着调用needLoad方法:
static boolean needLoad(String moduleLoadList, String moduleName) {
    //用;拆分
    String[] activatedModules = StringUtils.splitWithCommaOrSemicolon(moduleLoadList);
    boolean match = false;
    for (String activatedModule : activatedModules) {
        //ALL 就是 *
        if (StringUtils.ALL.equals(activatedModule)) {
            match = true;
        } else if (activatedModule.equals(moduleName)) {
            match = true;
        } else if (match && (activatedModule.equals("!" + moduleName)
                || activatedModule.equals("-" + moduleName))) {
            match = false;
            break;
        }
    }
    return match;
}

这个方法会传入配置的moduleLoadList和当前遍历到的moduleName,moduleLoadList默认是*所以会返回true,如果配置了moduleLoadList不为*的话,如果moduleName是配置中的之一便会返回true。

  1. 调用module的install进行模板的装配

这里我们进入到SofaTracerModule#install中

SofaTracerModule#install

public void install() {
    Tracer tracer = TracerFactory.getTracer("sofaTracer");
    if (tracer != null) {
        subscriber = new SofaTracerSubscriber();
        EventBus.register(ClientStartInvokeEvent.class, subscriber);
        EventBus.register(ClientBeforeSendEvent.class, subscriber);
        EventBus.register(ClientAfterSendEvent.class, subscriber);
        EventBus.register(ServerReceiveEvent.class, subscriber);
        EventBus.register(ServerSendEvent.class, subscriber);
        EventBus.register(ServerEndHandleEvent.class, subscriber);
        EventBus.register(ClientSyncReceiveEvent.class, subscriber);
        EventBus.register(ClientAsyncReceiveEvent.class, subscriber);
        EventBus.register(ClientEndInvokeEvent.class, subscriber);
    }
}

这里我们可以看到文章一开始被发送的ClientStartInvokeEvent在这里被注册了。订阅者的实现类是SofaTracerSubscriber。

订阅者被调用

在上面我们分析到在注册到EventBus之后,会发送一个post请求,然后EventBus会遍历所有的Subscriber,调用符合条件的Subscriber的onEvent方法。

SofaTracerSubscriber#onEvent

public void onEvent(Event originEvent) {

   if (!Tracers.isEnable()) {
        return;
    }
    Class eventClass = originEvent.getClass();

    if (eventClass == ClientStartInvokeEvent.class) {
        ClientStartInvokeEvent event = (ClientStartInvokeEvent) originEvent;
        Tracers.startRpc(event.getRequest());
    }
    
    else if (eventClass == ClientBeforeSendEvent.class) {
            ClientBeforeSendEvent event = (ClientBeforeSendEvent) originEvent;
            Tracers.clientBeforeSend(event.getRequest());
    }
    .....
}

这个方法里面主要就是对不同的event做出不同的反应。ClientStartInvokeEvent所做的请求就是调用一下Tracers#startRpc,Tracers是用来做链路追踪的,这篇文章不涉及。

总结

我们首先上一张图,来说明一下订阅发布模式整体的结构。

在我们这个例子里EventBus的职责就是调度中心,subscriber的具体实现注册到EventBus中后,会保存到EventBus的SUBSCRIBER_MAP集合中。

发布者在发布消息的时候会调用EventBus的post方法传入一个具体的event来调用订阅者的事件。一个事件有多个订阅者,消息的发布者不会直接的去调用订阅者来发布消息,而是通过EventBus来进行触发。

通过EventBus来触发不同的订阅者的事件可以在触发事件之前同一的为其做一些操作,比如是同步还是异步,要不要过滤部分订阅者等。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 13.深入k8s:Pod 水平自动扩缩HPA及其源码分析

    Pod 水平自动扩缩全名是Horizontal Pod Autoscaler简称HPA。它可以基于 CPU 利用率或其他指标自动扩缩 ReplicationCo...

    luozhiyun
  • 1.Sentinel源码分析—FlowRuleManager加载规则做了什么?

    最近我很好奇在RPC中限流熔断降级要怎么做,hystrix已经1年多没有更新了,感觉要被遗弃的感觉,那么我就把眼光聚焦到了阿里的Sentinel,顺便学习一下阿...

    luozhiyun
  • 15.深入k8s:Event事件处理及其源码分析

    k8s的Event事件是一种资源对象,用于展示集群内发生的情况,k8s系统中的各个组件会将运行时发生的各种事件上报给apiserver 。可以通过kubectl...

    luozhiyun
  • 自适应个人主页html源码

    雨尘
  • TimeLimitingCollector源码解析

    在solr的查询请求中添加timeAllowed参数,可以限定solr查询的请求时间,在solr内部,是通过TimeLimitingCollector类...

    LuceneReader
  • 数据结构算法操作试题(C++/Python)——两两交换链表中的节点

    数据结构算法操作试题(C++/Python):数据结构算法操作试题(C++/Python)——目录

    莫斯
  • webpack代码分离 ensure 看了还不懂,你打我

    webpack ensure相信大家都听过。有人称它为异步加载,也有人说做代码切割,那这个家伙到底是用来干嘛的?其实说白了,它就是把js模块给独立导出一个.js...

    挥刀北上
  • DWR服务器推送技术

    参考博客:https://blog.csdn.net/Marksinoberg/article/details/55505423

    阮键
  • CSS3制作3D水晶糖果按钮

    本人仿照20个漂亮 CSS3 按钮效果及优秀的制作教程中的BonBon(Candy)Button实现了其棒棒糖果按钮,如下图所示:

    Dabelv
  • 内容平台管理难题如何破?知乎的答案是这样的

    知乎今天开始试运行新版管理规范,对一些影响用户体验、扰乱社区秩序的行为进行了明确限制,主要包括六种行为:诱导投票或关注、重复发布相同内容、频繁发布乱码内容、发布...

    罗超频道

扫码关注云+社区

领取腾讯云代金券