专栏首页前端杂货铺nodejs中的并发编程

nodejs中的并发编程

从sleep的实现说起

在nodejs中,如果要实现sleep的功能主要是通过“setTimeout + promise”实现,也可以通过“循环空转”来解决。前者是利用定时器实现任务的延迟执行,并通过promise链管理任务间的时序与依赖,本质上nodejs的执行线程并没有真正的sleep,事件循环以及v8仍在运行,是仅仅表现在业务逻辑上sleep;而后者的实现则无疑实在浪费CPU性能,有点类似自旋锁,不符合大多数场景。

若要实现引擎层面(运行时)的sleep,事情在ECMAScript Latest Draft (ECMA-262)出现之后开始有了转机。ECMA262规定了 Atomics.wait,它会将调用该方法的代理(引擎)陷入等待队列并让其sleep,直到被notify或者超时。该规范在8.10.0以上版本的nodejs上被实现。

事实上,Atomics.wait 的出现主要解决浏览器或nodejs的worker之间数据同步的问题。浏览器上的web-worker、正式被nodejs@12纳入的worker-threads模块,这些都是ECMAScript多线程模型的具体实现。既然出现多线程那么线程间的同步也就不可避免的被提到,在前端以及nodejs范围内可以使用Atomics.wait和notify来解决。

说的有些跑题,回到本节,如何实现运行时的sleep呢?很简单,利用Atomics.wait的等待超时机制:

let sharedBuf = new SharedArrayBuffer(4);
let sharedArr = new Int32Array(sharedBuf);
// 睡眠n秒
let sleep = function(n){
    Atomics.wait(sharedArr, 0, 0, n * 1000);
}

此处的sleep并不是异步方法,它会阻塞执行线程直到超时,因此需要根据业务场景来使用该sleep模型。 关于Atomics.wait的具体使用方法,下文会着重讲解。

多线程同步

虽然nodejs多线程使用场景不是很多,但是一旦涉及到多线程,那么线程间同步就必不可少,否则无法解决临界区的问题。不过nodejs的work_threads对线程的创建不同于c或者java,它使用libuv的API创建线程 “uv_thread_create”,但是在此之前需要初始化一些设施如MessagePort、v8实例设置等,因此创建一个thread并不是一个轻量级的操作,需要结合场景酌情创建适量的threads。

回到正题,多线程间的同步一般需要依赖锁,而锁的实现需要依赖于全局变量。在nodejs的work_threads实现中,主线程无法设置全局变量,因此可以通过Atomics实现。正如上例中所示,Atomics.wait依赖 SharedArrayBuffer,这是共享内存的ArrayBuffer,threads之间可通过它共享数据,可真正操作ArrayBuffer时并不直接使用该对象,而是TypeArray。如Atomics.wait,第一个参数必须是Int32Array对象,而该对象指向的缓冲区为SharedArrayBuffer。当线程A因为Atomics.wait而阻塞后,可通过其它线程B调用Atomics.notify进行唤醒从而让线程A的v8继续执行。

let { Worker, isMainThread, parentPort, workerData } = require('worker_threads');
var sab = new SharedArrayBuffer(1024);
var int32 = new Int32Array(sab);
if (isMainThread) {
    const  worker  =  new Worker(__filename, {
        workerData: sab
    });
    worker.on('message', (d) => {
        console.log('parent receive message:', d);
    });
    worker.on('error', (e) => {
        console.error('parent receive error', e);
    });
    worker.on('exit', (code) => {
        if (code !==  0)
            console.error(new  Error(`工作线程使用退出码 ${code} 停止`));
    });

    Atomics.wait(int32, 0, 0); // A
    console.log(int32[0]); // C: 123
} else {
    let buf = workerData;
    let arrs = new Int32Array(buf);
    Atomics.store(arrs, 0, 123); 
    Atomics.notify(arrs, 0); // B
}

上例中,主线程创建thread后,在A处进行阻塞;在新线程中,通过原子操作Atomics.store修改SharedArrayBuffer的第一项为123后,于B处唤醒阻塞在SharedArrayBuffer第一项的其它线程;此时主线程被唤醒,执行console.log(int32[0]),输出被新线程修改后的SharedArrayBuffer第一项数据123。

分析一个公平、排它、不可重入锁的实现,它使用Atomics.wait/notify/compareExchange完成线程的同步。

main-thread.js

let  Lock  =  require('./lock').Lock;
let { Worker } =  require('worker_threads');
const  sharedBuffer  =  new SharedArrayBuffer(1 * Int32Array.BYTES_PER_ELEMENT);
const  sharedArray  =  new  Int32Array(sharedBuffer);
let worker = new Worker('./worker-lock.js', {
    workerData:  sharedBuffer
});
Lock.initialize(sharedArray, 0);
const  lock  =  new  Lock(sharedArray, 0);
// 获取锁
lock.lock(); 

// 3s后释放锁
setTimeout(() => {
    lock.unlock(); // (B)
}, 3000)
worker-thread.js

let  Lock  =  require('./lock').Lock;
let { parentPort, workerData } =  require('worker_threads');
const  sharedArray  =  new  Int32Array(workerData);
const  lock  =  new  Lock(sharedArray, 0);

console.log('Waiting for lock...'); // (A)
// 获取锁
lock.lock(); // (B) blocks!
console.log('Unlocked'); // (C)

主线程初始化互斥锁,同时创建线程,主线程获取锁后三秒钟释放; worker线程尝试获取锁,此时锁已被主线程获取,因此worker线程在此阻塞,等待3s后主线程释放锁被唤醒,继续执行输出。

lock.js

const  UNLOCKED  =  0;
const  LOCKED_NO_WAITERS  =  1;
const  LOCKED_POSSIBLE_WAITERS  =  2;
const  NUMINTS  =  1;

