前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Librdkafka对Kafka Metadata的封装和操作

Librdkafka对Kafka Metadata的封装和操作

作者头像
扫帚的影子
发布2018-09-05 16:44:44
2.1K0
发布2018-09-05 16:44:44
举报
代码语言:txt
复制
1. 所有broker的信息: ip和port;
2. 所有topic的信息: topic name, partition数量, 每个partition的leader, isr, replica集合等;
kafka集群的每一台broker都缓存了整个集群的metadata, 当broker或某一个topic的metadata信息发生变化时, 集群的Controller会把更新后的metadata同步给所有broker。

Metadata的获取
  • 我们先来看一下metadata的定义, 在rdkafka.h
  • kafka集群整体metadata的定义, 包括broker, topic, partition三部分:
代码语言:javascript
复制
/**
 * @brief Cluster-wide metadata: the broker list, the topic list and the
 *        identity of the broker that returned this metadata.
 */
typedef struct rd_kafka_metadata {
        int         broker_cnt;     /**< Number of brokers in \p brokers */
        struct rd_kafka_metadata_broker *brokers;  /**< Brokers */

        int         topic_cnt;      /**< Number of topics in \p topics */
        struct rd_kafka_metadata_topic *topics;    /**< Topics */

        int32_t     orig_broker_id;   /**< Broker originating this metadata,
                                       *   i.e. the broker that returned it */
        char       *orig_broker_name; /**< Name of originating broker */
} rd_kafka_metadata_t;
  1. broker的metadata:
代码语言:javascript
复制
/** @brief Identity and network address of a single broker. */
typedef struct rd_kafka_metadata_broker {
        /** Broker Id */
        int32_t id;
        /** Broker hostname */
        char *host;
        /** Broker listening port */
        int port;
} rd_kafka_metadata_broker_t;
  1. topic的metadata:
代码语言:javascript
复制
/** @brief Metadata for one topic: its name, its partitions and any error. */
typedef struct rd_kafka_metadata_topic {
        /** Topic name */
        char *topic;
        /** Number of partitions in \p partitions */
        int partition_cnt;
        /** Partitions */
        struct rd_kafka_metadata_partition *partitions;
        /** Topic error reported by broker */
        rd_kafka_resp_err_t err;
} rd_kafka_metadata_topic_t;
  1. partition的metadata:
代码语言:javascript
复制
/**
 * @brief Metadata for one partition: leader, replica set and in-sync
 *        replica set, each expressed as broker ids.
 */
typedef struct rd_kafka_metadata_partition {
        /** Partition Id */
        int32_t id;
        /** Partition error reported by broker */
        rd_kafka_resp_err_t err;
        /** Leader broker */
        int32_t leader;
        /** Number of brokers in \p replicas */
        int replica_cnt;
        /** Replica brokers */
        int32_t *replicas;
        /** Number of ISR brokers in \p isrs */
        int isr_cnt;
        /** In-Sync-Replica brokers */
        int32_t *isrs;
} rd_kafka_metadata_partition_t;
  • metadata的获取, 阻塞操作 rd_kafka_metadata:
代码语言:javascript
复制
rd_kafka_metadata (rd_kafka_t *rk, int all_topics,
                   rd_kafka_topic_t *only_rkt,
                   const struct rd_kafka_metadata **metadatap,
                   int timeout_ms) {
        rd_kafka_q_t *rkq;
        rd_kafka_broker_t *rkb;
        rd_kafka_op_t *rko;
    rd_ts_t ts_end = rd_timeout_init(timeout_ms);
        rd_list_t topics;

        // 在timeout_ms时长的超时时间内选择一台有效的broker,
    rkb = rd_kafka_broker_any_usable(rk, timeout_ms, 1);
    if (!rkb)
        return RD_KAFKA_RESP_ERR__TRANSPORT;

       // 创建一个queue, 作为metadata的response返回完生成的op的replay队列
        rkq = rd_kafka_q_new(rk);

        rd_list_init(&topics, 0, rd_free);
        if (!all_topics) {
                if (only_rkt)
                        rd_list_add(&topics,
                                    rd_strdup(rd_kafka_topic_a2i(only_rkt)->
                                              rkt_topic->str));
                else
                        rd_kafka_local_topics_to_list(rkb->rkb_rk, &topics);
        }

        // 创建op, 里面会关联metadata request的buffer
        rko = rd_kafka_op_new(RD_KAFKA_OP_METADATA);
        rd_kafka_op_set_replyq(rko, rkq, 0);
        rko->rko_u.metadata.force = 1; /* Force metadata request regardless
                                        * of outstanding metadata requests. */
        // 构造metadata request的二进制协议并最终放到broker的发送buffer队列里
        rd_kafka_MetadataRequest(rkb, &topics, "application requested", rko);

        rd_list_destroy(&topics);
        rd_kafka_broker_destroy(rkb);

       // 等待metadata response返回或超时, 如果正常返回,在放入这个replay queue之前, 返回的response二进制协议会被parse到` rd_kafka_metadata`对象中
        rko = rd_kafka_q_pop(rkq, rd_timeout_remains(ts_end), 0);

        rd_kafka_q_destroy_owner(rkq);

        /* Timeout */
        if (!rko)
                return RD_KAFKA_RESP_ERR__TIMED_OUT;

        /* Error */
        if (rko->rko_err) {
                rd_kafka_resp_err_t err = rko->rko_err;
                rd_kafka_op_destroy(rko);
                return err;
        }

        /* Reply: pass metadata pointer to application who now owns it*/
        rd_kafka_assert(rk, rko->rko_u.metadata.md);
        *metadatap = rko->rko_u.metadata.md;
        rko->rko_u.metadata.md = NULL;
        rd_kafka_op_destroy(rko);

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}
  • metadata的拷贝操作 rd_kafka_metadata_copy (const struct rd_kafka_metadata *src, size_t size): 执行一个深度copy, 使用rd_tmpabuf_t,保证内存的对齐;
  • 处理metadata的response rd_kafka_parse_Metadata:
