A Look at jesque's Event Mechanism

code4it
Published 2018-09-17 15:19:57
Collected in the column: 码匠的流水账

This article gives a brief introduction to jesque's event mechanism.

WorkerEvent

jesque-2.1.2-sources.jar!/net/greghaines/jesque/worker/WorkerEvent.java

/**
 * The possible WorkerEvents that a WorkerListener may register for.
 */
public enum WorkerEvent {

    /**
     * The Worker just finished starting up and is about to start running.
     */
    WORKER_START,
    /**
     * The Worker is polling the queue.
     */
    WORKER_POLL,
    /**
     * The Worker is processing a Job.
     */
    JOB_PROCESS,
    /**
     * The Worker is about to execute a materialized Job.
     */
    JOB_EXECUTE,
    /**
     * The Worker successfully executed a materialized Job.
     */
    JOB_SUCCESS,
    /**
     * The Worker caught an Exception during the execution of a materialized Job.
     */
    JOB_FAILURE,
    /**
     * The Worker caught an Exception during normal operation.
     */
    WORKER_ERROR,
    /**
     * The Worker just finished running and is about to shutdown.
     */
    WORKER_STOP;
}

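JOB_PROCESS and JOB_EXECUTE can be a little confusing. Between the two, the worker updates its status in Redis and instantiates (materializes) the job. JOB_EXECUTE means "before execute", while JOB_SUCCESS and JOB_FAILURE mean "after execute".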
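
The ordering described above can be illustrated with a minimal sketch. It does not use jesque's classes: `LifecycleSketch`, its nested `Event` enum, and the `process` helper are hypothetical stand-ins that only mimic the order in which the job-related events are fired, assuming the job is a plain `Callable`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

// Simplified stand-in for the job flow; these are NOT jesque's actual classes.
class LifecycleSketch {

    enum Event { JOB_PROCESS, JOB_EXECUTE, JOB_SUCCESS, JOB_FAILURE }

    // Runs one job and returns the events in the order they were fired.
    static List<Event> process(Callable<Object> job) {
        List<Event> fired = new ArrayList<>();
        try {
            fired.add(Event.JOB_PROCESS);   // job has been taken off the queue
            // (a real worker updates its status in Redis and materializes the job here)
            fired.add(Event.JOB_EXECUTE);   // "before execute"
            job.call();
            fired.add(Event.JOB_SUCCESS);   // "after execute", normal return
        } catch (Throwable t) {
            fired.add(Event.JOB_FAILURE);   // "after execute", exception caught
        }
        return fired;
    }

    public static void main(String[] args) {
        // prints [JOB_PROCESS, JOB_EXECUTE, JOB_SUCCESS]
        System.out.println(process(() -> "ok"));
        // prints [JOB_PROCESS, JOB_EXECUTE, JOB_FAILURE]
        System.out.println(process(() -> { throw new IllegalStateException("boom"); }));
    }
}
```

A job never produces both JOB_SUCCESS and JOB_FAILURE in this sketch: exactly one of the two "after execute" events follows JOB_EXECUTE.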

WorkerEventEmitter

jesque-2.1.2-sources.jar!/net/greghaines/jesque/worker/WorkerEventEmitter.java

/**
 * A WorkerEventEmitter allows WorkerListeners to register for WorkerEvents.
 */
public interface WorkerEventEmitter {

    /**
     * Register a WorkerListener for all WorkerEvents. 
     * @param listener the WorkerListener to register
     */
    void addListener(WorkerListener listener);

    /**
     * Register a WorkerListener for the specified WorkerEvents.
     * @param listener the WorkerListener to register
     * @param events the WorkerEvents to be notified of
     */
    void addListener(WorkerListener listener, WorkerEvent... events);

    /**
     * Unregister a WorkerListener for all WorkerEvents.
     * @param listener the WorkerListener to unregister
     */
    void removeListener(WorkerListener listener);

    /**
     * Unregister a WorkerListener for the specified WorkerEvents.
     * @param listener the WorkerListener to unregister
     * @param events the WorkerEvents to no longer be notified of
     */
    void removeListener(WorkerListener listener, WorkerEvent... events);

    /**
     * Unregister all WorkerListeners for all WorkerEvents.
     */
    void removeAllListeners();

    /**
     * Unregister all WorkerListeners for the specified WorkerEvents.
     * @param events the WorkerEvents to no longer be notified of
     */
    void removeAllListeners(WorkerEvent... events);
}

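This interface defines the event emitter: listeners can be registered or unregistered either for all events or for a specific set of events.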

WorkerListenerDelegate

jesque-2.1.0-sources.jar!/net/greghaines/jesque/worker/WorkerListenerDelegate.java

/**
 * WorkerListenerDelegate keeps track of WorkerListeners and notifies each listener when fireEvent() is invoked.
 */
public class WorkerListenerDelegate implements WorkerEventEmitter {

    private static final Logger log = LoggerFactory.getLogger(WorkerListenerDelegate.class);

    private final Map<WorkerEvent, ConcurrentSet<WorkerListener>> eventListenerMap;

    /**
     * Constructor.
     */
    public WorkerListenerDelegate() {
        final Map<WorkerEvent, ConcurrentSet<WorkerListener>> elp = 
                new EnumMap<WorkerEvent, ConcurrentSet<WorkerListener>>(WorkerEvent.class);
        for (final WorkerEvent event : WorkerEvent.values()) {
            elp.put(event, new ConcurrentHashSet<WorkerListener>());
        }
        this.eventListenerMap = Collections.unmodifiableMap(elp);
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void addListener(final WorkerListener listener) {
        addListener(listener, WorkerEvent.values());
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void addListener(final WorkerListener listener, final WorkerEvent... events) {
        if (listener != null) {
            for (final WorkerEvent event : events) {
                final ConcurrentSet<WorkerListener> listeners = this.eventListenerMap.get(event);
                if (listeners != null) {
                    listeners.add(listener);
                }
            }
        }
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void removeListener(final WorkerListener listener) {
        removeListener(listener, WorkerEvent.values());
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void removeListener(final WorkerListener listener, final WorkerEvent... events) {
        if (listener != null) {
            for (final WorkerEvent event : events) {
                final ConcurrentSet<WorkerListener> listeners = this.eventListenerMap.get(event);
                if (listeners != null) {
                    listeners.remove(listener);
                }
            }
        }
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void removeAllListeners() {
        removeAllListeners(WorkerEvent.values());
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void removeAllListeners(final WorkerEvent... events) {
        for (final WorkerEvent event : events) {
            final ConcurrentSet<WorkerListener> listeners = this.eventListenerMap.get(event);
            if (listeners != null) {
                listeners.clear();
            }
        }
    }

    /**
     * Notify all WorkerListeners currently registered for the given WorkerEvent.
     * @param event the WorkerEvent that occurred
     * @param worker the Worker that the event occurred in
     * @param queue the queue the Worker is processing
     * @param job the Job related to the event (only supply for JOB_PROCESS, JOB_EXECUTE, JOB_SUCCESS, and 
     * JOB_FAILURE events)
     * @param runner the materialized object that the Job specified (only supply for JOB_EXECUTE and 
     * JOB_SUCCESS events)
     * @param result the result of the successful execution of the Job (only set for JOB_SUCCESS and if the Job was 
     * a Callable that returned a value)
     * @param t the Throwable that caused the event (only supply for JOB_FAILURE and ERROR events)
     */
    public void fireEvent(final WorkerEvent event, final Worker worker, final String queue, final Job job, 
            final Object runner, final Object result, final Throwable t) {
        final ConcurrentSet<WorkerListener> listeners = this.eventListenerMap.get(event);
        if (listeners != null) {
            for (final WorkerListener listener : listeners) {
                if (listener != null) {
                    try {
                        listener.onEvent(event, worker, queue, job, runner, result, t);
                    } catch (Exception e) {
                        log.error("Failure executing listener " + listener + " for event " + event 
                                + " from queue " + queue + " on worker " + worker, e);
                    }
                }
            }
        }
    }
}

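This is the implementation class of the event emitter. It stores listeners in an EnumMap whose key is the WorkerEvent enum and whose value is a ConcurrentSet of listeners, so a single event can have multiple listeners.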
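
The same data structure can be sketched with only the JDK. This is an illustration under stated assumptions, not jesque's code: `ListenerRegistrySketch`, its reduced `Event` enum, and the one-argument `Listener` interface are hypothetical, and `ConcurrentHashMap.newKeySet()` stands in for jesque's `ConcurrentSet`.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Stand-in registry; ConcurrentHashMap.newKeySet() replaces jesque's ConcurrentSet here.
class ListenerRegistrySketch {

    enum Event { WORKER_START, JOB_SUCCESS, JOB_FAILURE }

    interface Listener { void onEvent(Event event); }

    private final Map<Event, Set<Listener>> listeners = new EnumMap<>(Event.class);

    ListenerRegistrySketch() {
        // Pre-populate every key so the map itself is never modified after construction.
        for (Event e : Event.values()) {
            listeners.put(e, ConcurrentHashMap.<Listener>newKeySet());
        }
    }

    // Register for all events.
    void addListener(Listener l) { addListener(l, Event.values()); }

    // Register for the given events only.
    void addListener(Listener l, Event... events) {
        for (Event e : events) listeners.get(e).add(l);
    }

    void removeListener(Listener l, Event... events) {
        for (Event e : events) listeners.get(e).remove(l);
    }

    // Notify every listener registered for the event; one failing listener does not stop the rest.
    void fire(Event event) {
        for (Listener l : listeners.get(event)) {
            try {
                l.onEvent(event);
            } catch (Exception ex) {
                System.err.println("listener failed: " + ex);
            }
        }
    }

    public static void main(String[] args) {
        ListenerRegistrySketch registry = new ListenerRegistrySketch();
        List<String> log = new ArrayList<>();
        registry.addListener(e -> log.add("all:" + e));
        registry.addListener(e -> log.add("failure-only:" + e), Event.JOB_FAILURE);
        registry.fire(Event.JOB_SUCCESS);
        registry.fire(Event.JOB_FAILURE);
        System.out.println(log);
    }
}
```

Because the per-event sets are hash-based, the order in which listeners of the same event are notified is not guaranteed in this sketch.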

Firing events

jesque-2.1.2-sources.jar!/net/greghaines/jesque/worker/WorkerImpl.java

    protected final WorkerListenerDelegate listenerDelegate = new WorkerListenerDelegate();

    //......
    protected void process(final Job job, final String curQueue) {
        try {
            this.processingJob.set(true);
            if (threadNameChangingEnabled) {
                renameThread("Processing " + curQueue + " since " + System.currentTimeMillis());
            }
            this.listenerDelegate.fireEvent(JOB_PROCESS, this, curQueue, job, null, null, null);
            this.jedis.set(key(WORKER, this.name), statusMsg(curQueue, job));
            final Object instance = this.jobFactory.materializeJob(job);
            final Object result = execute(job, curQueue, instance);
            success(job, instance, result, curQueue);
        } catch (Throwable thrwbl) {
            failure(thrwbl, job, curQueue);
        } finally {
            removeInFlight(curQueue);
            this.jedis.del(key(WORKER, this.name));
            this.processingJob.set(false);
        }
    }

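WorkerImpl holds an instance of WorkerListenerDelegate, the WorkerEventEmitter implementation, by composition. At the relevant points in its methods it fires the corresponding event to notify the registered listeners (by default the listeners run synchronously).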
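
Synchronous notification means the listener body runs on the thread that fires the event, so a slow listener delays the worker. The sketch below contrasts this with handing the callback to an executor. Everything in it is hypothetical: `DispatchSketch`, its `Listener` interface, and the `async` wrapper are not part of jesque, and the wrapper is only one possible way to move listener work off the calling thread.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Illustration only; none of these types come from jesque.
class DispatchSketch {

    interface Listener { void onEvent(String event); }

    // Synchronous dispatch: the listener body runs on the caller's thread.
    static void fire(Listener listener, String event) {
        listener.onEvent(event);
    }

    // Hypothetical wrapper: forwards the callback to an executor instead of running it inline.
    static Listener async(Listener delegate, ExecutorService executor) {
        return event -> executor.execute(() -> delegate.onEvent(event));
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Listener printer = e ->
                System.out.println(e + " handled on " + Thread.currentThread().getName());

        fire(printer, "JOB_SUCCESS");              // handled on the calling thread
        fire(async(printer, pool), "JOB_SUCCESS"); // handled on the pool's thread

        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.SECONDS);
    }
}
```

The trade-off is that an asynchronous listener may observe the event after the worker has already moved on, so it should not rely on the worker's state at the time the event was fired.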

Summary

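In essence this is the observer pattern: WorkerImpl is the subject being observed and the listeners are the observers. When WorkerImpl reaches one of its execution points it fires the corresponding event and synchronously notifies the listeners so they can run their logic.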

doc

  • jesque
Originally published: 2017-11-16

This article is shared from the WeChat official account 码匠的流水账.