EventBus 源码学习笔记(三)

EventBus 深入学习三之Guava小结

上一篇讲述了 EventBus 的整个执行流程, 本片则从细节处出发,探讨下设计的精妙

  1. 巧妙的利用缓存, 解决重复耗时的操作
  2. 异步化的操作
  3. 队列存储消息, 以及如何避免消息的重复消费
  4. 消费的先后顺序
  5. 截断
  6. 异常处理

1. 缓存

看代码时,可以看到很多地方都用到了缓存,如再注册时, 根据class获取所有带注解的方法; 推送消息时,根据事件类型,获取所有的超类集合

如注册时,一条完整的调用链

com.google.common.eventbus.SubscriberRegistry#register ->
com.google.common.eventbus.SubscriberRegistry#findAllSubscribers ->  com.google.common.eventbus.SubscriberRegistry#getAnnotatedMethods    ->
subscriberMethodsCache.getUnchecked(clazz) ->
com.google.common.eventbus.SubscriberRegistry#getAnnotatedMethodsNotCached

2. 根据类查询所有超类

TypeToken.of(concreteClass).getTypes().rawTypes());

// 我们自己的实现, 一直到返回null为止
clz.getSuperClass().getSuperClass();
// 获取接口
clz.getInterfaces()

3. 异步

异步推送处理Event和同步处理主要的区别点是使用的 Dispatcher不同, 同步是使用 PerThreadQueuedDispatcher , 异步是 LegacyAsyncDispatcher

异步的消息分发

/**
* Global event queue.
*/
private final ConcurrentLinkedQueue<EventWithSubscriber> queue =
   Queues.newConcurrentLinkedQueue();
   
@Override
    void dispatch(Object event, Iterator<Subscriber> subscribers) {
      checkNotNull(event);
      while (subscribers.hasNext()) {
        queue.add(new EventWithSubscriber(event, subscribers.next()));
      }

      EventWithSubscriber e;
      while ((e = queue.poll()) != null) {
        e.subscriber.dispatchEvent(e.event);
      }
    }

    private static final class EventWithSubscriber {
      private final Object event;
      private final Subscriber subscriber;

      private EventWithSubscriber(Object event, Subscriber subscriber) {
        this.event = event;
        this.subscriber = subscriber;
      }
    }
  }

同步的消息推送

/**
* Per-thread queue of events to dispatch.
*/
private final ThreadLocal<Queue<Event>> queue =
   new ThreadLocal<Queue<Event>>() {
     @Override
     protected Queue<Event> initialValue() {
       return Queues.newArrayDeque();
     }
   };
   
@Override
void dispatch(Object event, Iterator<Subscriber> subscribers) {
 checkNotNull(event);
 checkNotNull(subscribers);
 Queue<Event> queueForThread = queue.get();
 queueForThread.offer(new Event(event, subscribers));

 if (!dispatching.get()) {
   dispatching.set(true);
   try {
     Event nextEvent;
     while ((nextEvent = queueForThread.poll()) != null) {
       while (nextEvent.subscribers.hasNext()) {
         nextEvent.subscribers.next().dispatchEvent(nextEvent.event);
       }
     }
   } finally {
     dispatching.remove();
     queue.remove();
   }
 }
}

private static final class Event {
 private final Object event;
 private final Iterator<Subscriber> subscribers;

 private Event(Object event, Iterator<Subscriber> subscribers) {
   this.event = event;
   this.subscribers = subscribers;
 }
}
}

执行时, 在 AsyncEventBus 是在线程池中执行; 而 EventBus 则是直接执行, 实质上的执行器

public static Executor directExecutor() {
    return DirectExecutor.INSTANCE;
  }

  /** See {@link #directExecutor} for behavioral notes. */
  private enum DirectExecutor implements Executor {
    INSTANCE;
    @Override public void execute(Runnable command) {
      command.run();
    }
  }

4. 线程安全

5. 异常处理

  • 没有订阅者时, 抛一个 DeadEvent
  • 订阅者接收消息后的,执行异常时 (订阅者之间的隔离)
    • 看下具体的执行,比较清晰, 将异常抛给 EventBus的 ExceptionHandler统一处理
final void dispatchEvent(final Object event) {
executor.execute(new Runnable() {
  @Override
  public void run() {
    try {
      invokeSubscriberMethod(event);
    } catch (InvocationTargetException e) {
      bus.handleSubscriberException(e.getCause(), context(event));
    }
  }
});
}

6. 消费顺序 & 截断

Guava的EventBus不支持定义订阅者的顺序,更谈不上截断

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏IT杂记

通过Java程序提交通用Mapreduce任务并获取Job信息

背景 我们的一个业务须要有对MR任务的提交和状态跟踪的功能,须要通过Java代码提交一个通用的MR任务(包括mr的jar、配置文件、依赖的第三方jar包),并且...

1.1K50
来自专栏LeoXu的博客

Flex笔记_验证用户输入

11720
来自专栏進无尽的文章

简述OC语言

对于一门语言的学习是需要时间领悟的,而对于一些原理性的问题,我们需要清楚其核心思想,知其然而知其所以然,这样才能有利于自己的后续发展。本文只是简述,没有面面具到...

24020
来自专栏IT技术精选文摘

ZooKeeper 分布式锁实现

17920
来自专栏hotqin888的专栏

bootstrap treeview 增删改的正确姿势

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/hotqin888/article/det...

51830
来自专栏大内老A

基于CallContextInitializer的WCF扩展导致的严重问题

WCF是一个具有极高扩展度的分布式通信框架,无论是在信道层(Channel Layer)还是服务模型层(Service Model),我们都可以自定义相关组件通...

22790
来自专栏犀利豆的技术空间

徒手撸框架--实现 RPC 远程调用

微服务已经是每个互联网开发者必须掌握的一项技术。而 RPC 框架,是构成微服务最重要的组成部分之一。趁最近有时间。又看了看 dubbo 的源码。dubbo 为了...

16620
来自专栏大内老A

WCF技术剖析之十八:消息契约(Message Contract)和基于消息契约的序列化

在本篇文章中,我们将讨论WCF四大契约(服务契约、数据契约、消息契约和错误契约)之一的消息契约(Message Contract)。服务契约关注于对服务操作的描...

40250
来自专栏五毛程序员

五毛的cocos2d-x学习笔记07-计时器、数据读写、文件读写

26950
来自专栏Golang语言社区

Golang语言 之网络

Go语言标准库里提供的net包,支持基于IP层、TCP/UDP层及更高层面(如HTTP、FTP、SMTP)的网络操作,其中用于IP层的称为Raw Socket。...

37190

扫码关注云+社区

领取腾讯云代金券