代码语言:javascript
复制
/**
 * @brief Handle a Metadata response: parse it into a rd_kafka_metadata
 *        object and apply it to brokers, topics and the metadata cache.
 *
 * Must be called from the client's main thread (asserted below).
 *
 * @param rkb     Broker the response belongs to.
 * @param request The originating request; its topic list, all_topics flag,
 *                reason string and ApiVersion are read from it.
 * @param rkbuf   The response buffer.
 *
 * @returns the parsed metadata (allocated from the temporary buffer set up
 *          here), or NULL on failure.
 */
struct rd_kafka_metadata *
rd_kafka_parse_Metadata (rd_kafka_broker_t *rkb,
                         rd_kafka_buf_t *request,
                         rd_kafka_buf_t *rkbuf) {
        rd_kafka_t *rk = rkb->rkb_rk;
        int i, j, k;
        rd_tmpabuf_t tbuf;
        struct rd_kafka_metadata *md;
        size_t rkb_namelen;
        const int log_decode_errors = LOG_ERR;
        rd_list_t *missing_topics = NULL;
        const rd_list_t *requested_topics = request->rkbuf_u.Metadata.topics;
        int all_topics = request->rkbuf_u.Metadata.all_topics;
        const char *reason = request->rkbuf_u.Metadata.reason ?
                request->rkbuf_u.Metadata.reason : "(no reason)";
        int ApiVersion = request->rkbuf_reqhdr.ApiVersion;
        rd_kafkap_str_t cluster_id = RD_ZERO_INIT;
        int32_t controller_id = -1;

        rd_kafka_assert(NULL, thrd_is_current(rk->rk_thread));

        /* Remove topics from missing_topics as they are seen in Metadata. */
        if (requested_topics)
                missing_topics = rd_list_copy(requested_topics,
                                              rd_list_string_copy, NULL);

        rd_kafka_broker_lock(rkb);
        rkb_namelen = strlen(rkb->rkb_name)+1;

        /* All parsed data is carved out of one temporary buffer, sized
         * at four times the response length plus the fixed parts. */
        rd_tmpabuf_new(&tbuf,
                       sizeof(*md) + rkb_namelen + (rkbuf->rkbuf_totlen * 4),
                       0/*dont assert on fail*/);

        if (!(md = rd_tmpabuf_alloc(&tbuf, sizeof(*md)))) {
                /* Release the broker lock before bailing out, the
                 * error path does not unlock it. */
                rd_kafka_broker_unlock(rkb);
                goto err;
        }
        md->orig_broker_id = rkb->rkb_nodeid;
        md->orig_broker_name = rd_tmpabuf_write(&tbuf,
                                                rkb->rkb_name, rkb_namelen);
        rd_kafka_broker_unlock(rkb);

        /* Parse and fill in the broker information. */
        /* ... (elided in this excerpt) ... */

        /* Parse and fill in the topic and partition information. */
        /* ... (elided in this excerpt) ... */

        /* Do not apply metadata updates while terminating. */
        if (rd_kafka_terminating(rkb->rkb_rk))
                goto done;

        /* A response with neither brokers nor topics is treated as
         * a failure so that the request can be retried. */
        if (md->broker_cnt == 0 && md->topic_cnt == 0) {
                rd_rkb_dbg(rkb, METADATA, "METADATA",
                           "No brokers or topics in metadata: retrying");
                goto err;
        }

        /* Update our list of brokers from the response. */
        for (i = 0 ; i < md->broker_cnt ; i++) {
                rd_kafka_broker_update(rkb->rkb_rk, rkb->rkb_proto,
                                       &md->brokers[i]);
        }

        /* Update partition count and leader for each topic we know about */
        for (i = 0 ; i < md->topic_cnt ; i++) {
                rd_kafka_metadata_topic_t *mdt = &md->topics[i];

                /* Ignore topics in blacklist */
                if (rkb->rkb_rk->rk_conf.topic_blacklist &&
                    rd_kafka_pattern_match(rkb->rkb_rk->rk_conf.topic_blacklist,
                                           mdt->topic)) {
                        rd_rkb_dbg(rkb, TOPIC, "BLACKLIST",
                                   "Ignoring blacklisted topic \"%s\" "
                                   "in metadata", mdt->topic);
                        continue;
                }

                /* Ignore metadata completely for temporary errors. (issue #513)
                 *   LEADER_NOT_AVAILABLE: Broker is rebalancing
                 */
                if (mdt->err == RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE &&
                    mdt->partition_cnt == 0) {
                        /* missing_topics only exists when specific
                         * topics were requested. */
                        if (missing_topics)
                                rd_list_free_cb(
                                        missing_topics,
                                        rd_list_remove_cmp(missing_topics,
                                                           mdt->topic,
                                                           (void *)strcmp));
                        continue;
                }


                /* Apply the topic's metadata (partition count, leaders). */
                rd_kafka_topic_metadata_update2(rkb, mdt);

                if (requested_topics) {
                        rd_list_free_cb(missing_topics,
                                        rd_list_remove_cmp(missing_topics,
                                                           mdt->topic,
                                                           (void*)strcmp));
                        if (!all_topics) {
                                rd_kafka_wrlock(rk);
                                rd_kafka_metadata_cache_topic_update(rk, mdt);
                                rd_kafka_wrunlock(rk);
                        }
                }
        }


        /* Requested topics that did not show up in the response:
         * mark the local topic objects as having no metadata. */
        if (missing_topics) {
                char *topic;
                RD_LIST_FOREACH(topic, missing_topics, i) {
                        shptr_rd_kafka_itopic_t *s_rkt;

                        s_rkt = rd_kafka_topic_find(rkb->rkb_rk, topic, 1/*lock*/);
                        if (s_rkt) {
                                rd_kafka_topic_metadata_none(
                                        rd_kafka_topic_s2i(s_rkt));
                                rd_kafka_topic_destroy0(s_rkt);
                        }
                }
        }


        rd_kafka_wrlock(rkb->rkb_rk);
        rkb->rkb_rk->rk_ts_metadata = rd_clock();

        /* Update cached cluster id. */
        if (RD_KAFKAP_STR_LEN(&cluster_id) > 0 &&
            (!rkb->rkb_rk->rk_clusterid ||
             rd_kafkap_str_cmp_str(&cluster_id, rkb->rkb_rk->rk_clusterid))) {
                if (rkb->rkb_rk->rk_clusterid)
                        rd_free(rkb->rkb_rk->rk_clusterid);
                rkb->rkb_rk->rk_clusterid = RD_KAFKAP_STR_DUP(&cluster_id);
        }

        if (all_topics) {
                /* Full response: update the metadata cache and keep
                 * a copy as the client's full metadata. */
                rd_kafka_metadata_cache_update(rkb->rkb_rk,
                                               md, 1/*abs update*/);

                if (rkb->rkb_rk->rk_full_metadata)
                        rd_kafka_metadata_destroy(rkb->rkb_rk->rk_full_metadata);
                rkb->rkb_rk->rk_full_metadata =
                        rd_kafka_metadata_copy(md, tbuf.of);
                rkb->rkb_rk->rk_ts_full_metadata = rkb->rkb_rk->rk_ts_metadata;
                rd_rkb_dbg(rkb, METADATA, "METADATA",
                           "Caching full metadata with "
                           "%d broker(s) and %d topic(s): %s",
                           md->broker_cnt, md->topic_cnt, reason);
        } else {
                rd_kafka_metadata_cache_expiry_start(rk);
        }

        /* Remove cache hints for the originally requested topics. */
        if (requested_topics)
                rd_kafka_metadata_cache_purge_hints(rk, requested_topics);

        rd_kafka_wrunlock(rkb->rkb_rk);

        /* Check if cgrp effective subscription is affected by
         * new metadata. */
        if (rkb->rkb_rk->rk_cgrp)
                rd_kafka_cgrp_metadata_update_check(
                        rkb->rkb_rk->rk_cgrp, 1/*do join*/);
done:
        if (missing_topics)
                rd_list_destroy(missing_topics);
        /* tbuf is not destroyed here: md was allocated from it. */
        return md;

 err_parse:
err:
        if (requested_topics) {
                /* Failed requests shall purge cache hints for
                 * the requested topics. */
                rd_kafka_wrlock(rkb->rkb_rk);
                rd_kafka_metadata_cache_purge_hints(rk, requested_topics);
                rd_kafka_wrunlock(rkb->rkb_rk);
        }

        if (missing_topics)
                rd_list_destroy(missing_topics);

        rd_tmpabuf_destroy(&tbuf);
        return NULL;
}
  • 刷新给定的topic list的metadata rd_kafka_metadata_refresh_topics:
