这位创造GitHub冠军项目的“老男人”,堪称10倍程序员本尊

作者 | 马超,CSDN博客专家,金融科技从业者 来源 | CSDN博客

7月12日一款叫做TDengine的时序数据库项目在GitHub上开源了,这个项目一经发布就稳稳占据了GitHub排行榜的C位,目前TdEngine已经累积了5000多个star,并且连续一周排在上升榜首位。而且你要知道TdEngine的开发语言并不是火热的Python或JAVA,而是C语言。C语言无巧可取,虽见功夫,但是代码比较难读,能引发如此的关注绝对堪称奇迹,在我印象中即使是Mysql也没有达到如此的热度。

相信很多人也和笔者一样,是通过《比hadoop快至少10 倍的物联网大数据平台,我把它开源》的刷屏文才了解到陶老师与TdEngine的,当看到这位50岁的IT老兵老兵,依旧奋斗在编程一线,为TDengine开发贡献3万行代码时候,我就立刻四处向朋友打听,并最终要了陶老师的微信,做为一名80后程序员,我近不急待的想和陶老师直接沟通,想从他身上找到保持编程水平的秘决。

大神面对面-这才是10倍程序员该有的样子

2008年的时候笔者还是CSDN论坛WINDOWS MOBILE版的版主,从事手机导航软件的开发工作,而在彼时陶老师也创办了和信公司,并亲自开发了WindowsMobile版和信客户端,相同的开发平台经历也让我们迅速的拉进了彼此的距离。

在使用TdEngine的过程中我发现了两个小问题,一是数据库用户密码明文存放,二是数据文件权限设置不合理。让我十分震惊的是,这两个问题是我下午在和陶老师聊天时提出的,当晚发布版本就把问题全部解决了。后来沟通得知这些BUG都是陶老师自己动手修改的。我意识到TdEngine的效率应该来自于创始人对于代码的执着与热爱,而不是对员工996式的工作要求

陶老师是真的爱编程,尤其对于代码运行效率有着近乎狂热的追求,我查阅了陶老师近年来的作品,其和信客户端只有18K大小,胎心算法的实现只用了600行代码,而TDengine这样一个数据库项目竟然只需要1.5M安装包就能搞定,在手机APP都动辙上百M的今天,TDengine体量甚至显得有些异类。如果没有深厚的功底和坚定的信念是绝对无法达到如此高度的。我想陶老师应该就是传说中10倍程序员的典范吧。

10倍程序员对于他周围亲友的影响也是非常巨大的,当我打开TdEngine的官网(https://www.taosdata.com/cn/),其简洁明快的风格,一目了然的配图,实在让我无法把这一切和一位年近半百的老派IT士人联系到一起,当然后来我和陶老师聊到这件事的时候才知道,整个网站从设计、前端、后台、浏览器适配、数据分析到搜索引擎优化,都是由陶老师的儿子,一位刚刚高中毕业的00后操刀主持的,而且整个网站从无到有只用了三周时间,除了感叹一句后生可畏,由此也可以看出来和10倍程序员并肩作战的也都是10倍程序员,所以it团队的负责人在感叹自己没有18程序员相助时也要反思一下,自己是不是一位10程序员。

TdEngine为什么会火

传统数据库厂商的问题在于傲慢、自大,他们认为数据是零件,数据库则是各类零件的加中心,很多工序都是为数据的修改准备的,无论修改是否发生加工车间为了保证一致性,都会对流水线上的数据加上各种各样的锁。这些操作浪费了很多时间,而且几乎没有任何轻量级的框架,可供用户选择省略掉这些冗余操作。而且传统厂商为了解决数据库的性能问题不是从底层架构逻辑下手,而是不休止的在应用与数据库之间加入各种像REDIS,NGIX等等代理或者缓存层,这种方式其实是加大了各层级间的性能开销。传统厂商认为自己非常了解数据,但却忘了用户比厂商更加了解自己的数据,天下可谓苦秦久已

而TdEngine是认为数据是信息流,它要做的非常简单,只是数据的录像机而已,信息调阅只要找到对应的录像带即可,这样的设计思路从底层逻辑上决定了td会是一款性能极高的产品。它更加贴合物联网时代的数据模型,而且代码只有10万行的量级,非常适合从从头开始学习。

所以TdEngine精确的找到了数据库市场的细分战场。他可以在相同的硬件条件下达到其它产品10倍的速度,完美解决了很多物联网,量化交易等场景的痛点。

TdEngine代码导读

当笔者打TdEngine的代码时不由眼前一亮,其代码风格及规范性绝对堪称一流,于是我打开了久违的souce insight,,再一次开始了阅读C语言代码的美妙旅程,在这里强烈推荐各位读者也来读一下,绝对堪称享受。

这里将给我启示最大的一段代码其链接在

https://github.com/taosdata/TDengine/blob/master/src/util/src/tsched.c

向大家分享一下。鉴于本文肯定会分享给陶老师,所以估计会有作者亲答的环节:-),以下代码是一个典型的consumer-producer消息传递功能的实现,也就是有多个生产者(producer)生成并不断向队列中传递消息,也有多个消费者(consumer)不断从队列中取消息,而在java等高级语言中类似的功能已经被封装好了,这其实也让程序员无法了解线程间的同步和互斥机制。在正式进入到代码之前我想请大家思考这样的一个,互斥体( mutex)和信号量(semaphore)的使用是如何做到多线程安全的。