class  Lock {
    // 'iab' must be a Int32Array mapping shared memory.
    // 'ibase' must be a valid index in iab, the first of NUMINTS reserved for the lock.
    constructor(iab, ibase) {
        if (!(iab  instanceof  Int32Array  &&  ibase|0  ===  ibase  &&  ibase  >=  0  &&  ibase+NUMINTS  <=  iab.length)) {
            throw  new  Error(`Bad arguments to Lock constructor: ${iab}  ${ibase}`);
        }
        this.iab  =  iab;
        this.ibase  =  ibase;
    }
    static  initialize(iab, ibase) {
        if (!(iab  instanceof  Int32Array  &&  ibase|0  ===  ibase  &&  ibase  >=  0  &&  ibase+NUMINTS  <=  iab.length)) {
            throw  new  Error(`Bad arguments to Lock constructor: ${iab}  ${ibase}`);
        }
        Atomics.store(iab, ibase, UNLOCKED);
        return  ibase;
    }
    // Acquire the lock, or block until we can. Locking is not recursive:
    lock() {
        const  iab  =  this.iab;
        const  stateIdx  =  this.ibase;
        var  c;
        if ((c  =  Atomics.compareExchange(iab, stateIdx, UNLOCKED, LOCKED_NO_WAITERS)) !==  UNLOCKED) { // A
            do {
                if (c  ===  LOCKED_POSSIBLE_WAITERS
                ||  Atomics.compareExchange(iab, stateIdx, LOCKED_NO_WAITERS, LOCKED_POSSIBLE_WAITERS) !==  UNLOCKED) {
                    Atomics.wait(iab, stateIdx, LOCKED_POSSIBLE_WAITERS, Number.POSITIVE_INFINITY);
                }
            } while ((c  =  Atomics.compareExchange(iab, stateIdx, UNLOCKED, LOCKED_POSSIBLE_WAITERS)) !==  UNLOCKED); // B
        }
    }
    tryLock() {
        const  iab  =  this.iab;
        const  stateIdx  =  this.ibase;
        return  Atomics.compareExchange(iab, stateIdx, UNLOCKED, LOCKED_NO_WAITERS) ===  UNLOCKED;
    }
    unlock() {
        const  iab  =  this.iab;
        const  stateIdx  =  this.ibase;
        var  v0  =  Atomics.sub(iab, stateIdx, 1);
        // Wake up a waiter if there are any
        if (v0  !==  LOCKED_NO_WAITERS) {
            Atomics.store(iab, stateIdx, UNLOCKED);
            Atomics.notify(iab, stateIdx, 1);
        }
    }
    toString() {
        return  "Lock:{ibase:"  +  this.ibase  +"}";
    }
}
exports.Lock  =  Lock;

当进程A尝试获取锁成功时,A处判断语句为false,因此由compareExchange设置状态为LOCKED_NO_WAITERS,直接执行其后续逻辑; 若进程B此时执行lock获取锁时,A处判断为true,进入do while循环体,在wait处sleep; 进程A通过unlock释放锁,会将锁状态置为UNLOCKED,同时唤醒阻塞的进程B; 进程B执行循环判断语句B,此时为false,跳出循环执行B的逻辑。

当然,也可通过tryLock实现自旋锁或者其他逻辑实现非阻塞等待。

参考

libuv漫谈之线程 Atomics Atomics MDN

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • jQuery的事件模型

    前几天自己着重读了jQuery1.11.1的源码,又结合了之前对DE事件模型的分析,最后也实现一个简陋的事件模型。 jQuery的事件系统离不开jQuery的缓...

    欲休
  • Nodejs“实现”Dubbo Provider

    目前nodejs应用越来越广泛,但和java的dubbo体系接入困难,所以我们需要实现node端的dubbo provider逻辑。java的dubbo pro...

    欲休
  • DOMContentLoaded实现

    IE系列直到IE9才支持DOMContentLoaded事件,对于IE8及其之前版本,如果html内没有框架,则可以采用document.documentELe...

    欲休
  • 商标与版权的区别

    近期看到有很多小伙伴一直分不清楚商标和版权的区别,不懂这两者之间的异同。今天墨者安全就给小伙伴们详细的讲解一下商标与版权之间的区别。

    墨者安全科技
  • 数字时钟系统在医院的解决方案

    在需要多处显示时间的地方,如办公楼各室、学校、教室、火车站候车室等都希望显示统一的时间,利用数字时钟系统可以实现这一目的。数字时钟系统分母钟和子钟两部分,母钟通...

    时频专家
  • TensorTrade:基于深度强化学习的Python交易框架

    互联网上有很多关于强化学习交易系统零零碎碎的东西,但是没有一个是可靠和完整的。出于这个原因,我们决定创建一个开源的Python框架,使用深度强化学习,有效地将任...

    量化投资与机器学习微信公众号
  • WordPress获取首页网站链接和站点名称

    AlexTao
  • 当Activity跳转偶遇单身多年的老汉

    内容来源:作者——Android轮子哥,链接——https://www.jianshu.com/p/579f1f118161,好文请多支持!感谢您的阅读~

    IT大咖说
  • 群面有没有胜率100%的技巧?

    https://www.zhihu.com/question/32025213/answer/805821528

    编程珠玑
  • 面试造火箭,入职拧螺丝?(大神可绕行)

    相信小伙伴们曾经都有过这样的经历:面试时被面试官的各种高深问题(例如奇葩异常的解决方案、脑洞大开的逻辑算法、各种框架的底层原理,以及大型项目的架构方案与是否拥有...

    用户1272076

扫码关注云+社区

领取腾讯云代金券