代码语言:javascript
复制
rd_kafka_metadata_refresh_topics (rd_kafka_t *rk, rd_kafka_broker_t *rkb,
                                  const rd_list_t *topics, int force,
                                  const char *reason) {
        rd_list_t q_topics;
        int destroy_rkb = 0;

        if (!rk)
                rk = rkb->rkb_rk;

        rd_kafka_wrlock(rk);

        // 确定一下发送metadata request的broker
        if (!rkb) {
                if (!(rkb = rd_kafka_broker_any_usable(rk, RD_POLL_NOWAIT, 0))){
                        rd_kafka_wrunlock(rk);
                        rd_kafka_dbg(rk, METADATA, "METADATA",
                                     "Skipping metadata refresh of %d topic(s):"
                                     " no usable brokers",
                                     rd_list_cnt(topics));
                        return RD_KAFKA_RESP_ERR__TRANSPORT;
                }
                destroy_rkb = 1;
        }

        rd_list_init(&q_topics, rd_list_cnt(topics), rd_free);

        if (!force) {
                // 如查不是强制必须立刻刷新的话, 只是把这个topic对应在metadata cache中的状态改为RD_KAFKA_RESP_ERR__WAIT_CACHE, 设置新的刷新时间
                rd_kafka_metadata_cache_hint(rk, topics, &q_topics,
                                             0/*dont replace*/);
                rd_kafka_wrunlock(rk);

                if (rd_list_cnt(&q_topics) == 0) {
                        rd_list_destroy(&q_topics);
                        if (destroy_rkb)
                                rd_kafka_broker_destroy(rkb);
                        return RD_KAFKA_RESP_ERR_NO_ERROR;
                }

        } else {
                // 立即发送metadata request
                rd_kafka_wrunlock(rk);
                rd_list_copy_to(&q_topics, topics, rd_list_string_copy, NULL);
        }

        rd_kafka_MetadataRequest(rkb, &q_topics, reason, NULL);

        rd_list_destroy(&q_topics);

        if (destroy_rkb)
                rd_kafka_broker_destroy(rkb);

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}
  • 只请求broker相关的metadata
