聊聊jesque的event机制

本文主要介绍一下jesque的event机制

WorkerEvent

jesque-2.1.2-sources.jar!/net/greghaines/jesque/worker/WorkerEvent.java

/**
 * The possible WorkerEvents that a WorkerListener may register for.
 */
public enum WorkerEvent {

    /**
     * The Worker just finished starting up and is about to start running.
     */
    WORKER_START,
    /**
     * The Worker is polling the queue.
     */
    WORKER_POLL,
    /**
     * The Worker is processing a Job.
     */
    JOB_PROCESS,
    /**
     * The Worker is about to execute a materialized Job.
     */
    JOB_EXECUTE,
    /**
     * The Worker successfully executed a materialized Job.
     */
    JOB_SUCCESS,
    /**
     * The Worker caught an Exception during the execution of a materialized Job.
     */
    JOB_FAILURE,
    /**
     * The Worker caught an Exception during normal operation.
     */
    WORKER_ERROR,
    /**
     * The Worker just finished running and is about to shutdown.
     */
    WORKER_STOP;
}

JOB_PROCESS与JOB_EXECUTE可能让人有点迷糊。二者之间有个去redis更新状态以及实例化job的操作,而JOB_EXECUTE则是before execute的意思 JOB_SUCCESS以及JOB_FAILURE则是after execute的意思

WorkerEventEmitter

jesque-2.1.2-sources.jar!/net/greghaines/jesque/worker/WorkerEventEmitter.java

/**
 * A WorkerEventEmitter allows WorkerListeners to register for WorkerEvents.
 */
public interface WorkerEventEmitter {

    /**
     * Register a WorkerListener for all WorkerEvents. 
     * @param listener the WorkerListener to register
     */
    void addListener(WorkerListener listener);

    /**
     * Register a WorkerListener for the specified WorkerEvents.
     * @param listener the WorkerListener to register
     * @param events the WorkerEvents to be notified of
     */
    void addListener(WorkerListener listener, WorkerEvent... events);

    /**
     * Unregister a WorkerListener for all WorkerEvents.
     * @param listener the WorkerListener to unregister
     */
    void removeListener(WorkerListener listener);

    /**
     * Unregister a WorkerListener for the specified WorkerEvents.
     * @param listener the WorkerListener to unregister
     * @param events the WorkerEvents to no longer be notified of
     */
    void removeListener(WorkerListener listener, WorkerEvent... events);

    /**
     * Unregister all WorkerListeners for all WorkerEvents.
     */
    void removeAllListeners();

    /**
     * Unregister all WorkerListeners for the specified WorkerEvents.
     * @param events the WorkerEvents to no longer be notified of
     */
    void removeAllListeners(WorkerEvent... events);
}

定义了event emitter的接口

WorkerListenerDelegate

jesque-2.1.0-sources.jar!/net/greghaines/jesque/worker/WorkerListenerDelegate.java

/**
 * WorkerListenerDelegate keeps track of WorkerListeners and notifies each listener when fireEvent() is invoked.
 */
public class WorkerListenerDelegate implements WorkerEventEmitter {

    private static final Logger log = LoggerFactory.getLogger(WorkerListenerDelegate.class);

    private final Map<WorkerEvent, ConcurrentSet<WorkerListener>> eventListenerMap;

