前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >深入理解redis的一个del和unlink的命令的执行过程-2

深入理解redis的一个del和unlink的命令的执行过程-2

作者头像
公众号-利志分享
发布2022-04-25 09:26:11
7550
发布2022-04-25 09:26:11
举报
文章被收录于专栏:利志分享

继续上一篇文章继续讲,上次我们讲了del涉及到的同步删除的整个逻辑,del删除会通过参数走到dbSyncDelete方法,然而unlink则会走dbAsyncDelete方法。这里我们直接从dbAsyncDelete这个方法开始讲。

dbAsyncDelete方法的执行是在redis/src/lazyfree.c里面。

代码语言:javascript
复制
/* Delete a key, value, and associated expiration entry if any, from the DB.
 * If there are enough allocations to free the value object may be put into
 * a lazy free list instead of being freed synchronously. The lazy free list
 * will be reclaimed in a different bio.c thread. */
#define LAZYFREE_THRESHOLD 64
int dbAsyncDelete(redisDb *db, robj *key) {
    /* 从db->expires中删除key,只是删除其指针而已,并没有删除实际值 */
    if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);

    /* If the value is composed of a few allocations, to free in a lazy way
     * is actually just slower... So under a certain limit we just free
     * the object synchronously. */
    /*
    * 在字典中摘除这个key(没有真正删除,只是查不到而已),如果被摘除的dictEntry不为
    * 空就去执行下面的释放逻辑 
    */
    dictEntry *de = dictUnlink(db->dict,key->ptr);
    if (de) {
        robj *val = dictGetVal(de);
        /* Tells the module that the key has been unlinked from the database. */
        moduleNotifyKeyUnlink(key,val);

        /* lazy_free并不是完全异步的,而是先评估释放操作所需工作量,如果影响较小就直接在主线程中删除了 */
        size_t free_effort = lazyfreeGetFreeEffort(key,val);


        /* If releasing the object is too much work, do it in the background
         * by adding the object to the lazy free list.
         * Note that if the object is shared, to reclaim it now it is not
         * possible. This rarely happens, however sometimes the implementation
         * of parts of the Redis core may call incrRefCount() to protect
         * objects, and then call dbDelete(). In this case we'll fall
         * through and reach the dictFreeUnlinkedEntry() call, that will be
         * equivalent to just calling decrRefCount(). 
         * 如果释放这个对象需要做大量的工作,就把他放到异步线程里做
         * 但如果这个对象是共享对象(refcount > 1)就不能直接释放了,当然这很少发送,但有可能redis
         * 核心会调用incrRefCount来保护对象,然后调用dbDelete。这我只需要直接调用dictFreeUnlinkedEntry,
         * 等价于调用decrRefCount */
        if (free_effort > LAZYFREE_THRESHOLD && val->refcount == 1) {
           // 异步释放+1,原子操作 atomicIncr(lazyfree_objects,1);
           // 将 value 的释放添加到异步线程队列中去,后台处理, 任务类型为 异步释放内存 bioCreateLazyFreeJob(lazyfreeFreeObject,1, val);
           设置val为NULL, 以便在外部进行删除时忽略释放value相关内存
            dictSetVal(db->dict,de,NULL);
        }
    }

    /* 释放键值对所占用的内存,如果是lazyFree,val已经是null了,只需要释放key的内存即可 */
    if (de) {
        dictFreeUnlinkedEntry(db->dict,de);
        if (server.cluster_enabled) slotToKeyDel(key->ptr);
        return 1;
    } else {
        return 0;
    }
}

上面的函数执行之后,就是添加异步任务到线程中,执行bioCreateLazyFreeJob函数,再执行bioSubmitJob,方法在redis/src/bio.c文件中

代码语言:javascript
复制
void bioCreateLazyFreeJob(lazy_free_fn free_fn, int arg_count, ...) {
    va_list valist;
    /* Allocate memory for the job structure and all required
     * arguments */
    struct bio_job *job = zmalloc(sizeof(*job) + sizeof(void *) * (arg_count));
    job->free_fn = free_fn;

    va_start(valist, arg_count);
    for (int i = 0; i < arg_count; i++) {
        job->free_args[i] = va_arg(valist, void *);
    }
    va_end(valist);
    bioSubmitJob(BIO_LAZY_FREE, job);
}