代码语言:javascript
复制
/**
 * @brief Request broker-only metadata.
 *
 * @returns the result of rd_kafka_metadata_request().
 */
rd_kafka_resp_err_t
rd_kafka_metadata_refresh_brokers (rd_kafka_t *rk, rd_kafka_broker_t *rkb,
                                   const char *reason) {
        rd_kafka_resp_err_t err;

        /* Passing no topic list means: brokers only. */
        err = rd_kafka_metadata_request(rk, rkb, NULL, reason, NULL);

        return err;
}
  • 快速刷新partition leader的操作 rd_kafka_metadata_fast_leader_query: rkmc_query_tmr这个定时器专门针对没有leader的topic作定期刷新
代码语言:javascript
复制
/**
 * @brief (Re)start the leader query timer with the fast refresh interval,
 *        unless it is already due to fire at least that soon.
 */
void rd_kafka_metadata_fast_leader_query (rd_kafka_t *rk) {
        rd_kafka_timer_t *query_tmr = &rk->rk_metadata_cache.rkmc_query_tmr;
        rd_ts_t next_fire;

        next_fire = rd_kafka_timer_next(&rk->rk_timers, query_tmr,
                                        1/*lock*/);

        /* Timer is started (-1 means not started) and already fires
         * soon enough: restarting it would not speed things up. */
        if (next_fire != -1 &&
            next_fire <= rk->rk_conf.metadata_refresh_fast_interval_ms*1000)
                return;

        /* Start the fast leader query timer; on expiry it runs
         * rd_kafka_metadata_leader_query_tmr_cb. */
        rd_kafka_timer_start(&rk->rk_timers, query_tmr,
                             rk->rk_conf.
                             metadata_refresh_fast_interval_ms*1000,
                             rd_kafka_metadata_leader_query_tmr_cb,
                             NULL);
}
  • leader定时刷新处理回调函数 rd_kafka_metadata_leader_query_tmr_cb:
代码语言:javascript
复制
/**
 * @brief Leader query timer callback: collect the topics that lack a
 *        leader (or have no partitions) and request fresh metadata
 *        for them.
 *
 * The timer is stopped when no such topic remains, or when the next
 * backoff would reach the standard refresh interval; otherwise it is
 * backed off.
 */
