前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >nodejs v14源码分析之event模块

nodejs v14源码分析之event模块

作者头像
theanarkh
发布2021-03-15 15:00:40
3330
发布2021-03-15 15:00:40
举报
文章被收录于专栏:原创分享

本章本来是《nodejs源码剖析》的一章,由于篇幅有限,已经删除了,所以开放出来(已经同步到github)。相信很多同学也看过该模块的代码。

events模块是Node.js中比较简单但是却非常核心的模块,Node.js中,很多模块都继承于events模块,events模块是发布、订阅模式的实现。我们首先看一下如何使用events模块。

代码语言:javascript
复制
1.    const { EventEmitter } = require('events');  
2.    class Events extends EventEmitter {}  
3.    const events = new Events();  
4.    events.on('demo', () => {  
5.        console.log('emit demo event');  
6.    });  
7.    events.emit('demo');

接下来我们看一下events模块的具体实现。

1 初始化 当new一个EventEmitter或者它的子类时,就会进入EventEmitter的逻辑。

代码语言:javascript
复制
1.    function EventEmitter(opts) {  
2.      EventEmitter.init.call(this, opts);  
3.    }  
4.      
5.    EventEmitter.init = function(opts) {  
6.      // 如果是未初始化或者没有自定义_events,则初始化  
7.      if (this._events === undefined ||  
8.          this._events === ObjectGetPrototypeOf(this)._events) {  
9.         this._events = ObjectCreate(null);  
10.        this._eventsCount = 0;  
11.      }  
12.      /*
13.        初始化一类事件的处理函数个数的阈值
14.        我们可以通过setMaxListeners接口设置,
15.        如果没有显示设置,阈值则为defaultMaxListeners的值(10),
16.        可通过getMaxListeners接口获取
17.      */  
18.      this._maxListeners = this._maxListeners || undefined;  
19.      
20.      // 是否开启捕获promise reject,默认false  
21.      if (opts && opts.captureRejections) {  
22.        this[kCapture] = Boolean(opts.captureRejections);  
23.      } else {  
24.        this[kCapture] = EventEmitter.prototype[kCapture];  
25.      }  
26.    };

EventEmitter的初始化主要是初始化了一些数据结构和属性。唯一支持的一个参数就是captureRejections,captureRejections表示当触发事件,执行处理函数时,EventEmitter是否捕获处理函数中的异常。后面我们会详细讲解。

2 订阅事件 初始化完EventEmitter之后,我们就可以开始使用订阅、发布的功能。我们可以通过addListener、prependListener、on、once订阅事件。addListener和on是等价的,prependListener的区别在于处理函数会被插入到队首,而默认是追加到队尾。once注册的处理函数,最多被执行一次。四个api都是通过_addListener函数实现的。下面我们看一下具体实现。

代码语言:javascript
复制
1.    function _addListener(target, type, listener, prepend) {  
2.      let m;  
3.      let events;  
4.      let existing;  
5.      events = target._events;  
6.      // 还没有初始化_events则初始化,_eventsCount为事件类型个数  
7.      if (events === undefined) {  
8.        events = target._events = ObjectCreate(null);  
9.        target._eventsCount = 0;  
10.      } else {  
11.        /* 
12.          已经注册过事件,则判断是否定义了newListener事件,
13.           是的话先触发,如果监听了newListener事件,每次注册
14.           其它事件时都会触发newListener,相当于钩子 
15.        */  
16.        if (events.newListener !== undefined) {  
17.          target.emit('newListener', 
18.                        type,  
19.                      listener.listener ? 
20.                        listener.listener : 
21.                        listener);  
22.          // newListener处理函数可能会修改_events,这里重新赋值  
23.          events = target._events;  
24.        }  
25.        // 判断是否已经存在处理函数  
26.        existing = events[type];  
27.      }  
28.      // 不存在则以函数的形式存储,否则以数组形式存储  
29.      if (existing === undefined) {  
30.        events[type] = listener;  
31.        // 新增一个事件类型,事件类型个数加一
32.        ++target._eventsCount;  
33.      } else {  
34.        /* 
35.           existing是函数说明之前注册过该事件一次,
36.           否则说明existing为数组,则直接插入相应位置
37.        */
38.        if (typeof existing === 'function') {  
39.          existing = events[type] =  
40.            prepend ? [listener, existing] : [existing, listener];  
41.        } else if (prepend) {  
42.          existing.unshift(listener);  
43.        } else {  
44.          existing.push(listener);  
45.        }  
46.      
47.        // 处理告警,处理函数过多可能是因为之前的没有删除,造成内存泄漏  
48.        m = _getMaxListeners(target);  
49.        // 该事件处理函数达到阈值并且还没有提示过警告信息则提示
50.        if (m > 0 && existing.length > m && !existing.warned) {  
51.          existing.warned = true;  
52.          const w = new Error('错误信息…');  
53.          w.name = 'MaxListenersExceededWarning';  
54.          w.emitter = target;  
55.          w.type = type;  
56.          w.count = existing.length;  
57.          process.emitWarning(w);  
58.        }  
59.      }  
60.      
61.      return target;  
62.    }

