一般人理解 Node 是单线程的,所以 Node 启动后线程数应该为 1,我们做实验看一下。
setInterval(() => {
console.log(new Date().getTime())
}, 3000)
可以看到 Node 进程占用了 7 个线程。
为什么会有 7 个线程呢?
我们都知道,Node 中最核心的是 v8 引擎,在 Node 启动后,会创建 v8 的实例,这个实例是多线程的
还是上面那个例子,我们在定时器执行的同时,去读一个文件:
const fs = require('fs')
setInterval(() => {
console.log(new Date().getTime())
}, 3000)
fs.readFile('./index.html', () => {})
线程数量变成了 11 个,
这是因为在 Node 中有一些 IO 操作(DNS,FS)和一些 CPU 密集计算(Zlib,Crypto)会启用 Node 的线程池,
而线程池默认大小为 4,因为线程数变成了 11。
我们可以手动更改线程池默认大小:
process.env.UV_THREADPOOL_SIZE = 64
一行代码轻松把线程变成 71 ?
Nodejs一直以单线程异步IO著称,擅长IO密集型操作,不擅长CPU密集型操作。
每日一问11-线程使用场景(Node.js,Redis ,Memcached)
但是,新版的Nodejs,在不断弥补这方面的短板。
在 Node 10.5.0,官方给出了一个实验性质的模块 worker_threads 给 Node 提供了真正的多线程能力
在 Node.js 12.11.0,worker_threads 模块正式进入稳定版
至此,Nodejs算是了真正的多线程能力。
进程是资源分配的最小单位,线程是CPU调度的最小单位。
Nodejs多线程种类
Node.js 中有三类线程 (child_process 和 cluster 的实现均为进程)
1. event loop的主线程
2. libuv的异步I/O线程池
3. worker_threads的线程 (上面图片没有这个)
属性 | 多进程 | 多线程 | 比较 |
---|---|---|---|
数据 | 数据共享复杂,需要用IPC;数据是分开的,同步简单 | 因为共享进程数据,数据共享简单,同步复杂 | 各有千秋 |
CPU、内存 | 占用内存多,切换复杂,CPU利用率低 | 占用内存少,切换简单,CPU利用率高 | 多线程更好 |
销毁、切换 | 创建销毁、切换复杂,速度慢 | 创建销毁、切换简单,速度很快 | 多线程更好 |
coding | 编码简单、调试方便 | 编码、调试复杂 | 多进程更好 |
可靠性 | 进程独立运行,不会相互影响 | 线程同呼吸共命运 | 多进程更好 |
分布式 | 可用于多机多核分布式,易于扩展 | 只能用于多核分布式 | 多进程更好 |
总结 node:通过work线程,io线程 提高垂直扩展能力
栗子B
《Scalable IO in Java》 是java.util.concurrent包的作者,大师Doug Lea关于分析与构建可伸缩的高性能IO服务的一篇经典文章,在文章中Doug Lea通过各个角度,循序渐进的梳理了服务开发中的相关问题,以及在解决问题的过程中服务模型的演变与进化,
文章中基于Reactor反应器模式的几种服务模型架构,也被Netty、Mina等大多数高性能IO服务框架所采用,因此阅读这篇文章有助于你更深入了解Netty、Mina等服务框架的编程思想与设计模式。
,原文连接:http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf
在一般的网络或分布式服务等应用程序中,大都具备一些相同的处理流程,例如:
① 读取请求数据;
② 对请求数据进行解码;
③ 对数据进行处理;
④ 对回复数据进行编码;
⑤ 发送回复;
当然在实际应用中每一步的运行效率都是不同的,
例如其中可能涉及到xml解析、文件传输、web页面的加载、计算服务等不同功能。
在构建高性能可伸缩IO服务的过程中,我们希望达到以下的目标:
① 能够在海量负载连接情况下优雅降级;
② 能够随着硬件资源的增加,性能持续改进;
③ 具备低延迟、高吞吐量、可调节的服务质量等特点;
而分治处理就是实现上述目标的一个最佳方式。
分发模式具有以下几个机制:
① 将一个完整处理过程分解为一个个细小的的任务;
② 每个任务执行相关的动作且不产生阻塞;
③ 在任务执行状态被触发时才会去执行,例如只在有数据时才会触发读操作;
在一般的服务开发当中,IO事件通常被当做任务执行状态的触发器使用,在hander处理过程中主要针对的也就是IO事件;
三、Reactor模式
Reactor也可以称作反应器模式,它有以下几个特点:
① Reactor模式中会通过分配适当的handler(处理程序)来响应IO事件,类似与AWT 事件处理线程;
② 每个handler执行非阻塞的操作,类似于AWT ActionListeners 事件监听
③ 通过将handler绑定到事件进行管理,类似与AWT addActionListener 添加事件监听;
在多处理器场景下,为实现服务的高性能我们可以有目的的采用多线程模式:
1、增加Worker线程,专门用于处理非IO操作,
因为通过上面的程序我们可以看到,反应器线程需要迅速触发处理流程,而如果处理过程也就是process()方法产生阻塞会拖慢反应器线程的性能,
所以我们需要把一些非IO操作交给Woker线程来做;
2、拆分并增加反应器Reactor线程,
一方面在压力较大时可以饱和处理IO操作,提高处理能力;
另一方面维持多个Reactor线程也可以做负载均衡使用;
线程的数量可以根据程序本身是CPU密集型还是IO密集型操作来进行合理的分配;
Reactor多线程设计模式具备以下几个特点:
① 通过卸载非IO操作来提升Reactor 线程的处理性能
② 比将非IO操作重新设计为事件驱动的方式更简单;
③ 但是很难与IO重叠处理,最好能在第一时间将所有输入读入缓冲区;
④ 可以通过线程池的方式对线程进行调优与控制,一般情况下需要的线程数量比客户端数量少很多;
下面是Reactor多线程设计模式的一个示意图与示例代码(我们可以看到在这种模式中在Reactor线程的基础上把非IO操作放在了Worker线程中执行)
总结: 1个acceptr 线程,多个epoll线程,多个work线程
1. event loop的主线程
2. libuv的异步I/O线程池
3. worker_threads的线程
这里刚刚开始,reids用过吗?是单线程还是多线程?
符合一般网络请求特点:
# So for instance if you have a four cores boxes, try to use 2 or 3 I/O
# threads, i
# if you have a 8 cores, try to use 6 threads. In order to
# enable I/O threads use the following configuration directive:
#
# io-threads 4
redis conf注释里为什么推荐io thread为4个?
通过上面的8个线程已经测试得出,在超过4个io线程后性能不升反降的
开启8个io thread线程的效果。8个线程把8个cpu core负载跑的差不多了,
但qps却不升反降。
http://xiaorui.cc/archives/6918
redis增加的io线程
//每个线程负责什么工作:
void *IOThreadMain(void *myid) {
/* The ID is the thread number (from 0 to server.iothreads_num-1), and is
* used by the thread to just manipulate a single sub-array of clients. */
long id = (unsigned long)myid;
char thdname[16];
snprintf(thdname, sizeof(thdname), "io_thd_%ld", id);
redis_set_thread_title(thdname);
redisSetCpuAffinity(server.server_cpulist);
makeThreadKillable();
while(1) {
/* This is the list of clients each thread will serve when threaded I/O is
* used. We spawn io_threads_num-1 threads, since one is the main thread
* itself. */
list *io_threads_list[IO_THREADS_MAX_NUM]; //客户端请求列表
/* Process: note that the main thread will never touch our list
* before we drop the pending count to 0. */
listIter li;
listNode *ln;
listRewind(io_threads_list[id],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
if (io_threads_op == IO_THREADS_OP_WRITE) {
writeToClient(c,0);
} else if (io_threads_op == IO_THREADS_OP_READ) {
readQueryFromClient(c->conn);
} else {
serverPanic("io_threads_op value is unknown");
}
}
}
}
readQueryFromClient
writeToClient
//client 分配规则
int handleClientsWithPendingWritesUsingThreads(void)
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_WRITE;
/* Remove clients from the list of pending writes since
* they are going to be closed ASAP. */
if (c->flags & CLIENT_CLOSE_ASAP) {
listDelNode(server.clients_pending_write, ln);
continue;
}
int target_id = item_id % server.io_threads_num;
listAddNodeTail(io_threads_list[target_id],c);
item_id++;
}
/* This is the list of clients each thread will serve when threaded I/O is
* used. We spawn io_threads_num-1 threads, since one is the main thread
* itself. */
list *io_threads_list[IO_THREADS_MAX_NUM];
list *listAddNodeTail(list *list, void *value)
{
listNode *node;
if ((node = zmalloc(sizeof(*node))) == NULL)
return NULL;
node->value = value;
// //双向链表的插入,这里锁呢 为什么??不是多线程吗?
if (list->len == 0) {
list->head = list->tail = node;
node->prev = node->next = NULL;
} else {
//双向链表的插入,这里锁呢 为什么??不是多线程吗?
node->prev = list->tail;
node->next = NULL;
list->tail->next = node;
list->tail = node;//移动位置
}
list->len++;
return list;
}
主线程如何获取io处理完毕:用锁了吗
//原子操作:统计是否处理完毕
static inline unsigned long getIOPendingCount(int i) {
unsigned long count = 0;
atomicGetWithSync(io_threads_pending[i], count);
return count;
}
handleClientsWithPendingReadsUsingThreads:
//那么主线程又如何得知io线程干完了?也是轮询
/* Wait for all the other threads to end their work. */
while(1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
pending += getIOPendingCount(j);
if (pending == 0) break;
}
多线程 必然用锁,锁降低并发 该怎么办
方法:
1 单线程:
2 使用原子操作
3 cas实现 乐观锁。(分段锁只是一个部分)
Memcached::cas()执行一个“检查并设置”的操作,
因此,它仅在当前客户端最后一次取值后,
该key 对应的值没有被其他客户端修改的情况下,
才能够将值写入。
/*
* Returns an item if it hasn't been marked as expired,
* lazy-expiring as needed.
*/
item *item_get(const char *key, const size_t nkey) {
item *it;
uint32_t hv;
hv = hash(key, nkey, 0);
item_lock(hv); //加锁
it = do_item_get(key, nkey, hv);
item_unlock(hv); //解锁
return it;
}
//对hv进行加锁。
void item_lock(uint32_t hv) {
uint8_t *lock_type = pthread_getspecific(item_lock_type_key);
if (likely(*lock_type == ITEM_LOCK_GRANULAR)) {
mutex_lock(&item_locks[(hv & hashmask(hashpower)) % item_lock_count]);
} else {
mutex_lock(&item_global_lock);
} //位运算符&如何实现取模功能
}
如何分析问题,
这里采用:读取 数据,处理业务,发送数据 拆分每个步骤方法。
分析 乐观锁,可扩展io模型实现。