先来看结构体设计,具体我已经注释好了:

typedef struct {
  char            label[16];#消息内容
  sem_t           emptySem;#此信号量代表队列的可写状态
  sem_t           fullSem;#此信号量代表队列的可读状态
  pthread_mutex_t queueMutex;#此互斥体为保证消息不会被误修改,保证线程程安全
  int             fullSlot;#队尾位置
  int             emptySlot;#队头位置
  int             queueSize;#队列长度
  int             numOfThreads;#同时操作的线程数量
  pthread_t *     qthread;#线程指针
  SSchedMsg *     queue;#队列指针
} SSchedQueue;

再来看初始化函数,这里需要特别说明的是,两个信号量的创建,其中emptySem是队列的可写状态,初始化时其值为queueSize,即初始时队列可写,可接受消息长度为队列长度,fullSem是队列的可读状态,初始化时其值为0,即初始时队列不可读。具体代码及我的注释如下:

void *taosInitScheduler(int queueSize, int numOfThreads, char *label) {
  pthread_attr_t attr;
  SSchedQueue *  pSched = (SSchedQueue *)malloc(sizeof(SSchedQueue));

  memset(pSched, 0, sizeof(SSchedQueue));
  pSched->queueSize = queueSize;
  pSched->numOfThreads = numOfThreads;
  strcpy(pSched->label, label);

  if (pthread_mutex_init(&pSched->queueMutex, NULL) < 0) {
    pError("init %s:queueMutex failed, reason:%s", pSched->label, strerror(errno));
    goto _error;
  }
   #emptySem是队列的可写状态,初始化时其值为queueSize,即初始时队列可写,可接受消息长度为队列长度。
  if (sem_init(&pSched->emptySem, 0, (unsigned int)pSched->queueSize) != 0) {
    pError("init %s:empty semaphore failed, reason:%s", pSched->label, strerror(errno));
    goto _error;
  }
 #fullSem是队列的可读状态,初始化时其值为0,即初始时队列不可读
  if (sem_init(&pSched->fullSem, 0, 0) != 0) {
    pError("init %s:full semaphore failed, reason:%s", pSched->label, strerror(errno));
    goto _error;
  }

  if ((pSched->queue = (SSchedMsg *)malloc((size_t)pSched->queueSize * sizeof(SSchedMsg))) == NULL) {
    pError("%s: no enough memory for queue, reason:%s", pSched->label, strerror(errno));
    goto _error;
  }

  memset(pSched->queue, 0, (size_t)pSched->queueSize * sizeof(SSchedMsg));
  pSched->fullSlot = 0;#实始化时队列为空,故队头和队尾的位置都是0
  pSched->emptySlot = 0;#实始化时队列为空,故队头和队尾的位置都是0

  pSched->qthread = malloc(sizeof(pthread_t) * (size_t)pSched->numOfThreads);

  pthread_attr_init(&attr);
  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);

  for (int i = 0; i < pSched->numOfThreads; ++i) {
    if (pthread_create(pSched->qthread + i, &attr, taosProcessSchedQueue, (void *)pSched) != 0) {
      pError("%s: failed to create rpc thread, reason:%s", pSched->label, strerror(errno));
      goto _error;
    }
  }

  pTrace("%s scheduler is initialized, numOfThreads:%d", pSched->label, pSched->numOfThreads);

  return (void *)pSched;

_error:
  taosCleanUpScheduler(pSched);
  return NULL;
}

再来看读消息的taosProcessSchedQueue函数,这个主要逻辑是

1.使用无限循环,只要队列可读即sem_wait(&pSched->fullSem)不再阻塞就继续向下处理

2.在操作msg前,加入互斥体防止msg被误用。

3.读操作完毕后修改fullSlot的值,注意这为避免fullSlot溢出,需要对于queueSize取余。同时退出互斥体。