void bioSubmitJob(int type, struct bio_job *job) {
    job->time = time(NULL);
    // 多线程需要加锁,把待处理的job添加到队列末尾
    pthread_mutex_lock(&bio_mutex[type]);
    listAddNodeTail(bio_jobs[type],job);
    bio_pending[type]++;
    // 唤醒任务线程
    pthread_cond_signal(&bio_newjob_cond[type]);
    pthread_mutex_unlock(&bio_mutex[type]);
}

后台线程任务框架调用的方法是bioProcessBackgroundJobs,线程根据不同的类型的后台线程执行相关操作。方法位于redis/src/bio.c。

代码语言:javascript
复制
/* 根据参数创建不同的后台线程 */
void *bioProcessBackgroundJobs(void *arg) {
    struct bio_job *job;
    unsigned long type = (unsigned long) arg;
    sigset_t sigset;

    /* Check that the type is within the right interval. */
    if (type >= BIO_NUM_OPS) {
        serverLog(LL_WARNING,
            "Warning: bio thread started with wrong type %lu",type);
        return NULL;
    }

    switch (type) {
    case BIO_CLOSE_FILE:
        redis_set_thread_title("bio_close_file");
        break;
    case BIO_AOF_FSYNC:
        redis_set_thread_title("bio_aof_fsync");
        break;
    case BIO_LAZY_FREE:
        redis_set_thread_title("bio_lazy_free");
        break;
    }

    redisSetCpuAffinity(server.bio_cpulist);
    makeThreadKillable();
    pthread_mutex_lock(&bio_mutex[type]);
    /* Block SIGALRM so we are sure that only the main thread will
     * receive the watchdog signal. */
    sigemptyset(&sigset);
    sigaddset(&sigset, SIGALRM);
    if (pthread_sigmask(SIG_BLOCK, &sigset, NULL))
        serverLog(LL_WARNING,
            "Warning: can't mask SIGALRM in bio.c thread: %s", strerror(errno));

    while(1) {
        listNode *ln;

        /* The loop always starts with the lock hold. */
        if (listLength(bio_jobs[type]) == 0) {
            // 注意此处将会释放锁哟,以便外部可以添加任务进来 pthread_cond_wait(&bio_newjob_cond[type],&bio_mutex[type]);
            continue;
        }
        /* Pop the job from the queue. */
        ln = listFirst(bio_jobs[type]);
        job = ln->value;
        /* It is now possible to unlock the background system as we know have
         * a stand alone job structure to process.*/
        pthread_mutex_unlock(&bio_mutex[type]);

        /* 根据任务类型执行不同的逻辑 */
        if (type == BIO_CLOSE_FILE) {
            close(job->fd);
        } else if (type == BIO_AOF_FSYNC) {
            redis_fsync(job->fd);
        } else if (type == BIO_LAZY_FREE) {
            job->free_fn(job->free_args);
        } else {
            serverPanic("Wrong job type in bioProcessBackgroundJobs().");
        }
        zfree(job);

        /* Lock again before reiterating the loop, if there are no longer
         * jobs to process we'll block again in pthread_cond_wait(). */
        pthread_mutex_lock(&bio_mutex[type]);
        listDelNode(bio_jobs[type],ln);
        bio_pending[type]--;

        /* Unblock threads blocked on bioWaitStepOfType() if any. */
        pthread_cond_broadcast(&bio_step_cond[type]);
    }
}

关于unlink的异步操作主要涉及到这几个函数的执行,这里unlink操作删除的时候会先评估释放数据的工作量,如果较小就会直接主线程做删除操作。

这里分享的源码希望对大家有帮助,如果有兴趣进一步了解,可以私我要一下整个redis翻译后的代码。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-10-08,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 利志分享 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云数据库 Redis
腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档