接下来我们看一下once的实现,对比其它几种api,once的实现相对比较复杂,因为我们要控制处理函数最多执行一次,所以我们需要保证在事件触发的时候,执行用户定义函数的同时,还需要删除注册的事件。

代码语言:javascript
复制
1.    EventEmitter.prototype.once = function once(type, listener) {  
2.      this.on(type, _onceWrap(this, type, listener));  
3.      return this;  
4.    };  
5.      
6.    function onceWrapper() {  
7.      // 还没有触发过  
8.      if (!this.fired) {  
9.        // 删除它  
10.        this.target.removeListener(this.type, this.wrapFn);  
11.        // 触发了  
12.        this.fired = true;  
13.        // 执行  
14.        if (arguments.length === 0)  
15.          return this.listener.call(this.target);  
16.        return this.listener.apply(this.target, arguments);  
17.      }  
18.    }  
19.    // 支持once api  
20.    function _onceWrap(target, type, listener) {  
21.      // fired是否已执行处理函数,wrapFn包裹listener的函数  
22.      const state = { fired: false, wrapFn: undefined, target, type, listener };  
23.      // 生成一个包裹listener的函数  
24.      const wrapped = onceWrapper.bind(state);  
25.      /*
26.        把原函数listener也挂到包裹函数中,用于事件没有触发前,
27.        用户主动删除,见removeListener  
28.      */
29.      wrapped.listener = listener;  
30.      // 保存包裹函数,用于执行完后删除,见onceWrapper  
31.      state.wrapFn = wrapped;  
32.      return wrapped;  
33.    }

Once函数构造一个上下文(state)保存用户处理函数和执行状态等信息,然后通过bind返回一个带有该上下文(state)的函数wrapped注册到事件系统。当事件触发时,在wrapped函数中首先移除wrapped,然后执行用户的函数。Wrapped起到了劫持的作用。另外还需要在wrapped上保存用户传进来的函数,当用户在事件触发前删除该事件时或解除该函数时,在遍历该类事件的处理函数过程中,可以通过wrapped.listener找到对应的项进行删除。

3 触发事件 分析完事件的订阅,接着我们看一下事件的触发。