4.对emptySem进行post操作,即把emptySem的值加1,如emptySem原值为5,读取一个消息后,emptySem的值为6,即可写状态,且能接受的消息数量为6

具体代码及注释如下:

void *taosProcessSchedQueue(void *param) {
  SSchedMsg    msg;
  SSchedQueue *pSched = (SSchedQueue *)param;
 #注意这里是个无限循环,只要队列可读即sem_wait(&pSched->fullSem)不再阻塞就继续处理
  while (1) {
    if (sem_wait(&pSched->fullSem) != 0) {
      pError("wait %s fullSem failed, errno:%d, reason:%s", pSched->label, errno, strerror(errno));
      if (errno == EINTR) {
        /* sem_wait is interrupted by interrupt, ignore and continue */
        continue;
      }
    }
     #加入互斥体防止msg被误用。
    if (pthread_mutex_lock(&pSched->queueMutex) != 0)
      pError("lock %s queueMutex failed, reason:%s", pSched->label, strerror(errno));

    msg = pSched->queue[pSched->fullSlot];
    memset(pSched->queue + pSched->fullSlot, 0, sizeof(SSchedMsg));
    #读取完毕修改fullSlot的值,注意这为避免fullSlot溢出,需要对于queueSize取余。
    pSched->fullSlot = (pSched->fullSlot + 1) % pSched->queueSize;
     #读取完毕修改退出互斥体
    if (pthread_mutex_unlock(&pSched->queueMutex) != 0)
      pError("unlock %s queueMutex failed, reason:%s\n", pSched->label, strerror(errno));
     #读取完毕对emptySem进行post操作,即把emptySem的值加1,如emptySem原值为5,读取一个消息后,emptySem的值为6,即可写状态,且能接受的消息数量为6
    if (sem_post(&pSched->emptySem) != 0)
      pError("post %s emptySem failed, reason:%s\n", pSched->label, strerror(errno));

    if (msg.fp)
      (*(msg.fp))(&msg);
    else if (msg.tfp)
      (*(msg.tfp))(msg.ahandle, msg.thandle);
  }
}

最后来看写消息的taosScheduleTask函数,其基本逻辑是

1.写队列前先对emptySem进行减1操作,如emptySem原值为1,那么减1后为0,也就是队列已满,必须在读取消息后,即emptySem进行post操作后,队列才能进行可写状态。

2.加入互斥体防止msg被误操作,写入完成后退出互斥体。

3.写队列完成后对fullSem进行加1操作,如fullSem原值为0,那么加1后为1,也就是队列可读,咱们上面介绍的读取taosProcessSchedQueue中 sem_wait(&pSched->fullSem)不再阻塞就继续向下。

int taosScheduleTask(void *qhandle, SSchedMsg *pMsg) {
  SSchedQueue *pSched = (SSchedQueue *)qhandle;
  if (pSched == NULL) {
    pError("sched is not ready, msg:%p is dropped", pMsg);
    return 0;
  }
  #在写队列前先对emptySem进行减1操作,如emptySem原值为1,那么减1后为0,也就是队列已满,必须在读取消息后,即emptySem进行post操作后,队列才能进行可写状态。
  if (sem_wait(&pSched->emptySem) != 0) pError("wait %s emptySem failed, reason:%s", pSched->label, strerror(errno));
#加入互斥体防止msg被误操作
  if (pthread_mutex_lock(&pSched->queueMutex) != 0)
    pError("lock %s queueMutex failed, reason:%s", pSched->label, strerror(errno));

  pSched->queue[pSched->emptySlot] = *pMsg;
  pSched->emptySlot = (pSched->emptySlot + 1) % pSched->queueSize;

  if (pthread_mutex_unlock(&pSched->queueMutex) != 0)
    pError("unlock %s queueMutex failed, reason:%s", pSched->label, strerror(errno));
  #在写队列前先对fullSem进行加1操作,如fullSem原值为0,那么加1后为1,也就是队列可读,咱们上面介绍的读取函数可以进行处理。
  if (sem_post(&pSched->fullSem) != 0) pError("post %s fullSem failed, reason:%s", pSched->label, strerror(errno));

  return 0;
}

当然以上只是TdEngine优美代码的一小部分,而且笔者解读的功力也十分有限,这里再次强烈建议大家下载全部源码仔细学习,定能受益匪浅。

原文:

https://blog.csdn.net/BEYONDMA/article/details/96578186

(*本文仅代表作者观点,转载请联系原作者)

原文发布于微信公众号 - AI科技大本营(rgznai100)

原文发表时间:2019-07-20

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

扫码关注云+社区

领取腾讯云代金券