Memcached的扩容源码分析

Hash表是Memcached里面最重要的结构之一,其采用链接法来处理Hash冲突,当Hash表中的项太多时,也就是Hash冲突比较高的时候,Hash表的遍历就脱变成单链表,此时为了提供Hash的性能,Hash表需要扩容,Memcached的扩容条件是当表中元素个数超过Hash容量的1.5倍时就进行扩容,扩容过程由独立的线程来完成,扩容过程中会采用2个Hash表,将老表中的数据通过Hash算法映射到新表中,每次移动的桶的数目可以配置,默认是每次移动老表中的1个桶。

//hash表中增加元素  
int assoc_insert(item *it, const uint32_t hv) {  
    unsigned int oldbucket;  
    //如果已经进行扩容且目前进行扩容还没到需要插入元素的桶,则将元素添加到旧桶中  
    if (expanding &&(oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)  
    {  
        it->h_next = old_hashtable[oldbucket];//添加元素  
        old_hashtable[oldbucket] = it;  
    } else {//如果没扩容,或者扩容已经到了新的桶中,则添加元素到新表中  
        it->h_next = primary_hashtable[hv & hashmask(hashpower)];//添加元素  
        primary_hashtable[hv & hashmask(hashpower)] = it;  
    }  
  
    hash_items++;//元素数目+1  
    //还没开始扩容,且表中元素个数已经超过Hash表容量的1.5倍  
    if (! expanding && hash_items > (hashsize(hashpower) * 3) / 2) {  
        assoc_start_expand();//唤醒扩容线程  
    }  
  
    MEMCACHED_ASSOC_INSERT(ITEM_key(it), it->nkey, hash_items);  
    return 1;  
}  
//唤醒扩容线程  
static void assoc_start_expand(void) {  
    if (started_expanding)  
        return;  
    started_expanding = true;  
    pthread_cond_signal(&maintenance_cond);//唤醒信号量  
}  
//启动扩容线程,扩容线程在main函数中会启动,启动运行一遍之后会阻塞在条件变量maintenance_cond上面,插入元素超过规定,唤醒条件变量  
static void *assoc_maintenance_thread(void *arg) {  
    //do_run_maintenance_thread的值为1,即该线程持续运行  
    while (do_run_maintenance_thread) {  
        int ii = 0;  
  
        item_lock_global();//加Hash表的全局锁  
        mutex_lock(&cache_lock);//加cache_lock锁  
        //执行扩容时,每次按hash_bulk_move个桶来扩容  
        for (ii = 0; ii < hash_bulk_move && expanding; ++ii) {  
            item *it, *next;  
            int bucket;  
            //老表每次移动一个桶中的一个元素  
            for (it = old_hashtable[expand_bucket]; NULL != it; it = next) {  
                next = it->h_next;//要移动的下一个元素  
  
                bucket = hash(ITEM_key(it), it->nkey, 0) & hashmask(hashpower);//按新的Hash规则进行定位  
                it->h_next = primary_hashtable[bucket];//挂载到新的Hash表中  
                primary_hashtable[bucket] = it;  
            }  
  
            old_hashtable[expand_bucket] = NULL;//旧表中的这个Hash桶已经按新规则完成了扩容  
  
            expand_bucket++;//老表中的桶计数+1  
            if (expand_bucket == hashsize(hashpower - 1)) {//hash表扩容结束,expand_bucket从0开始,一直递增  
                expanding = false;//修改扩容标志  
                free(old_hashtable);//释放老的表结构  
                STATS_LOCK();//更新一些统计信息  
                stats.hash_bytes -= hashsize(hashpower - 1) * sizeof(void *);  
                stats.hash_is_expanding = 0;  
                STATS_UNLOCK();  
                if (settings.verbose > 1)  
                    fprintf(stderr, "Hash table expansion done\n");  
            }  
        }  
  
        mutex_unlock(&cache_lock);//释放cache_lock锁  
        item_unlock_global();//释放Hash表的全局锁  
  
        if (!expanding) {//完成扩容  
            //修改Hash表的锁类型,此时锁类型更新为分段锁,默认是分段锁,在进行扩容时,改为全局锁  
            switch_item_lock_type(ITEM_LOCK_GRANULAR);  
            slabs_rebalancer_resume();//释放用于扩容的锁  
            /* We are done expanding.. just wait for next invocation */  
            mutex_lock(&cache_lock);//加cache_lock锁,保护条件变量  
            started_expanding = false;//修改扩容标识  
            pthread_cond_wait(&maintenance_cond, &cache_lock);//阻塞扩容线程  
            mutex_unlock(&cache_lock);  
            slabs_rebalancer_pause();//加用于扩容的锁  
            switch_item_lock_type(ITEM_LOCK_GLOBAL);//修改锁类型为全局锁  
            mutex_lock(&cache_lock);//临时用来实现临界区  
            assoc_expand();//执行扩容  
            mutex_unlock(&cache_lock);  
        }  
    }  
    return NULL;  
}  
//按2倍容量扩容Hash表  
static void assoc_expand(void) {  
    old_hashtable = primary_hashtable;//old_hashtable指向主Hash表  
  
    primary_hashtable = calloc(hashsize(hashpower + 1), sizeof(void *));//申请新的空间  
    if (primary_hashtable) {//空间申请成功  
        if (settings.verbose > 1)  
            fprintf(stderr, "Hash table expansion starting\n");  
        hashpower++;//hash等级+1  
        expanding = true;//扩容标识打开  
        expand_bucket = 0;  
        STATS_LOCK();//更新全局统计信息  
        stats.hash_power_level = hashpower;  
        stats.hash_bytes += hashsize(hashpower) * sizeof(void *);  
        stats.hash_is_expanding = 1;  
        STATS_UNLOCK();  
    } else {//空间事情失败  
        primary_hashtable = old_hashtable;  
    }  
}  

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏云霄雨霁

并发容器类

1103
来自专栏Java编程技术

使用zookeeper实现分布式锁

在同一个jvm进程中时,可以使用JUC提供的一些锁来解决多个线程竞争同一个共享资源时候的线程安全问题,但是当多个不同机器上的不同jvm进程共同竞争同一个共享资源...

912
来自专栏Hongten

hibernate中的java对象有几种状态,其相互关系如何(区别和相互转换)

花了一些时间理解hibernate中的java对象的几种状态,很容易就懂了,这里记录一下,分享给大家!!

723
来自专栏osc同步分享

hibernate 中对象的状态

load() 和 get() User user = session.get(User.class, "1"); 如上调用get方法后,会向数据库查询id为1的...

2765
来自专栏https://www.cnblogs.com/L

Hive篇---Hive使用优化

本节主要描述Hive的优化使用,Hive的优化着重强调一个 把Hive SQL 当做Mapreduce程序去优化 二.主要优化点

1081
来自专栏AILearning

Apache Spark 2.2.0 中文文档 - Spark 编程指南 | ApacheCN

Spark 编程指南 概述 Spark 依赖 初始化 Spark 使用 Shell 弹性分布式数据集 (RDDs) 并行集合 外部 Data...

2266
来自专栏Spark生态圈

Spark整合HBase(自定义HBase DataSource)

Spark支持多种数据源,但是Spark对HBase 的读写都没有相对优雅的api,但spark和HBase整合的场景又比较多,故通过spark的DataSou...

1452
来自专栏我是攻城师

Storm组件介绍

2665
来自专栏我的技术专栏

《Go in action》读后记录:Go的并发与并行

793
来自专栏MYSQL轻松学

Percona XtraDB Cluster

image.png 1、什么是Percona XtraDB Cluster Percona XtraDB Cluster是一个开源,免费的MySQL高可用工具....

29710

扫码关注云+社区