代码语言:javascript
复制
1.    EventEmitter.prototype.emit = function emit(type, ...args) {  
2.      // 触发的事件是否是error,error事件需要特殊处理  
3.      let doError = (type === 'error');  
4.      
5.      const events = this._events;  
6.      // 定义了处理函数(不一定是type事件的处理函数)  
7.      if (events !== undefined) {  
8.         /*
9.          如果触发的事件是error,并且监听了kErrorMonitor
10.         事件则触发kErrorMonitor事件
11.        */  
12.        if (doError && events[kErrorMonitor] !== undefined)  
13.          this.emit(kErrorMonitor, ...args);  
14.        // 触发的是error事件但是没有定义处理函数  
15.        doError = (doError && events.error === undefined);  
16.      } else if (!doError) 
17.        // 没有定义处理函数并且触发的不是error事件则不需要处理,  
18.        return false;  
19.      
20.      // If there is no 'error' event listener then throw.  
21.      // 触发的是error事件,但是没有定义处理error事件的函数,则报错  
22.      if (doError) {  
23.        let er;  
24.        if (args.length > 0)  
25.          er = args[0];  
26.        // 第一个入参是Error的实例  
27.        if (er instanceof Error) {  
28.          try {  
29.            const capture = {};  
30.            /* 
31.              给capture对象注入stack属性,stack的值是执行    
32.               Error.captureStackTrace语句的当前栈信息,但是
33.               不包括emit的部分 
34.            */  
35.            Error.captureStackTrace(capture, EventEmitter.prototype.emit);  
36.            ObjectDefineProperty(er, kEnhanceStackBeforeInspector, {  
37.              value: enhanceStackTrace.bind(this, er, capture),  
38.              configurable: true  
39.            });  
40.          } catch {}  
41.          throw er; // Unhandled 'error' event  
42.        }  
43.      
44.        let stringifiedEr;  
45.        const { inspect } = require('internal/util/inspect');  
46.        try {  
47.          stringifiedEr = inspect(er);  
48.        } catch {  
49.          stringifiedEr = er;  
50.        }  
51.        const err = new ERR_UNHANDLED_ERROR(stringifiedEr);  
52.        err.context = er;  
53.        throw err; // Unhandled 'error' event  
54.      }  
55.      // 获取type事件对应的处理函数  
56.      const handler = events[type];  
57.      // 没有则不处理  
58.      if (handler === undefined)  
59.        return false;  
60.      // 等于函数说明只有一个  
61.      if (typeof handler === 'function') {  
62.        // 直接执行  
63.        const result = ReflectApply(handler, this, args);  
64.        // 非空判断是不是promise并且是否需要处理,见addCatch  
65.        if (result !== undefined && result !== null) {  
66.          addCatch(this, result, type, args);  
67.        }  
68.      } else {  
69.        // 多个处理函数,同上  
70.        const len = handler.length;  
71.        const listeners = arrayClone(handler, len);  
72.        for (let i = 0; i < len; ++i) {  
73.          const result = ReflectApply(listeners[i], this, args);  
74.          if (result !== undefined && result !== null) {  
75.            addCatch(this, result, type, args);  
76.          }  
77.        }  
78.      }  
79.      
80.      return true;  
81.    }

我们看到在Node.js中,对于error事件是特殊处理的,如果用户没有注册error事件的处理函数,可能会导致程序挂掉,另外我们看到有一个addCatch的逻辑,addCatch是为了支持事件处理函数为异步模式的情况,比如async函数或者返回Promise的函数。

代码语言:javascript
复制
1.    function addCatch(that, promise, type, args) {  
2.      // 没有开启捕获则不需要处理  
3.      if (!that[kCapture]) {  
4.        return;  
5.      }  
6.      // that throws on second use.  
7.      try {  
8.        const then = promise.then;  
9.      
10.        if (typeof then === 'function') {  
11.          // 注册reject的处理函数  
12.          then.call(promise, undefined, function(err) {  
13.            process.nextTick(emitUnhandledRejectionOrErr, that, err, type, args);  
14.          });  
15.        }  
16.      } catch (err) {  
17.        that.emit('error', err);  
18.      }  
19.    }  
20.      
21.    function emitUnhandledRejectionOrErr(ee, err, type, args) {  
22.      // 用户实现了kRejection则执行  
23.      if (typeof ee[kRejection] === 'function') {  
24.        ee[kRejection](err, type, ...args);  
25.      } else {  
26.        // 保存当前值  
27.        const prev = ee[kCapture];  
28.        try {  
29.          /* 
30.            关闭然后触发error事件,意义 
31.            1 防止error事件处理函数也抛出error,导致死循环 
32.            2 如果用户处理了error,则进程不会退出,所以需要恢复
33.               kCapture的值如果用户没有处理error,则Node.js会触发
34.               uncaughtException,如果用户处理了uncaughtException
35.               则需要恢复kCapture的值 
36.          */  
37.          ee[kCapture] = false;  
38.          ee.emit('error', err);  
39.        } finally {  
40.          ee[kCapture] = prev;  
41.        }  
42.      }  
43.    }

4 取消订阅 我们接着看一下删除事件处理函数的逻辑。