    /**
     * Constructor.
     */
    public WorkerListenerDelegate() {
        final Map<WorkerEvent, ConcurrentSet<WorkerListener>> elp = 
                new EnumMap<WorkerEvent, ConcurrentSet<WorkerListener>>(WorkerEvent.class);
        for (final WorkerEvent event : WorkerEvent.values()) {
            elp.put(event, new ConcurrentHashSet<WorkerListener>());
        }
        this.eventListenerMap = Collections.unmodifiableMap(elp);
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void addListener(final WorkerListener listener) {
        addListener(listener, WorkerEvent.values());
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void addListener(final WorkerListener listener, final WorkerEvent... events) {
        if (listener != null) {
            for (final WorkerEvent event : events) {
                final ConcurrentSet<WorkerListener> listeners = this.eventListenerMap.get(event);
                if (listeners != null) {
                    listeners.add(listener);
                }
            }
        }
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void removeListener(final WorkerListener listener) {
        removeListener(listener, WorkerEvent.values());
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void removeListener(final WorkerListener listener, final WorkerEvent... events) {
        if (listener != null) {
            for (final WorkerEvent event : events) {
                final ConcurrentSet<WorkerListener> listeners = this.eventListenerMap.get(event);
                if (listeners != null) {
                    listeners.remove(listener);
                }
            }
        }
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void removeAllListeners() {
        removeAllListeners(WorkerEvent.values());
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void removeAllListeners(final WorkerEvent... events) {
        for (final WorkerEvent event : events) {
            final ConcurrentSet<WorkerListener> listeners = this.eventListenerMap.get(event);
            if (listeners != null) {
                listeners.clear();
            }
        }
    }

    /**
     * Notify all WorkerListeners currently registered for the given WorkerEvent.
     * @param event the WorkerEvent that occurred
     * @param worker the Worker that the event occurred in
     * @param queue the queue the Worker is processing
     * @param job the Job related to the event (only supply for JOB_PROCESS, JOB_EXECUTE, JOB_SUCCESS, and 
     * JOB_FAILURE events)
     * @param runner the materialized object that the Job specified (only supply for JOB_EXECUTE and 
     * JOB_SUCCESS events)
     * @param result the result of the successful execution of the Job (only set for JOB_SUCCESS and if the Job was 
     * a Callable that returned a value)
     * @param t the Throwable that caused the event (only supply for JOB_FAILURE and ERROR events)
     */
    public void fireEvent(final WorkerEvent event, final Worker worker, final String queue, final Job job, 
            final Object runner, final Object result, final Throwable t) {
        final ConcurrentSet<WorkerListener> listeners = this.eventListenerMap.get(event);
        if (listeners != null) {
            for (final WorkerListener listener : listeners) {
                if (listener != null) {
                    try {
                        listener.onEvent(event, worker, queue, job, runner, result, t);
                    } catch (Exception e) {
                        log.error("Failure executing listener " + listener + " for event " + event 
                                + " from queue " + queue + " on worker " + worker, e);
                    }
                }
            }
        }
    }
}

event emitter的实现类,使用EnumMap来存放listener,key是WorkerEvent枚举,而value则是listener的ConcurrentSet,即同一个event可以有多个listener。

事件的触发

jesque-2.1.2-sources.jar!/net/greghaines/jesque/worker/WorkerImpl.java

protected final WorkerListenerDelegate listenerDelegate = new WorkerListenerDelegate();

//......
protected void process(final Job job, final String curQueue) {
        try {
            this.processingJob.set(true);
            if (threadNameChangingEnabled) {
                renameThread("Processing " + curQueue + " since " + System.currentTimeMillis());
            }
            this.listenerDelegate.fireEvent(JOB_PROCESS, this, curQueue, job, null, null, null);
            this.jedis.set(key(WORKER, this.name), statusMsg(curQueue, job));
            final Object instance = this.jobFactory.materializeJob(job);
            final Object result = execute(job, curQueue, instance);
            success(job, instance, result, curQueue);
        } catch (Throwable thrwbl) {
            failure(thrwbl, job, curQueue);
        } finally {
            removeInFlight(curQueue);
            this.jedis.del(key(WORKER, this.name));
            this.processingJob.set(false);
        }
    }

在wokerImpl类里头,组合了WorkerEventEmitter的实现类,然后在相应的方法里头去触发/通知相应的listener(默认是同步执行)

小结

其实本质就是观察者模式,workerImpl是被观察者,listener是观察者,wokerImpl在有相应执行点会触发相应事件,同步通知listner执行相关逻辑。

doc

  • jesque

原文发布于微信公众号 - 码匠的流水账(geek_luandun)

原文发表时间:2017-11-16

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏java闲聊

第二篇 SSM运行Demo

20650
来自专栏IT笔记

SpringBoot开发案例之整合mail队列进阶篇

上一篇文章,我们为了解决实际场景中遇到的问题,使得其更像一个安全高效的邮件服务平台,我们引入了LinkedBlockingQueue队列对邮件发送进行流量削锋、...

50570
来自专栏Ryan Miao

Spring-AOP实践 - 统计访问时间

公司的项目有的页面超级慢,20s以上,不知道用户会不会疯掉,于是老大说这个页面要性能优化。于是,首先就要搞清楚究竟是哪一步耗时太多。 我采用spring aop...

52580
来自专栏Android源码框架分析

听说你Binder机制学的不错,来面试下这几个问题(三)

很多文章将Binder框架定义了四个角色:Server,Client,ServiceManager、以及Binder驱动,但这容易将人引导到歧途:好像所有的Bi...

25220
来自专栏bboysoul

关于linux下raid的设备文件和格式化

今天给dell t20装了zstack,没错zstack镜像底层其实就是centos,服务器里面有四块硬盘,一块300g的我是做系统盘的,三块1T的硬盘我是打算...

13520
来自专栏程序员与猫

Log system architecture

Keywords: Collector, Processor, Aggregator

18610
来自专栏蓝天

Redis模块开发示例

实现一个Redis module,支持两个扩展命令: 1) 可同时对hash的多个field进行incr操作; 2) incrby同时设置一个key的过期时...

12730
来自专栏贾老师の博客

CMake 使用

12130
来自专栏Netkiller

Spring boot with Apache Hive

本文节选自《Netkiller Database 手札》 5.26. Spring boot with Apache Hive 5.26.1. Maven ...

84750
来自专栏Kubernetes

Controlling Access to the Kubernetes API

? API Server Ports and IPs By default the Kubernetes API server serves HTTP o...

29380

扫码关注云+社区

领取腾讯云代金券