Memcached的网络模型

Memcached依赖于libevent,网络模型是典型的reactor模式,主线程通过自己的event_base绑定端口监听网络中的连接。每个worker线程的初始任务就是轮询管道上的notify_receive_fd的读事件,如果有连接,主线程往相应的worker线程的管道的输入端notify_send_fd写入关键字‘c’,代表着网络上有新的连接要派发给worker线程,这样worker进程直接accept就能得到连接的fd了,同时把这个fd的读事件放到每个worker进程的event_base中,如图,每个worker的进程同时监听notify_receive_fd和外部连接的fd上的事件。每个worker都应该为连接上的fd分配一个conn结构体,这个结构体记录了这个连接全部的信息,如连接的fd,读写buf,操作的item,当前connection的状态等等。

主线程的任务就是监听网络,如果监听到了连接就顺序分发到worker线程,通过管道通知worker线程建立连接(调用accept函数,这样调用accept肯定能得到连接的fd)。

建立连接的代码:

  1. 453 event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
  2. //sfd就是监听连接的fd,event_handler就是有事件的回调函数,c是当前的connection,作为一个attachment ,在调用回调函数时,就能通过attchement获得当前有事件的connection。
  3. 454 event_base_set(base, &c->event); //初始化这个事件
  4. 455 c->ev_flags = event_flags;
  5. 456
  6. 457 if (event_add(&c->event, 0) == -1) { //把这个事件加到event-base事件的监控中
  7. 458 if (conn_add_to_freelist(c)) {
  8. 459 conn_free(c);
  9. 460 }
  10. 461 perror("event_add");
  11. 462 return NULL;
  12. 463 }

如果有连接,那么event_base会回调函数event_handler,还有监听新连接的sfd初始状态都是conn_listening,如果回调了函数event_hander,那么证明一定就是有连接请求的。

  1. 3753 case conn_listening: //监听的fd的connection,初始状态就是这个,表示监听中,有连接
  2. 3754 addrlen = sizeof(addr);
  3. 3755 if ((sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen)) == -1) {
  4. //回调到了这个地方,肯定有连接,这个函数直接得到连接的sfd
  5. 3770 if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
  6. 3771 fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) { //把这个套结字变为非阻塞操作
  7. 3772 perror("setting O_NONBLOCK");
  8. 3773 close(sfd);
  9. 3774 break;
  10. 3775 }
  11. 3785 } else { //这里把新的请求分配给相应的worker线程
  12. 3786 dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
  13. 3787 DATA_BUFFER_SIZE, tcp_transport);
  14. 3788 }

接到了请求了之后,会顺序分配给每个worker线程,把这个连接的sfd所关心的初始状态和事件和readbuf,作为一个entry放到worker进程的连接队列中,同时向那个worker的线程的管道的notify_ receive_fd写入‘c’,提示worker进程有连接,每个worker线程有个连接队列,做缓冲,为了防止worker新连接产生的过快,worker线程处理速度过慢。

  1. 448 CQ_ITEM *item = cqi_new(); //建立连接的item
  2. 449 char buf[1];
  3. 450 int tid = (last_thread + 1) % settings.num_threads; // 每次分配worker线程,加1
  4. 451
  5. 452 LIBEVENT_THREAD *thread = threads + tid;
  6. 453
  7. 454 last_thread = tid; //可以看出每次分配worker都是顺序分配的
  8. 455
  9. 456 item->sfd = sfd;
  10. 457 item->init_state = init_state;
  11. 458 item->event_flags = event_flags;
  12. 459 item->read_buffer_size = read_buffer_size;
  13. 460 item->transport = transport;
  14. 461
  15. 462 cq_push(thread->new_conn_queue, item); //插入到这个worker进程的连接队列中
  16. 463
  17. 464 MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id);
  18. 465 buf[0] = 'c';
  19. 466 if (write(thread->notify_send_fd, buf, 1) != 1) {
  20. //往这个worker的管道的一端写入数据,提示有连接了
  21. 467 perror("Writing to thread notify pipe");
  22. 468 }

worker线程拿到了这个连接之后,就应该是分配给这个连接一个结构体,包括这个连接所有的状态,都写buf等,这个结构体就是conn,然后这个worker线程会在它自己的event_base加入对这个新的连接的事件的监听。上面也说过了worker的event_base有两套处理逻辑,一个对notify_ receive_fd的,还有一套是对新连接的。这个notify_ receive_fd的处理逻辑就是处理2个事件,一个是建立连接,一个是改变锁的粒度。连接有个状态机:

1.listening:这个状态是主线程的connection默认状态,它只有这一个状态,它做的工作就是把接到连接分发到worker子线程。

2.conn_new_cmd:每个新连接的初始状态,这个状态会清空读写buf。

3.conn_waiting:这个状态就是在event_base中设置读事件,然后状态机暂停,挂起当前connection(函数退出,回调函数的attachment会记录这个connection),等待有新的信息过来,然后通过回调函数的attachment重新找到这个connection,然后启动状态机。

4.conn_read:该状态从sfd中读取客户端的指令信息。

5.conn_parse_cmd:判断具体的指令,如果是update的指令,那么需要跳转到conn_nread中,因为需要在从网络中读取固定byte的数据,如果是查询之类的指令,就直接查询完成后,跳转到conn_mwrite中,返回数据

6.conn_nread:从网络中读取指定大小的数据,这个数据就是更新到item的数据,然后将数据更新到hash和lru中去,然后跳转到conn_write

7.conn_write:这个状态主要是调用out_string函数会跳转到这个状态,一般都是提示信息和返回的状态信息,然后输出这些数据,然后根据write_to_go的状态,继续跳转

8.conn_mwrite:这个写是把connection中的msglist返回到客户端,这个msglist存的是item的数据,用于那种get等获得item的操作的返回数据。

9.conn_swallow:对于那种update操作,如果分配item失败,显然后面的nread,是无效的,客户端是不知道的,这样客户端继续发送特定的数量的数据,就需要把读到的这些数据忽略掉,然后如果把后面指定的数据都忽略掉了(set的两部提交,数据部分忽略掉),那么connection跳转到conn_new_cmd,如果读nread的那些特定数量的数据没有读到,直接跳转到conn_closing。

10.conn_closing:服务器端主动关闭连接,调用close函数关闭文件描述符,同时把conn结构体放到空闲队列中,供新的连接重用这写conn结构体。

总结:memcached的网络模块的事件模型依赖于libevent的实现,memcached把fd关心的事件注册给libevent并注册了回调函数,libevent负责回调memcached,主线程把连接dispatch到具体的worker线程,同时把这个连接的描述符注册给worker线程自己的一套libevent,这样worker就接收了这个连接,以后这个fd上的事件回调都是这个线程上的做的工作了,每个连接都有自己的一套状态机,如果接受到数据就通过状态机处理,状态扭转,如果暂时没有数据就把连接暂时挂起,等到有了数据继续执行状态机。

本文分享自微信公众号 - Golang语言社区(Golangweb)

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2016-08-30

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏王磊的博客

Ubuntu下安装配置JDK1.7

1、下载JDK 对于下载方法,可以使用命令,也可以手动下载。本人采用手动下载jdk的方式。 下载jdk-7u7-linux-i586.tar.gz到Ubuntu...

32740
来自专栏linux系统运维

rsync通过服务同步,linux系统日志,screen工具

23340
来自专栏noteless

springmvc 项目完整示例03 小结

一般一个项目,主要有domain,dao,service,controller这几个层次,具体的真不清楚的话可以百度一下

45820
来自专栏挖坑填坑

Asp.net+Vue2构建简单记账WebApp之三(使用Vue-cli构建vue.js应用)

16930
来自专栏smy

windows下操作linux虚拟机映射网络驱动器中文件提示chmod权限不足解决方案

为了方便操作,linux虚拟机会通过windows下连接网络驱动器的方式共享自己的文件,对于前端来说,我想把gulp放在windows磁盘,操作虚拟机中的php...

23130
来自专栏耕耘实录

在CentOS7下同时安装、使用Python2.x和Python3.x

一般情况下,各类Linux操作系统是会默认安装Python2.x的,在最新的CentOS7.4版本中,发现系统默认只安装了Python2.x,而且是很多系统组件...

12120
来自专栏linux系统运维

磁盘格式化,磁盘挂载以及手动增加swap

25650
来自专栏编程坑太多

一个简单的Django项目

16740
来自专栏散尽浮华

python案例-用户登录

要求: •输入用户名密码 •认证成功后显示欢迎信息 •输错三次后锁定 1 #!/usr/bin/env python 2 # -*- coding:utf-...

37270
来自专栏决胜机器学习

PHP网络技术(六)——session及与cookie的比较

PHP网络技术(六) ——session及与cookie的比较 (原创内容,转载请注明来源,谢谢) 一、概念 session是持续的、双向性的...

40870

扫码关注云+社区

领取腾讯云代金券