static void rd_kafka_metadata_leader_query_tmr_cb (rd_kafka_timers_t *rkts,
                                                   void *arg) {
        rd_kafka_t *rk = rkts->rkts_rk;
        rd_kafka_timer_t *rtmr = &rk->rk_metadata_cache.rkmc_query_tmr;
        rd_kafka_itopic_t *rkt;
        rd_list_t topics;

        rd_kafka_wrlock(rk);
        rd_list_init(&topics, rk->rk_topic_cnt, rd_free);

        TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) {
                int idx;
                int leaderless = 0;

                rd_kafka_topic_rdlock(rkt);

                /* Topics that are known to not exist are skipped. */
                if (rkt->rkt_state != RD_KAFKA_TOPIC_S_NOTEXISTS) {
                        leaderless = rkt->rkt_flags &
                                RD_KAFKA_TOPIC_F_LEADER_UNAVAIL;

                        /* One partition without a current or next
                         * leader makes the whole topic leader-less. */
                        for (idx = 0 ;
                             !leaderless && idx < rkt->rkt_partition_cnt ;
                             idx++) {
                                rd_kafka_toppar_t *rktp =
                                        rd_kafka_toppar_s2i(rkt->rkt_p[idx]);

                                rd_kafka_toppar_lock(rktp);
                                leaderless = !(rktp->rktp_leader ||
                                               rktp->rktp_next_leader);
                                rd_kafka_toppar_unlock(rktp);
                        }

                        if (leaderless || rkt->rkt_partition_cnt == 0)
                                rd_list_add(&topics,
                                            rd_strdup(rkt->rkt_topic->str));
                }

                rd_kafka_topic_rdunlock(rkt);
        }

        rd_kafka_wrunlock(rk);

        if (rd_list_cnt(&topics) != 0) {
                /* Request metadata for the collected topics. */
                rd_kafka_metadata_refresh_topics(rk, NULL, &topics, 1/*force*/,
                                                 "partition leader query");

                /* Back off next query exponentially until we reach
                 * the standard query interval - then stop the timer
                 * since the intervalled querier will do the job for us. */
                if (rk->rk_conf.metadata_refresh_interval_ms > 0 &&
                    rtmr->rtmr_interval * 2 / 1000 >=
                    rk->rk_conf.metadata_refresh_interval_ms) {
                        rd_kafka_timer_stop(rkts, rtmr, 1/*lock*/);
                } else {
                        rd_kafka_timer_backoff(rkts, rtmr,
                                               (int)rtmr->rtmr_interval);
                }
        } else {
                /* No leader-less topics+partitions, stop the timer. */
                rd_kafka_timer_stop(rkts, rtmr, 1/*lock*/);
        }

        rd_list_destroy(&topics);
}
Metadata在内存中的cache及其操作
  • cache相关结构体
  • cache entry:
代码语言:javascript
复制
/**
 * @brief One metadata cache entry holding the cached metadata of a
 *        single topic. Linked into both the cache's AVL tree and its
 *        expiry list.
 */
struct rd_kafka_metadata_cache_entry {
        rd_avl_node_t rkmce_avlnode;             /* rkmc_avl */
        TAILQ_ENTRY(rd_kafka_metadata_cache_entry) rkmce_link; /* rkmc_expiry */
        rd_ts_t rkmce_ts_expires;                /* Expire time: when this
                                                  * entry's metadata is due
                                                  * to be refreshed */
        rd_ts_t rkmce_ts_insert;                 /* Insert time */
        rd_kafka_metadata_topic_t rkmce_mtopic;  /* Cached topic metadata */
        /* rkmce_partitions memory points here. */
};
  1. cache结构体定义:
代码语言:javascript
复制
/* Metadata cache: cached per-topic metadata entries plus the state used
 * to expire entries and to limit outstanding full metadata requests. */
struct rd_kafka_metadata_cache {
        rd_avl_t         rkmc_avl; // AVL tree (rd_avl_t) holding every entry, looked up by topic name
        TAILQ_HEAD(, rd_kafka_metadata_cache_entry) rkmc_expiry; // List of all cached entries; entries expiring earlier are expected nearer the head
        rd_kafka_timer_t rkmc_expiry_tmr;
        int              rkmc_cnt;

        /* Protected by full_lock: */
       // At most one full (all topics / all brokers) metadata request is outstanding until its response returns
        mtx_t            rkmc_full_lock;
        int              rkmc_full_topics_sent; /* Full MetadataRequest for
                                                 * all topics has been sent,
                                                 * awaiting response. */
        int              rkmc_full_brokers_sent; /* Full MetadataRequest for
                                                  * all brokers (but not topics)
                                                  * has been sent,
                                                  * awaiting response. */
        // Timer driving periodic metadata requests for topics without leaders
        rd_kafka_timer_t rkmc_query_tmr; /* Query timer for topic's without
                                          * leaders. */
        cnd_t            rkmc_cnd;       /* cache_wait_change() cond. */
        mtx_t            rkmc_cnd_lock;  /* lock for rkmc_cnd */
};
  • cache的初始化 rd_kafka_metadata_cache_init:
代码语言:javascript
复制
/**
 * @brief Initialize the client's metadata cache: lookup tree, expiry
 *        list, locks and condition variable.
 */
void rd_kafka_metadata_cache_init (rd_kafka_t *rk) {
        struct rd_kafka_metadata_cache *rkmc = &rk->rk_metadata_cache;

        /* Lookup tree of entries. */
        rd_avl_init(&rkmc->rkmc_avl, rd_kafka_metadata_cache_entry_cmp, 0);

        /* Expiry list. */
        TAILQ_INIT(&rkmc->rkmc_expiry);

        /* Locks and the cache-change condition variable. */
        mtx_init(&rkmc->rkmc_full_lock, mtx_plain);
        mtx_init(&rkmc->rkmc_cnd_lock, mtx_plain);
        cnd_init(&rkmc->rkmc_cnd);
}
  • cache删除操作 rd_kafka_metadata_cache_delete
