前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Mysql连接管理从network_init()到connection_event_loop()

Mysql连接管理从network_init()到connection_event_loop()

作者头像
mingjie
发布2022-05-12 10:39:17
3250
发布2022-05-12 10:39:17
举报
文章被收录于专栏:Postgresql源码分析

1 network setup简要流程

代码语言:javascript
复制
[mysql.cc]network_init
  > Mysqld_socket_listener *mysqld_socket_listener=new (std::nothrow) Mysqld_socket_listener(bind_addr_str,...)
  > Connection_acceptor<Mysqld_socket_listener> *mysqld_socket_acceptor = new (std::nothrow) Connection_acceptor<Mysqld_socket_listener>(mysqld_socket_listener)
  > mysqld_socket_acceptor->init_connection_acceptor()
  |
  [connection_acceptor.h]init_connection_acceptor()
    > m_listener->setup_listener();
    |
    [socket_connection.cc]Mysqld_socket_listener::setup_listener()
      > setup_listener都做了什么? 跳转到“3 class Mysqld_socket_listene”

2 class Connection_acceptor

这里面涉及两个重要数据结构,Connection_acceptor和Mysqld_socket_listener。

  • Connection_acceptor模版类需要listener
  • Mysqld_socket_listener作为listener传入Connection_acceptor

Connection_acceptor较为简单,需要传入的listener有setup_listener()方法即可。

代码语言:javascript
复制
template <typename Listener> class Connection_acceptor
{
  Listener *m_listener;

public:
  Connection_acceptor(Listener *listener)
  : m_listener(listener)
  { }

  ~Connection_acceptor()
  {
    delete m_listener;
  }

  /**
    Initialize a connection acceptor.

    @retval   return true if initialization failed, else false.
  */
  bool init_connection_acceptor()
  {
    return m_listener->setup_listener();
  }

  /**
    Connection acceptor loop to accept connections from clients.
  */
  void connection_event_loop()
  {
    Connection_handler_manager *mgr= Connection_handler_manager::get_instance();
    while (!abort_loop)
    {
      Channel_info *channel_info= m_listener->listen_for_connection_event();
      if (channel_info != NULL)
        mgr->process_new_connection(channel_info);
    }
  }

  /**
    Close the listener.
  */
  void close_listener()
  {
    m_listener->close_listener();
  }

};

3 class Mysqld_socket_listener

Mysqld_socket_listener对连接进行具体的初始化操作

代码语言:javascript
复制
class Mysqld_socket_listener
{
  std::string m_bind_addr_str; // IP address string
  uint m_tcp_port; // TCP port to bind to
  uint m_backlog; // backlog specifying length of pending connection queue
  uint m_port_timeout; // port timeout value
  std::string m_unix_sockname; // unix socket pathname to bind to
  bool m_unlink_sockname; // Unlink socket & lock file if true.
  /*
    Map indexed by MYSQL socket fds and correspoding bool to distinguish
    between unix and tcp socket.
  */
  socket_map_t m_socket_map; // map indexed by mysql socket fd and index

  uint m_error_count; // Internal variable for maintaining error count.

#ifdef HAVE_POLL
  static const int MAX_SOCKETS=2;
  struct poll_info_t
  {
    struct pollfd m_fds[MAX_SOCKETS];
    MYSQL_SOCKET m_pfs_fds[MAX_SOCKETS];
  };
  // poll related info. used in poll for listening to connection events.
  poll_info_t m_poll_info;
#else
  struct select_info_t
  {
    fd_set  m_read_fds,m_client_fds;
    my_socket m_max_used_connection;
    select_info_t() : m_max_used_connection(0)
    { FD_ZERO(&m_client_fds); }
  };
  // select info for used in select for listening to connection events.
  select_info_t m_select_info;
#endif // HAVE_POLL

...

public:
  static ulong get_connection_errors_select()
  {
    return connection_errors_select;
  }

  static ulong get_connection_errors_accept()
  {
    return connection_errors_accept;
  }

  static ulong get_connection_errors_tcpwrap()
  {
    return connection_errors_tcpwrap;
  }

  /**
    Constructor to setup a listener for listen to connect events from
    clients.

    @param   bind_addr_str  IP address used in bind
    @param   tcp_port       TCP port to bind to
    @param   backlog        backlog specifying length of pending
                            connection queue used in listen.
    @param   port_timeout   portname.
    @param   unix_sockname  pathname for unix socket to bind to
  */
  Mysqld_socket_listener(std::string bind_addr_str, uint tcp_port,
                         uint backlog, uint port_timeout,
                         std::string unix_sockname);

  /**
    Set up a listener - set of sockets to listen for connection events
    from clients.

    @retval false  listener sockets setup to be used to listen for connect events
            true   failure in setting up the listener.
  */
  bool setup_listener();

  /**
    The body of the event loop that listen for connection events from clients.

    @retval Channel_info   Channel_info object abstracting the connected client
                           details for processing this connection.
  */
  Channel_info* listen_for_connection_event();

  /**
    Close the listener.
  */
  void close_listener();

  ~Mysqld_socket_listener()
  {
    if (!m_socket_map.empty())
      close_listener();
  }
};

