前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >200行代码解读TDEngine背后的定时器

200行代码解读TDEngine背后的定时器

作者头像
AI科技大本营
发布2019-08-09 19:51:09
8790
发布2019-08-09 19:51:09
举报

作者 | beyondma

来源 | CSDN博客

导读:最近几周,本文作者几篇有关陶建辉老师最新的创业项目-TdEngine代码解读文章出人意料地引起了巨大的反响,原以为C语言已经是昨日黄花,不过从读者的留言来看,C语言还是老当益壮,依旧有着巨大的影响力,作者就以此为契机不断向陶老师请教,这次再给大家带来TdEngine计时器的代码解读。

其主要源码地址如下:https://github.com/taosdata/TDengine/blob/master/src/util/src/ttimer.c

TDEngine为何要自己实现Timer

其实一开始在读到这段代码时笔者也有类似的疑问,因为操作系统的内核基本都实现了定时器的功能,可以直接调用,但是深入思考一下就会发现由于TdEginge本身是个时序数据库的应用,而由于数据库的特殊性,其对库底层的需求其实与操作系统的内核需求类似,我们知道直接调用操作系统的timer需要在到时后启动一个对应的线程去处理对应的中断请求,而这对于TdEginge这种动辙需要上万个定时器的数据库应用来说无疑是一笔巨大的开销,这显然不是陶老师这种极端要求效率的程序员能够接受的。

所以TDEngine的定时器的基本思路是基于操作系统的timer,来封装自身的定时器功能,使所有的timer控制器运行在一个线程池,而在同一timer控制器下的timer则运行在同一线程内以此来达到节约资源的目的。

TDEngine Timer的基本工作原理

简单来说TDEngine Timer的工作流程如下:

一.Timer初始化

主要完成以下工作:

  1. 初始化Timer线程池
  2. 启动操作系统的Timer,并注册处理timer循环处理函数taosTimerLoopFunc。

二.Timer启动

  1. 计算当前需要启动timer的调起序列号(index),即taosTimerLoopFunc运行在第几个Loop时会调起当前的timer的处理函数。
  2. 对于调起序列号相同的timer加入双链表tmrList[当前index],其在链表中的位置依据其到期时间的先后排序,注:(由于之前启动的操作系统timer也是有循环周期的,所以TDEngine timer也可能不是要在当前周期内调起。所以调起序列号相同,但是调起周期运可能有前后次序,其在链表中的位置其实是要买调起周期的先后排序的。)

工作过程图示:由于作者美工基础较差,所以虽然花了很久但是下图的效果其实一般,请大家能够直观感受即可。

简要地讲,taosTimerLoopFunc函数循环处理tmrList,并将当前启动序列号对应的tmrList交由taosTmrProcessList处理,taosTmrProcessList调用处在当前启动序列号(index)且处在当前循环序列号(cycle)的timer的回调(handler)函数。

结合代码的解读

  1. 初始化函数的解读,具体代码及注释如下:
代码语言:javascript
复制
void *taosTmrInit(int maxNumOfTmrs, int resolution, int longest, char *label) {  static pthread_once_t tmrInit = PTHREAD_ONCE_INIT;  tmr_ctrl_t *          pCtrl;   pthread_once(&tmrInit, taosTmrModuleInit);//pthread_once保证TmrModule只会被运行一次   int tmrCtrlId = taosAllocateId(tmrIdPool);   if (tmrCtrlId < 0) {    tmrError("%s bug!!! too many timers!!!", label);    return NULL;  }   pCtrl = tmrCtrl + tmrCtrlId;  tfree(pCtrl->tmrList);  tmrMemPoolCleanUp(pCtrl->poolHandle);   memset(pCtrl, 0, sizeof(tmr_ctrl_t));   pCtrl->tmrCtrlId = tmrCtrlId;  strcpy(pCtrl->label, label);  pCtrl->maxNumOfTmrs = maxNumOfTmrs;   if ((pCtrl->poolHandle = tmrMemPoolInit(maxNumOfTmrs + 10, sizeof(tmr_obj_t))) == NULL) {    tmrError("%s failed to allocate mem pool", label);    tmrMemPoolCleanUp(pCtrl->poolHandle);    return NULL;  }   if (resolution < MSECONDS_PER_TICK) resolution = MSECONDS_PER_TICK;//初始化分辨率  pCtrl->resolution = resolution;  pCtrl->maxTicks = resolution / MSECONDS_PER_TICK;//初始化最大的tick  pCtrl->ticks = rand() / pCtrl->maxTicks;  pCtrl->numOfPeriods = longest / resolution;//初始化最大周期  if (pCtrl->numOfPeriods < 10) pCtrl->numOfPeriods = 10;   pCtrl->tmrList = (tmr_list_t *)malloc(sizeof(tmr_list_t) * pCtrl->numOfPeriods);//初始化tmrList目前还都是NULL  for (int i = 0; i < pCtrl->numOfPeriods; i++) {    pCtrl->tmrList[i].head = NULL;    pCtrl->tmrList[i].count = 0;  }   if (pthread_mutex_init(&pCtrl->mutex, NULL) < 0) {    tmrError("%s failed to create the mutex, reason:%s", label, strerror(errno));    taosTmrCleanUp(pCtrl);    return NULL;  }   pCtrl->signature = pCtrl;  numOfTmrCtrl++;  tmrTrace("%s timer ctrl is initialized, index:%d", label, tmrCtrlId);  return pCtrl;//初始化完成}

2. 模块初化函数:我们看到在初始化函数中调用了模块初始化函数进行线程池及操作系统定时器的启动处理,其具体代码及注释如下:

代码语言:javascript
复制
void taosTmrModuleInit(void) {  tmrIdPool = taosInitIdPool(maxNumOfTmrCtrl);//初始化id池  memset(tmrCtrl, 0, sizeof(tmrCtrl)); #ifdef LINUX  pthread_t      thread;  pthread_attr_t tattr;  pthread_attr_init(&tattr);  pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED);  if (pthread_create(&thread, &tattr, taosProcessAlarmSignal, NULL) != 0) {    tmrError("failed to create timer thread");//初始化操作系统timer    return;  }   pthread_attr_destroy(&tattr);#else  taosInitTimer(taosTimerLoopFunc, MSECONDS_PER_TICK);#endif   tmrQhandle = taosInitScheduler(10000, taosTmrThreads, "tmr");//初始化handle的Scheduler  tmrTrace("timer module is initialized, thread:%d", taosTmrThreads);}

3.timer启动函数

代码语言:javascript
复制
tmr_h taosTmrStart(void (*fp)(void *, void *), int mseconds, void *param1, void *handle) {  tmr_obj_t * pObj, *cNode, *pNode;  tmr_list_t *pList;  int         index, period;  tmr_ctrl_t *pCtrl = (tmr_ctrl_t *)handle;   if (handle == NULL) return NULL;   period = mseconds / pCtrl->resolution;//初始化period数  if (pthread_mutex_lock(&pCtrl->mutex) != 0)    tmrError("%s mutex lock failed, reason:%s", pCtrl->label, strerror(errno));   pObj = (tmr_obj_t *)tmrMemPoolMalloc(pCtrl->poolHandle);  if (pObj == NULL) {    tmrError("%s reach max number of timers:%d", pCtrl->label, pCtrl->maxNumOfTmrs);    pthread_mutex_unlock(&pCtrl->mutex);    return NULL;  }   pObj->cycle = period / pCtrl->numOfPeriods;//初始化周期数,即使用period除以每个循环中共有几个period,得到需要在第几个循环中调起timer  pObj->param1 = param1;  pObj->fp = fp;  pObj->timerId = pObj;  pObj->pCtrl = pCtrl;   index = (period + pCtrl->periodsFromStart) % pCtrl->numOfPeriods;//初始化启动序列号  int cindex = (pCtrl->periodsFromStart) % pCtrl->numOfPeriods;  pList = &(pCtrl->tmrList[index]);   pObj->index = index;  cNode = pList->head;  pNode = NULL;   while (cNode != NULL) {    if (cNode->cycle < pObj->cycle) {      pNode = cNode;      cNode = cNode->next;    } else {      break;    }  }   pObj->next = cNode;  pObj->prev = pNode;   if (cNode != NULL) {    cNode->prev = pObj;  }   if (pNode != NULL) {    pNode->next = pObj;  } else {    pList->head = pObj;  }   pList->count++;  pCtrl->numOfTmrs++;  //以上为将相同index的timer按照先后顺序在pList中排序,具体下面会有图例展示。  if (pthread_mutex_unlock(&pCtrl->mutex) != 0)    tmrError("%s mutex unlock failed, reason:%s", pCtrl->label, strerror(errno));   tmrTrace("%s %p, timer started, fp:%p, tmr_h:%p, index:%d, total:%d cindex:%d", pCtrl->label, param1, fp, pObj, index,           pCtrl->numOfTmrs, cindex);   return (tmr_h)pObj;}

可能各位读者也被以上代码中的pobj,cnode,pnode搞的晕头转向,下面我们以3个timer为例,假如他们的index和cycle都相同,那么他们分别调用完taosTmrStart之后tmrList会是什么情况,可以参考下表。

4.loopFunc和taosTmrProcessList

具体代码及注释如下:

代码语言:javascript
复制
void taosTmrProcessList(tmr_ctrl_t *pCtrl) {  unsigned int index;  tmr_list_t * pList;  tmr_obj_t *  pObj, *header;   pthread_mutex_lock(&pCtrl->mutex);  index = pCtrl->periodsFromStart % pCtrl->numOfPeriods;//计算当前index  pList = &pCtrl->tmrList[index];   while (1) {    header = pList->head;    if (header == NULL) break;     if (header->cycle > 0) {/*如当前index对应的tmrList[index]的cycle大于0则下面会将其cycle减1*/      pObj = header;      while (pObj) {        pObj->cycle--;        pObj = pObj->next;      }      break;    }     pCtrl->numOfTmrs--;    tmrTrace("%s %p, timer expired, fp:%p, tmr_h:%p, index:%d, total:%d", pCtrl->label, header->param1, header->fp,             header, index, pCtrl->numOfTmrs);     pList->head = header->next;/*如运行到此处则当前header已经expired,重新整理pList*/    if (header->next) header->next->prev = NULL;    pList->count--;    header->timerId = NULL;     SSchedMsg schedMsg;    schedMsg.fp = NULL;    schedMsg.tfp = header->fp;    schedMsg.ahandle = header->param1;    schedMsg.thandle = header;    taosScheduleTask(tmrQhandle, &schedMsg);     tmrMemPoolFree(pCtrl->poolHandle, (char *)header);  }   pCtrl->periodsFromStart++;  pthread_mutex_unlock(&pCtrl->mutex);}
代码语言:javascript
复制
void *taosTimerLoopFunc(int signo) {  tmr_ctrl_t *pCtrl;  int         count = 0;   for (int i = 1; i < maxNumOfTmrCtrl; ++i) {    pCtrl = tmrCtrl + i;    if (pCtrl->signature) {      count++;      pCtrl->ticks++;      if (pCtrl->ticks >= pCtrl->maxTicks) {/*如当前的ticks已经大于maxTicks则需要调起taosTmrProcessList对当前index的timer进行处理*/        taosTmrProcessList(pCtrl);        pCtrl->ticks = 0;      }      if (count >= numOfTmrCtrl) break;    }  }   return NULL;}

最后我也想留一个开放性问题,也就是tmrList使用双链表实现的最大好处是什么,能否使用单链表实现?欢迎读者留言说出你的看法。

原文链接:

https://blog.csdn.net/BEYONDMA/article/details/98473143

(*本文为 AI科技大本营转载文章,转载请联系原作者)

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-08-08,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 AI科技大本营 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
数据库
云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档