前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >nodejs线程池的设计与实现

nodejs线程池的设计与实现

作者头像
theanarkh
发布2020-11-02 14:31:17
1.1K0
发布2020-11-02 14:31:17
举报
文章被收录于专栏:原创分享

前言:之前的版本不方便开放,重新设计了一版nodejs的线程池库,本文介绍该库的一些设计和实现。

nodejs虽然提供了线程的能力,但是很多时候,往往不能直接使用线程或者无限制地创建线程,比如我们有一个功能是cpu密集型的,如果一个请求就开一个线程,这很明显不是最好的实践,这时候,我们需要使用池化的技术,本文介绍在nodejs线程模块的基础上,如何设计和实现一个线程池库(https://github.com/theanarkh/nodejs-threadpool或npm i nodejs-threadpool )。下面是线程池的总体架构。

设计一个线程池,在真正写代码之前,有很多设计需要考虑,大概如下:

1任务队列的设计,一个队列,多个线程互斥访问,或者每个线程一个队列,不需要互斥访问。

2 线程退出的设计,可以由主线程检测空闲线程,然后使子线程退出。或者子线程退出,通知主线程。空闲不一定是没有任务就退出,可以设计空闲时间达到阈值后退出,因为创建线程是有时间开销的。

3 任务数的设计,每个线程可以有个任务数,还可以增加一个总任务数,即全部线程任务数加起来

4 选择线程的设计,选择任务数最少的线程。

5 线程类型的设计,可以区分核心线程和预备线程,任务少的时候,核心线程处理就行。任务多也创建预备线程帮忙处理。

6 线程池类型的设计,cpu密集型的,线程数等于核数,否则自定义线程数就行。

7 支持任务的取消和超时机制,防止一个任务时间过长或者死循环。

本文介绍的线程池具体设计思想如下(参考java):

1 主线程维护一个队列,子线程的任务由子线程负责分发,不需要互斥访问,子线程也不需要维护自己的队列。

2 线程退出的设计,主线程负责检查子线程空闲时间是否达到阈值,是则使子线程退出。

3 任务数的设计,主线程负责管理任务个数并应有相应的策略。

4 选择线程的设计,选择任务数最少的线程。

5 线程类型的设计,区分核心线程和预备线程,任务少的时候,核心线程处理就行。任务多也创建预备线程帮忙处理。

6 线程池类型的设计,cpu密集型的,线程数等于核数,否则自定义线程数就行。

7 支持任务的取消和超时机制,超时或者取消的时候,主线程判断任务是待执行还是正在执行,如果是待执行则从任务队列中删除,如果是正在执行则杀死对应的子线程。下面我们看一下具体的设计。

1 主线程和子线程通信的数据结构

代码语言:javascript
复制
// 任务类,一个任务对应一个id
class Work {
    constructor({workId, filename, options}) {
        // 任务id
        this.workId = workId;
        // 任务逻辑,字符串或者js文件路径
        this.filename = filename;
        // 任务返回的结果
        this.data = null;
        // 任务返回的错误
        this.error = null;
        // 执行任务时传入的参数,用户定义
        this.options = options;
    }
}

主线程给子线程分派一个任务的时候,就给子线程发送一个Work对象。在nodejs中线程间通信需要经过序列化和反序列化,所以通信的数据结构包括的信息不能过多。

2 子线程处理任务逻辑

代码语言:javascript
复制
const { parentPort } = require('worker_threads');
const vm = require('vm');
const { isFunction, isJSFile } = require('./utils');

// 监听主线程提交过来的任务
parentPort.on('message', async (work) => {
    try {
        const { filename, options } = work;
        let aFunction;
        if (isJSFile(filename)) {
            aFunction = require(filename);
        } else {
            aFunction = vm.runInThisContext(`(${filename})`);
        }
        if (!isFunction(aFunction)) {
            throw new Error('work type error: js file or string');
        }
        work.data = await aFunction(options);
        parentPort.postMessage({event: 'done', work});
    } catch (error) {
        work.error = error.toString();
        parentPort.postMessage({event: 'error', work});
    }
});

process.on('uncaughtException', (...rest) => {
    console.error(...rest);
});

process.on('unhandledRejection', (...rest) => {
    console.error(...rest);
});

子线程的逻辑比较简单,就是监听主线程分派过来的任务,然后执行任务,执行完之后通知主线程。任务支持js文件和字符串代码的形式。需要返回一个Promise或者async函数。用于用于通知主线程任务已经完成。

3 线程池和业务的通信

代码语言:javascript
复制
// 提供给用户侧的接口
class UserWork extends EventEmitter {
    constructor({ workId }) {
        super();
        // 任务id
        this.workId = workId;
        // 支持超时取消任务
        this.timer = null;
        // 任务状态
        this.state = WORK_STATE.PENDDING;
    }
    // 超时后取消任务
    setTimeout(timeout) {
        this.timer = setTimeout(() => {
            this.timer && this.cancel() && this.emit('timeout');
        }, ~~timeout);
    }
    // 取消之前设置的定时器
    clearTimeout() {
        clearTimeout(this.timer);
        this.timer = null;
    }
    // 直接取消任务,如果执行完了就不能取消了,this.terminate是动态设置的
    cancel() {
        if (this.state === WORK_STATE.END || this.state === WORK_STATE.CANCELED) {
           return false;
        } else {
            this.terminate();
            return true;
        }
    }
    // 修改任务状态
    setState(state) {
        this.state = state;
    }
}

业务提交一个任务给线程池的时候,线程池会返回一个UserWork类,业务侧通过UserWork类和线程池通信。

4 管理子线程的数据结构

代码语言:javascript
复制
// 管理子线程的数据结构
class Thread {
    constructor({ worker }) {
        // nodejs的Worker对象,nodejs的worker_threads模块的Worker
        this.worker = worker;
        // 线程状态
        this.state = THREAD_STATE.IDLE;
        // 上次工作的时间
        this.lastWorkTime = Date.now();
    }
    // 修改线程状态
    setState(state) {
        this.state = state;
    }
    // 修改线程最后工作时间
    setLastWorkTime(time) {
        this.lastWorkTime = time;
    }
}

线程池中维护了多个子线程,Thread类用于管理子线程的信息。

5 线程池 线程池的实现是核心,我们分为几个部分讲。

5.1 支持的配置

代码语言:javascript
复制
constructor(options = {}) {
        this.options = options;
        // 子线程队列
        this.workerQueue = [];
        // 核心线程数
        this.coreThreads = ~~options.coreThreads || config.CORE_THREADS;
        // 线程池最大线程数,如果不支持动态扩容则最大线程数等于核心线程数
        this.maxThreads = options.expansion !== false ? Math.max(this.coreThreads, config.MAX_THREADS) : this.coreThreads;
        // 超过任务队列长度时的处理策略
        this.discardPolicy = options.discardPolicy ? options.discardPolicy : DISCARD_POLICY.NOT_DISCARD;
        // 是否预创建子线程
        this.preCreate = options.preCreate === true;
        // 线程最大空闲时间,达到后自动退出
        this.maxIdleTime = ~~options.maxIdleTime || config.MAX_IDLE_TIME;
        // 是否预创建线程池
        this.preCreate && this.preCreateThreads();
        // 保存线程池中任务对应的UserWork
        this.workPool = {};
        // 线程池中当前可用的任务id,每次有新任务时自增1
        this.workId = 0;
        // 线程池中的任务队列
        this.queue = [];
        // 线程池总任务数
        this.totalWork = 0;
        // 支持的最大任务数
        this.maxWork = ~~options.maxWork || config.MAX_WORK;
        // 处理任务的超时时间,全局配置
        this.timeout = ~~options.timeout;
        this.pollIdle();
    }

上面的代码列出了线程池所支持的能力。

5.2 创建线程

代码语言:javascript
复制
newThread() {
        const worker = new Worker(workerPath);
        const thread = new Thread({worker});
        this.workerQueue.push(thread);
        const threadId = worker.threadId;
        worker.on('exit', () => {
            // 找到该线程对应的数据结构,然后删除该线程的数据结构
            const position = this.workerQueue.findIndex(({worker}) => {
                return worker.threadId === threadId;
            });
            const exitedThread = this.workerQueue.splice(position, 1);
            // 退出时状态是BUSY说明还在处理任务(非正常退出)
            this.totalWork -= exitedThread.state === THREAD_STATE.BUSY ? 1 : 0;
        });
        // 和子线程通信
        worker.on('message', (result) => {
            const {
                work,
                event,
            } = result;
            const { data, error, workId } = work;
            // 通过workId拿到对应的userWork
            const userWork = this.workPool[workId];
            // 不存在说明任务被取消了
            if (!userWork) {
                return;
            }
            // 修改线程池数据结构
            this.endWork(userWork);

            // 修改线程数据结构
            thread.setLastWorkTime(Date.now());

            // 还有任务则通知子线程处理,否则修改子线程状态为空闲
            if (this.queue.length) {
                // 从任务队列拿到一个任务交给子线程
                this.submitWorkToThread(thread, this.queue.shift());
            } else {
                thread.setState(THREAD_STATE.IDLE);
            }

            switch(event) {
                case 'done':
                    // 通知用户,任务完成
                    userWork.emit('done', data);
                    break;
                case 'error':
                    // 通知用户,任务出错
                    if (EventEmitter.listenerCount(userWork, 'error')) {
                        userWork.emit('error', error);
                    }
                    break;
                default: break;
            }
        });
        worker.on('error', (...rest) => {
            console.error(...rest);
        });
        return thread;
    }

创建线程,并保持线程对应的数据结构、退出、通信管理、任务分派。子线程执行完任务后,会通知线程池,主线程通知用户。

5.3 选择线程

代码语言:javascript
复制
 selectThead() {
        // 找出空闲的线程,把任务交给他
        for (let i = 0; i < this.workerQueue.length; i++) {
            if (this.workerQueue[i].state === THREAD_STATE.IDLE) {
                return this.workerQueue[i];
            }
        }
        // 没有空闲的则随机选择一个
        return this.workerQueue[~~(Math.random() * this.workerQueue.length)];
    }

当用户给线程池提交一个任务时,线程池会选择一个空闲的线程处理该任务。如果没有可用线程则任务插入待处理队列等待处理。

5.4 提交任务

代码语言:javascript
复制
// 给线程池提交一个任务
    submit(filename, options = {}) {
        return new Promise(async (resolve, reject) => {
            let thread;
            // 没有线程则创建一个
            if (this.workerQueue.length) {
                thread = this.selectThead();
                // 该线程还有任务需要处理
                if (thread.state === THREAD_STATE.BUSY) {
                    // 子线程个数还没有达到核心线程数,则新建线程处理
                    if (this.workerQueue.length < this.coreThreads) {
                        thread = this.newThread();
                    } else if (this.totalWork + 1 > this.maxWork){
                        // 总任务数已达到阈值,还没有达到线程数阈值,则创建
                        if(this.workerQueue.length < this.maxThreads) {
                            thread = this.newThread();
                        } else {
                            // 处理溢出的任务
                            switch(this.discardPolicy) {
                                case DISCARD_POLICY.ABORT: 
                                    return reject(new Error('queue overflow'));
                                case DISCARD_POLICY.CALLER_RUN:
                                    const workId = this.generateWorkId();
                                    const userWork =  new UserWork({workId}); 
                                    userWork.setState(WORK_STATE.RUNNING);
                                    userWork.terminate = () => {
                                        userWork.setState(WORK_STATE.CANCELED);
                                    };
                                    this.timeout && userWork.setTimeout(this.timeout);
                                    resolve(userWork);
                                    try {
                                        let aFunction;
                                        if (isJSFile(filename)) {
                                            aFunction = require(filename);
                                        } else {
                                            aFunction = vm.runInThisContext(`(${filename})`);
                                        }
                                        if (!isFunction(aFunction)) {
                                            throw new Error('work type error: js file or string');
                                        }
                                        const result = await aFunction(options);
                                        // 延迟通知,让用户有机会取消或者注册事件
                                        setImmediate(() => {
                                            if (userWork.state !== WORK_STATE.CANCELED) {
                                                userWork.setState(WORK_STATE.END);
                                                userWork.emit('done', result);
                                            }
                                        });
                                    } catch (error) {
                                        setImmediate(() => {
                                            if (userWork.state !== WORK_STATE.CANCELED) {
                                                userWork.setState(WORK_STATE.END);
                                                userWork.emit('error', error.toString());
                                            }
                                        });
                                    }
                                    return;
                                case DISCARD_POLICY.OLDEST_DISCARD: 
                                    const work = this.queue.shift();
                                    // maxWork为1时,work会为空
                                    if (work && this.workPool[work.workId]) {
                                        this.cancelWork(this.workPool[work.workId]);
                                    } else {
                                        return reject(new Error('no work can be discarded'));
                                    }
                                    break;
                                case DISCARD_POLICY.DISCARD:
                                    return reject(new Error('discard'));
                                case DISCARD_POLICY.NOT_DISCARD:
                                    break;
                                default: 
                                    break;
                            }
                        }
                    }
                }
            } else {
                thread = this.newThread();
            }
            // 生成一个任务id
            const workId = this.generateWorkId();

            // 新建一个UserWork
            const userWork =  new UserWork({workId}); 
            this.timeout && userWork.setTimeout(this.timeout);

            // 新建一个work
            const work = new Work({ workId, filename, options });

            // 修改线程池数据结构,把UserWork和Work关联起来
            this.addWork(userWork);

            // 选中的线程正在处理任务,则先缓存到任务队列
            if (thread.state === THREAD_STATE.BUSY) {
                this.queue.push(work);
                userWork.terminate = () => {
                    this.cancelWork(userWork);
                    this.queue = this.queue.filter((node) => {
                        return node.workId !== work.workId;
                    });
                }
            } else {
                this.submitWorkToThread(thread, work);
            }

            resolve(userWork);
        })
    }

    submitWorkToThread(thread, work) {
        const userWork = this.workPool[work.workId];
        userWork.setState(WORK_STATE.RUNNING);
        // 否则交给线程处理,并修改状态和记录该线程当前处理的任务id
        thread.setState(THREAD_STATE.BUSY);
        thread.worker.postMessage(work);
        userWork.terminate = () => {
            this.cancelWork(userWork);
            thread.setState(THREAD_STATE.DEAD);
            thread.worker.terminate();
        }
    }

    addWork(userWork) {
        userWork.setState(WORK_STATE.PENDDING);
        this.workPool[userWork.workId] = userWork;
        this.totalWork++;
    }

    endWork(userWork) {
        delete this.workPool[userWork.workId];
        this.totalWork--;
        userWork.setState(WORK_STATE.END);
        userWork.clearTimeout(); 
    }

    cancelWork(userWork) {
        delete this.workPool[userWork.workId];
        this.totalWork--;
        userWork.setState(WORK_STATE.CANCELED);
        userWork.emit('cancel');
    }

提交任务是线程池暴露给用户侧的接口,主要处理的逻辑包括,根据当前的策略判断是否需要新建线程、选择线程处理任务、排队任务等,如果任务数达到阈值,则根据丢弃策略处理该任务。

5.5 空闲处理

代码语言:javascript
复制
 pollIdle() {
        setTimeout(() => {
            for (let i = 0; i < this.workerQueue.length; i++) {
                const node = this.workerQueue[i];
                if (node.state === THREAD_STATE.IDLE && Date.now() - node.lastWorkTime > this.maxIdleTime) {
                    node.worker.terminate();
                }
            }
            this.pollIdle();
        }, 1000);
    }

当子线程空闲时间达到阈值后,主线程会杀死子线程,避免浪费系统资源。总结,这就是线程池具体的设计和实现,另外创建线程失败会导致主线程挂掉,所以使用线程的时候,最后新开一个子进程来管理该线程池。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-10-19,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 编程杂技 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
文件存储
文件存储(Cloud File Storage,CFS)为您提供安全可靠、可扩展的共享文件存储服务。文件存储可与腾讯云服务器、容器服务、批量计算等服务搭配使用,为多个计算节点提供容量和性能可弹性扩展的高性能共享存储。腾讯云文件存储的管理界面简单、易使用,可实现对现有应用的无缝集成;按实际用量付费,为您节约成本,简化 IT 运维工作。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档