代码语言:javascript
复制
/**
 * @brief Remove a cache entry from the expiry list (and optionally from
 *        the lookup tree), decrement the entry count and free the entry.
 *
 * @param rk         Client instance owning the cache.
 * @param rkmce      Entry to delete; must not be used after this call.
 * @param unlink_avl Non-zero to also unlink the entry from the tree.
 *                   Pass 0 when the entry is no longer in the tree,
 *                   e.g. after it was replaced by an insert.
 */
void
rd_kafka_metadata_cache_delete (rd_kafka_t *rk,
                                struct rd_kafka_metadata_cache_entry *rkmce,
                                int unlink_avl) {
        if (unlink_avl)
                RD_AVL_REMOVE_ELM(&rk->rk_metadata_cache.rkmc_avl, rkmce);
        TAILQ_REMOVE(&rk->rk_metadata_cache.rkmc_expiry, rkmce, rkmce_link);
        rd_kafka_assert(NULL, rk->rk_metadata_cache.rkmc_cnt > 0);
        rk->rk_metadata_cache.rkmc_cnt--;

        rd_free(rkmce);
}
  • cache的查找操作 rd_kafka_metadata_cache_find: 从AVL树上查找metadata
代码语言:javascript
复制
/**
 * @brief Look up the cache entry for \p topic.
 *
 * @param valid If non-zero, only an entry passing
 *              RD_KAFKA_METADATA_CACHE_VALID() is returned.
 *
 * @returns the entry, or NULL if not found (or not valid when
 *          \p valid is set).
 */
struct rd_kafka_metadata_cache_entry *
rd_kafka_metadata_cache_find (rd_kafka_t *rk, const char *topic, int valid) {
        struct rd_kafka_metadata_cache_entry lookup_key;
        struct rd_kafka_metadata_cache_entry *found;

        /* Only the topic name of the key is set for the lookup. */
        lookup_key.rkmce_mtopic.topic = (char *)topic;

        found = RD_AVL_FIND(&rk->rk_metadata_cache.rkmc_avl, &lookup_key);
        if (!found)
                return NULL;

        if (valid && !RD_KAFKA_METADATA_CACHE_VALID(found))
                return NULL;

        return found;
}
  • cache的插入操作 rd_kafka_metadata_cache_insert
代码语言:javascript
复制
/**
 * @brief Create a cache entry from \p mtopic and insert it into the
 *        cache, replacing (and deleting) any existing entry for the
 *        same topic.
 *
 * The entry, the topic name and the partition array are all allocated
 * from a single temporary buffer. Replica and ISR lists are not cached.
 *
 * @returns the new cache entry.
 */
static struct rd_kafka_metadata_cache_entry *
rd_kafka_metadata_cache_insert (rd_kafka_t *rk,
                                const rd_kafka_metadata_topic_t *mtopic,
                                rd_ts_t now, rd_ts_t ts_expires) {
        struct rd_kafka_metadata_cache_entry *entry, *replaced;
        rd_kafka_metadata_topic_t *cached;
        size_t name_size = strlen(mtopic->topic) + 1;
        rd_tmpabuf_t tbuf;
        int p;

        /* One buffer sized for the entry, the topic name and the
         * partition array, each rounded up to 8 bytes. */
        rd_tmpabuf_new(&tbuf,
                       RD_ROUNDUP(sizeof(*entry), 8) +
                       RD_ROUNDUP(name_size, 8) +
                       (mtopic->partition_cnt *
                        RD_ROUNDUP(sizeof(*mtopic->partitions), 8)),
                       1/*assert on fail*/);

        entry = rd_tmpabuf_alloc(&tbuf, sizeof(*entry));
        cached = &entry->rkmce_mtopic;

        /* Shallow copy first, then re-point the pointers at our own
         * copies of the topic name and partition array. */
        *cached = *mtopic;
        cached->topic = rd_tmpabuf_write_str(&tbuf, mtopic->topic);
        cached->partitions =
                rd_tmpabuf_write(&tbuf, mtopic->partitions,
                                 mtopic->partition_cnt *
                                 sizeof(*mtopic->partitions));

        /* Clear uncached fields. */
        for (p = 0 ; p < mtopic->partition_cnt ; p++) {
                rd_kafka_metadata_partition_t *part = &cached->partitions[p];

                part->replicas = NULL;
                part->replica_cnt = 0;
                part->isrs = NULL;
                part->isr_cnt = 0;
        }

        /* Sort partitions for future bsearch() lookups. */
        qsort(cached->partitions,
              cached->partition_cnt,
              sizeof(*cached->partitions),
              rd_kafka_metadata_partition_id_cmp);

        entry->rkmce_ts_expires = ts_expires;
        entry->rkmce_ts_insert = now;

        /* Append to the tail of the expiry list. */
        TAILQ_INSERT_TAIL(&rk->rk_metadata_cache.rkmc_expiry,
                          entry, rkmce_link);
        rk->rk_metadata_cache.rkmc_cnt++;

        /* Insert (and replace existing) entry. The replaced entry has
         * already left the tree, so it is deleted without unlinking. */
        replaced = RD_AVL_INSERT(&rk->rk_metadata_cache.rkmc_avl, entry,
                                 rkmce_avlnode);
        if (replaced)
                rd_kafka_metadata_cache_delete(rk, replaced, 0);

        /* Explicitly not freeing the tmpabuf since the entry points to
         * its memory. */
        return entry;
}
  • 清空所有的cache