代码语言:javascript
复制
1.    function removeAllListeners(type) {  
2.          const events = this._events;  
3.          if (events === undefined)  
4.            return this;  
5.      
6.          /*
7.            没有注册removeListener事件,则只需要删除数据,
8.            否则还需要触发removeListener事件  
9.           */
10.          if (events.removeListener === undefined) {  
11.            // 等于0说明是删除全部  
12.            if (arguments.length === 0) {  
13.              this._events = ObjectCreate(null);  
14.              this._eventsCount = 0;  
15.            } else if (events[type] !== undefined) { 
16.               /*
17.                 否则是删除某个类型的事件,是唯一一个处理函数,
18.                 则重置_events,否则删除对应的事件类型         
19.               */
20.              if (--this._eventsCount === 0)  
21.                this._events = ObjectCreate(null);  
22.              else  
23.                delete events[type];  
24.            }  
25.            return this;  
26.          }  
27.      
28.          /*
29.            说明注册了removeListener事件,arguments.length === 0
30.            说明删除所有类型的事件  
31.           */
32.          if (arguments.length === 0) {  
33.            /* 
34.              逐个删除,除了removeListener事件,
35.              这里删除了非removeListener事件
36.             */  
37.            for (const key of ObjectKeys(events)) {  
38.              if (key === 'removeListener') continue;  
39.              this.removeAllListeners(key);  
40.            }  
41.            // 这里删除removeListener事件,见下面的逻辑  
42.            this.removeAllListeners('removeListener');  
43.            // 重置数据结构  
44.            this._events = ObjectCreate(null);  
45.            this._eventsCount = 0;  
46.            return this;  
47.          }  
48.          // 删除某类型事件  
49.          const listeners = events[type];  
50.      
51.          if (typeof listeners === 'function') {  
52.            this.removeListener(type, listeners);  
53.          } else if (listeners !== undefined) {  
54.            // LIFO order  
55.            for (let i = listeners.length - 1; i >= 0; i--) {  
56.              this.removeListener(type, listeners[i]);  
57.            }  
58.          }  
59.      
60.          return this;  
61.        }

removeAllListeners函数主要的逻辑有两点,第一个是removeListener事件需要特殊处理,这类似一个钩子,每次用户删除事件处理函数的时候都会触发该事件。第二是removeListener函数。removeListener是真正删除事件处理函数的实现。removeAllListeners是封装了removeListener的逻辑。

代码语言:javascript
复制
1.    function removeListener(type, listener) {  
2.       let originalListener;  
3.       const events = this._events;  
4.       // 没有东西可删除  
5.       if (events === undefined)  
6.         return this;  
7.      
8.       const list = events[type];  
9.       // 同上  
10.       if (list === undefined)  
11.         return this;  
12.       // list是函数说明只有一个处理函数,否则是数组,如果list.listener === listener说明是once注册的  
13.       if (list === listener || list.listener === listener) {  
14.         // type类型的处理函数就一个,并且也没有注册其它类型的事件,则初始化_events  
15.         if (--this._eventsCount === 0)  
16.           this._events = ObjectCreate(null);  
17.         else {  
18.           // 就一个执行完删除type对应的属性  
19.           delete events[type];  
20.           // 注册了removeListener事件,则先注册removeListener事件  
21.           if (events.removeListener)  
22.             this.emit('removeListener',
23.                         type,
24.                         list.listener || listener);  
25.         }  
26.       } else if (typeof list !== 'function') {  
27.         // 多个处理函数  
28.         let position = -1;  
29.         // 找出需要删除的函数  
30.         for (let i = list.length - 1; i >= 0; i--) {  
31.           if (list[i] === listener || 
32.                list[i].listener === listener) {  
33.             // 保存原处理函数,如果有的话  
34.             originalListener = list[i].listener;  
35.             position = i;  
36.             break;  
37.           }  
38.         }  
39.      
40.         if (position < 0)  
41.           return this;  
42.         // 第一个则出队,否则删除一个  
43.         if (position === 0)  
44.           list.shift();  
45.         else {  
46.           if (spliceOne === undefined)  
47.             spliceOne = require('internal/util').spliceOne;  
48.           spliceOne(list, position);  
49.         }  
50.         // 如果只剩下一个,则值改成函数类型  
51.         if (list.length === 1)  
52.           events[type] = list[0];  
53.         // 触发removeListener  
54.         if (events.removeListener !== undefined)  
55.           this.emit('removeListener', 
56.                       type,
57.                       originalListener || listener);  
58.       }  
59.      
60.       return this;  
61.     };

以上就是events模块的核心逻辑,另外还有一些工具函数就不一一分析。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-02-15,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 编程杂技 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档