前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >redis 6多线程模型

redis 6多线程模型

作者头像
用户4700054
发布2022-08-17 11:45:38
4200
发布2022-08-17 11:45:38
举报
文章被收录于专栏:存储内核技术交流

redis 多线程架构

  • redis6之前的版本一直单线程方式解析命令、处理命令,这样的模式实现起来简单,但是无法使用多核CPU的优势,无法达到性能的极致;到了redis 6,redis6采用多线程模式来来读取和解析命令,但是命令的执行依然通过队列由主线程串行执行,多线程的好处是分离了命令的解析和命令执行,命令的解析有独立的IO线程进行,命令执行依旧有main线程执行,多线程增加了代码的复杂度

开启多线程模型

Redis.conf添加如下配置

代码语言:javascript
复制
io-threads 4
io-threads-do-reads yes

代码语言:javascript
复制
struct redisServer {
  pthread_t main_thread_id;         /* Main thread id */
  int io_threads_num;         /* Number of IO threads to use. */
  int io_threads_do_reads;    /* Read and parse from IO threads? */
  int io_threads_active;      /* Is IO threads currently active? */
}
  • 在redis-server中的该配置表现为三个字段

启动redis并查看多线程

  • redis-server thread:从队列中取出数据一次执行命令
  • bio_aof_fsync thread :page cache中的aof数据fsync到磁盘的线程
  • io_thd thread: 从tcp中读取命令同时解析命令
  • 多线程主逻辑
代码语言:javascript
复制
int main(int argc, char **argv) {
    // 加载配置文件
    loadServerConfig(configfile,options);
    //主线程逻辑初始化,启动aeCreateFileEvent/beforeSleep.
    initServer() {
    	void beforeSleep(struct aeEventLoop *eventLoop) {
	 		handleClientsWithPendingReadsUsingThreads() {
	 			readQueryFromClient->processInputBuffer->processCommandAndResetClient->processCommand->call
	 		}
		}
    }
    //多线程模型初始化
    InitServerLast();
    aeMain(server.el);
}


void InitServerLast() {
    initThreadedIO();
    set_jemalloc_bg_thread(server.jemalloc_bg_thread);
    server.initial_memory_usage = zmalloc_used_memory();
}

void initThreadedIO(void) {
     for (int i = 0; i < server.io_threads_num; i++) {
         pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i);
     }
}
void *IOThreadMain(void *myid) {
	while(1) {
		 listIter li;
        listNode *ln;
        listRewind(io_threads_list[id],&li);
        while((ln = listNext(&li))) {
        	//这里解析命令同时放到client放到 server.clients_pending_read队列中
        	readQueryFromClient(c->conn);
        }
	}
}
代码语言:javascript
复制
// conn->type->xxx 对应的函数
ConnectionType CT_Socket = {
    .ae_handler = connSocketEventHandler,
    .close = connSocketClose,
    .write = connSocketWrite,
    .read = connSocketRead,
    .accept = connSocketAccept,
    .connect = connSocketConnect,
    .set_write_handler = connSocketSetWriteHandler,
    .set_read_handler = connSocketSetReadHandler,
    .get_last_error = connSocketGetLastError,
    .blocking_connect = connSocketBlockingConnect,
    .sync_write = connSocketSyncWrite,
    .sync_read = connSocketSyncRead,
    .sync_readline = connSocketSyncReadLine,
    .get_type = connSocketGetType
};
代码语言:javascript
复制
int processCommand(client *c) {
	
	call(c,CMD_CALL_FULL);
}
int processCommandAndResetClient(client *c) {
	processCommand(c)
}
int handleClientsWithPendingReadsUsingThreads(void) {
	while(listLength(server.clients_pending_read)) {
        ln = listFirst(server.clients_pending_read);
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_READ;
        listDelNode(server.clients_pending_read,ln);
        processCommandAndResetClient(c) == C_ERR)
        processInputBuffer(c);
    }
}
void beforeSleep(struct aeEventLoop *eventLoop) {
   handleClientsWithPendingReadsUsingThreads();
}
int postponeClientRead(client *c) {
    if (server.io_threads_active &&
        server.io_threads_do_reads &&
        !ProcessingEventsWhileBlocked &&
        !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ)))
    {
        c->flags |= CLIENT_PENDING_READ;
        listAddNodeHead(server.clients_pending_read,c);
        return 1;
    } else {
        return 0;
    }
}

void readQueryFromClient(connection *conn) {
	 /* Check if we want to read from the client later when exiting from
     * the event loop. This is the case if threaded I/O is enabled. */
    if (postponeClientRead(c)) return;
}
//一个客户端计入流程
client *createClient(connection *conn) {
    client *c = zmalloc(sizeof(client));
    connSetReadHandler(conn, readQueryFromClient);
        //初始化client其他的信息
}
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
        acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
}
static void acceptCommonHandler(connection *conn, int flags, char *ip) {
   createClient(conn);
}
/* Register a read handler, to be called when the connection is readable.
 * If NULL, the existing handler is removed.
 */
static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
    if (func == conn->read_handler) return C_OK;

    conn->read_handler = func;
    if (!conn->read_handler)
        aeDeleteFileEvent(server.el,conn->fd,AE_READABLE);
    else
        if (aeCreateFileEvent(server.el,conn->fd,
                    AE_READABLE,conn->type->ae_handler,conn) == AE_ERR) return C_ERR;
    return C_OK;
}

static void connSocketEventHandler(struct aeEventLoop *el, int fd, void *clientData, int mask)
{
    //conn->conn_handler = clientAcceptHandler
    if (!callHandler(conn, conn->conn_handler)) return;
 /* Handle normal I/O flows */
    if (!invert && call_read) {
        if (!callHandler(conn, conn->read_handler)) return;
    }
    /* Fire the writable event. */
    if (call_write) {
        if (!callHandler(conn, conn->write_handler)) return;
    }
    /* If we have to invert the call, fire the readable event now
     * after the writable one. */
    if (invert && call_read) {
        if (!callHandler(conn, conn->read_handler)) return;
    }
}
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-08-07,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 存储内核技术交流 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • redis 多线程架构
    • 开启多线程模型
    相关产品与服务
    云数据库 Redis
    腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档