3.1 Mysqld_socket_listener::setup_listener

比较重要的就是这里了,实现具体的socket初始化流程:

[socket_connection.cc]bool Mysqld_socket_listener::setup_listener()

第一步,如果有配置端口监听,拿到第一个网络套接字

代码语言:javascript
复制
  if (m_tcp_port)
  {
    TCP_socket tcp_socket(m_bind_addr_str, m_tcp_port,
                          m_backlog, m_port_timeout);

    MYSQL_SOCKET mysql_socket= tcp_socket.get_listener_socket();
    if (mysql_socket.fd == INVALID_SOCKET)
      return true;

    m_socket_map.insert(std::pair<MYSQL_SOCKET,bool>(mysql_socket, false));
  }

第二步,如果有域套接字,拿到第二个域套接字

代码语言:javascript
复制
  if (m_unix_sockname != "")
  {
    Unix_socket unix_socket(&m_unix_sockname, m_backlog);

    MYSQL_SOCKET mysql_socket= unix_socket.get_listener_socket();
    if (mysql_socket.fd == INVALID_SOCKET)
      return true;

    m_socket_map.insert(std::pair<MYSQL_SOCKET,bool>(mysql_socket, true));
    m_unlink_sockname= true;
  }

现在socket_map里面有两个socket:

在这里插入图片描述
在这里插入图片描述

第三步,使用poll监听相关的socket

代码语言:javascript
复制
  for (socket_map_iterator_t sock_map_iter=  m_socket_map.begin();
       sock_map_iter != m_socket_map.end(); ++sock_map_iter)
  {
    MYSQL_SOCKET listen_socket= sock_map_iter->first;
    mysql_socket_set_thread_owner(listen_socket);
    m_poll_info.m_fds[count].fd= mysql_socket_getfd(listen_socket);
    m_poll_info.m_fds[count].events= POLLIN;
    m_poll_info.m_pfs_fds[count]= listen_socket;
    count++;
  }

装入两个socket后:

在这里插入图片描述
在这里插入图片描述

4 进入监听循环

进入监听循环的还是Mysqld_socket_listener类的方法:listen_for_connection_event

代码语言:javascript
复制
mysqld_main
  |
  [mysql.cc]network_init
  ...
  [mysql.cc]mysqld_socket_acceptor->connection_event_loop();
    > Connection_handler_manager *mgr= Connection_handler_manager::get_instance();
    > while (!abort_loop)
      > [socket_connection.cc]Channel_info* Mysqld_socket_listener::listen_for_connection_event()
        > 等待连接:poll(&m_poll_info.m_fds[0], m_socket_map.size(), -1);
      > 如果有连接进来:mgr->process_new_connection(channel_info);
        > check_and_incr_conn_count()
        > m_connection_handler->add_connection(channel_info)
          > Per_thread_connection_handler::add_connection()
            > 这就到另外一篇的内容了《Mysql连接建立与thread cache唤醒原理
              > https://blog.csdn.net/jackgo73/article/details/113118822

5 poll支持拓展

主线程在监听socket时使用的poll似乎只传入了第一个socket?看下代码:

代码语言:javascript
复制
poll(&m_poll_info.m_fds[0], m_socket_map.size(), -1);

m_poll_info.m_fds默认会有两个socket,第一个是3306端口绑定的网络套接字,第二个是给定路径的域套接字。poll全部都会监听的:

int poll ( struct pollfd *fdarray , unsigned long nfds , int timeout );

参数:

  • fdarray :指向数组的指针,这个数组是由 struct pollfd 作为元素构成的
  • nfds :与 select 相同,对应的是所监控的最大文件描述符的个数,使用的时候传入当前最大的文件描述符号+1 即可
  • timeout : 在这里可以用来设定 poll 的工作模式 : 阻塞模式, 非阻塞模式, 介于阻塞非阻塞之间的在限定时间内阻塞模式
    • INFTIME(-1) 永远等待
    • 0 立即返回,不阻塞进程
    • >0 等待指定数目的毫秒数

返回值:

  • 0 : 没有任何套接字上没有变化
  • n(n>0) : 有 n 个套接字上面有变化(来可读数据,有了可写的数据)
  • -1 : poll 执行过程中出错
在这里插入图片描述
在这里插入图片描述

6 总结

  1. mysqld初始化,主进程先network_init() (1)创建connection_accepter,传入mysqld_socket_listener (2)执行mysqld_socket_listener的setup操作,对网络和域套接字分别初始化,按poll能识别的格式记录到m_poll_info数组中
  2. mysqld继续初始化,主进程先connection_event_loop() (3)开始监听,执行mysqld_socket_listener的eventloop动作,poll传入上面构造好的结构体m_poll_info开始监听。 (4)监听到了之后,用mgr->process_new_connection(channel_info);处理 (5)进入add_connection()里面优先使用thread cache处理连接,发信号唤醒线程继续处理。
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021-02-06,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1 network setup简要流程
  • 2 class Connection_acceptor
  • 3 class Mysqld_socket_listener
    • 3.1 Mysqld_socket_listener::setup_listener
    • 4 进入监听循环
    • 5 poll支持拓展
    • 6 总结
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档