代码语言:javascript
复制
/**
 * @brief Delete every cache entry, stop the expiry timer and, if the
 *        cache was not already empty, signal the change.
 */
static void rd_kafka_metadata_cache_purge (rd_kafka_t *rk) {
        struct rd_kafka_metadata_cache_entry *rkmce;
        int had_entries = !TAILQ_EMPTY(&rk->rk_metadata_cache.rkmc_expiry);

        /* Drain the expiry list from the head, unlinking each entry
         * from the tree as well. */
        for (;;) {
                rkmce = TAILQ_FIRST(&rk->rk_metadata_cache.rkmc_expiry);
                if (!rkmce)
                        break;
                rd_kafka_metadata_cache_delete(rk, rkmce, 1);
        }

        /* Nothing left to expire. */
        rd_kafka_timer_stop(&rk->rk_timers,
                            &rk->rk_metadata_cache.rkmc_expiry_tmr, 1);

        /* Broadcast the change to waiters. */
        if (had_entries)
                rd_kafka_metadata_cache_propagate_changes(rk);
}
  • cache的更新 rd_kafka_metadata_cache_topic_update
代码语言:javascript
复制
/**
 * @brief Update the cache with the metadata of a single topic.
 *
 * A topic without error is inserted (replacing any existing entry);
 * a topic with an error is removed from the cache by name.
 * Waiters are signalled if the cache changed.
 *
 * @param rk  Client instance owning the cache.
 * @param mdt Topic metadata as received from the broker.
 */
void
rd_kafka_metadata_cache_topic_update (rd_kafka_t *rk,
                                      const rd_kafka_metadata_topic_t *mdt) {
        rd_ts_t now = rd_clock();
        rd_ts_t ts_expires = now + (rk->rk_conf.metadata_max_age_ms * 1000);
        int changed = 1;

        if (!mdt->err) {
                /* Error-free metadata: insert it. */
                rd_kafka_metadata_cache_insert(rk, mdt, now, ts_expires);
        } else {
                /* Metadata carries an error: drop any cached entry. */
                changed = rd_kafka_metadata_cache_delete_by_name(rk,
                                                                 mdt->topic);
        }

        /* An insert always counts as a change; a delete counts when
         * delete_by_name reports a non-zero result. */
        if (changed)
                rd_kafka_metadata_cache_propagate_changes(rk);
}
  • 开始或更新过期刷新metadata的timer 如果调用时这个timer已经开始就更新下它的expire时间
代码语言:javascript
复制
/**
 * @brief Start the expiry timer so that it fires when the entry at the
 *        head of the expiry list expires. Does nothing if the cache
 *        is empty.
 */
void rd_kafka_metadata_cache_expiry_start (rd_kafka_t *rk) {
        struct rd_kafka_metadata_cache_entry *first =
                TAILQ_FIRST(&rk->rk_metadata_cache.rkmc_expiry);

        if (!first)
                return;

        rd_kafka_timer_start(&rk->rk_timers,
                             &rk->rk_metadata_cache.rkmc_expiry_tmr,
                             first->rkmce_ts_expires - rd_clock(),
                             rd_kafka_metadata_cache_evict_tmr_cb,
                             rk);
}
  • 过期刷新时的回调函数
代码语言:javascript
复制
/**
 * @brief Delete expired entries from the head of the expiry list, then
 *        re-arm the expiry timer for the next entry or stop it if the
 *        list is empty.
 *
 * @returns the number of entries evicted.
 */
static int rd_kafka_metadata_cache_evict (rd_kafka_t *rk) {
        int evicted = 0;
        rd_ts_t now = rd_clock();
        struct rd_kafka_metadata_cache_entry *rkmce;

        /* Walk from the head and stop at the first entry that has not
         * expired yet. */
        for (;;) {
                rkmce = TAILQ_FIRST(&rk->rk_metadata_cache.rkmc_expiry);
                if (!rkmce || rkmce->rkmce_ts_expires > now)
                        break;

                rd_kafka_metadata_cache_delete(rk, rkmce, 1);
                evicted++;
        }

        if (!rkmce) {
                /* List is empty: no timer needed. */
                rd_kafka_timer_stop(&rk->rk_timers,
                                    &rk->rk_metadata_cache.rkmc_expiry_tmr, 1);
        } else {
                /* Entries remain: fire again when the head expires. */
                rd_kafka_timer_start(&rk->rk_timers,
                                     &rk->rk_metadata_cache.rkmc_expiry_tmr,
                                     rkmce->rkmce_ts_expires - now,
                                     rd_kafka_metadata_cache_evict_tmr_cb,
                                     rk);
        }

        if (evicted)
                rd_kafka_metadata_cache_propagate_changes(rk);

        return evicted;
}
  • 根据topic名字获取对应的topic metadata
代码语言:javascript
复制
/**
 * @brief Get the cached topic metadata for \p topic.
 *
 * @param valid Passed through to rd_kafka_metadata_cache_find().
 *
 * @returns the cached topic metadata, or NULL if no entry was found.
 */
const rd_kafka_metadata_topic_t *
rd_kafka_metadata_cache_topic_get (rd_kafka_t *rk, const char *topic,
                                   int valid) {
        struct rd_kafka_metadata_cache_entry *rkmce =
                rd_kafka_metadata_cache_find(rk, topic, valid);

        return rkmce ? &rkmce->rkmce_mtopic : NULL;
}
  • 根据topic-partition获取对应的partition metadata
代码语言:javascript
复制
/**
 * @brief Look up the cached metadata of a topic and one of its partitions.
 *
 * @param mtopicp Set to the topic metadata, or NULL if the topic is
 *                not cached.
 * @param mpartp  Set to the partition metadata, or NULL if not found.
 *
 * @returns -1 if the topic is not in the cache,
 *           0 if the topic is cached but the partition is not found,
 *           1 if both were found.
 */
int rd_kafka_metadata_cache_topic_partition_get (
        rd_kafka_t *rk,
        const rd_kafka_metadata_topic_t **mtopicp,
        const rd_kafka_metadata_partition_t **mpartp,
        const char *topic, int32_t partition, int valid) {

        rd_kafka_metadata_partition_t key = { .id = partition };
        const rd_kafka_metadata_topic_t *mtopic;
        const rd_kafka_metadata_partition_t *mpart;

        *mtopicp = NULL;
        *mpartp = NULL;

        mtopic = rd_kafka_metadata_cache_topic_get(rk, topic, valid);
        if (!mtopic)
                return -1;

        *mtopicp = mtopic;

        /* Partitions array may be sparse so use bsearch lookup
         * on the partition id. */
        mpart = bsearch(&key, mtopic->partitions,
                        mtopic->partition_cnt,
                        sizeof(*mtopic->partitions),
                        rd_kafka_metadata_partition_id_cmp);

        if (mpart) {
                *mpartp = mpart;
                return 1;
        }

        return 0;
}
  • 广播cache内容的变化
代码语言:javascript
复制
/**
 * @brief Wake up all threads waiting on the cache-change condition
 *        variable.
 */
static void rd_kafka_metadata_cache_propagate_changes (rd_kafka_t *rk) {
        mtx_t *cnd_lock = &rk->rk_metadata_cache.rkmc_cnd_lock;

        mtx_lock(cnd_lock);
        cnd_broadcast(&rk->rk_metadata_cache.rkmc_cnd);
        mtx_unlock(cnd_lock);
}
  • 等待cache内容的变化 rd_kafka_metadata_cache_wait_change:

/**
 * @brief Wait on the cache-change condition variable for up to
 *        \p timeout_ms milliseconds.
 *
 * @returns 1 if the timed wait returned thrd_success, else 0.
 */
int rd_kafka_metadata_cache_wait_change (rd_kafka_t *rk, int timeout_ms) {
        int r;

        mtx_lock(&rk->rk_metadata_cache.rkmc_cnd_lock);
        r = cnd_timedwait_ms(&rk->rk_metadata_cache.rkmc_cnd,
                             &rk->rk_metadata_cache.rkmc_cnd_lock,
                             timeout_ms);
        mtx_unlock(&rk->rk_metadata_cache.rkmc_cnd_lock);

        return r == thrd_success;
}

代码语言:javascript
复制
  • 清除给定topic列表对应的cache hint rd_kafka_metadata_cache_purge_hints: 在AVL树中找不到的, 或者找到了但已经是有效cache(即状态不是RD_KAFKA_RESP_ERR__WAIT_CACHE)的都跳过, 只删除仍处于hint状态的entry

/**
 * @brief Remove the cache entries of the given topics that are not
 *        valid entries (i.e. hints), and signal waiters if anything
 *        was removed.
 *
 * @param rk     Client instance owning the cache.
 * @param topics List of topic names.
 */
void rd_kafka_metadata_cache_purge_hints (rd_kafka_t *rk,
                                          const rd_list_t *topics) {
        const char *topic;
        int i;
        int cnt = 0;

        RD_LIST_FOREACH(topic, topics, i) {
                struct rd_kafka_metadata_cache_entry *rkmce;

                /* Skip topics that have no entry at all and topics
                 * whose entry is valid: only non-valid entries
                 * are purged. */
                if (!(rkmce = rd_kafka_metadata_cache_find(rk, topic,
                                                           0/*any*/)) ||
                    RD_KAFKA_METADATA_CACHE_VALID(rkmce))
                        continue;

                rd_kafka_metadata_cache_delete(rk, rkmce, 1/*unlink avl*/);
                cnt++;
        }

        if (cnt > 0) {
                rd_kafka_metadata_cache_propagate_changes(rk);
        }
}

代码语言:javascript
复制
---
### [Librdkafka源码分析-Content Table](https://www.jianshu.com/p/1a94bb09a6e6)
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018.01.22 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Metadata的获取
  • Metadata在内存中的cache及其操作
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档