前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Librdkafka对kafka topic的封装

Librdkafka对kafka topic的封装

作者头像
扫帚的影子
发布2018-09-05 16:45:32
1.3K0
发布2018-09-05 16:45:32
举报
文章被收录于专栏:分布式系统进阶
  • 上一节我们讲了librdkakfa对topic-partition的封装, 任何一个partition都必须要属于一下topic;
  • 我们这节就来分析一上librdkafka对topic的封装

rd_kafka_itopic_s
  • 所在文件: src/rdkafka_topic.h
  • 这里还有一个类型rd_kafka_topic_t,定义:typedef struct rd_kafka_topic_s rd_kafka_topic_t;,这是个空定义没有现实, 其实就是rd_kafka_itopic_s, 这个类型主要是面向librdkafka的使用者,sdk里称作app topic, 它有自己的引用计数. 在librdkafka内部使用rd_kafka_itopic, 它也有自己的引用计数, 有点罗嗦啊~
  • 定义:
代码语言:javascript
复制
struct rd_kafka_itopic_s {
        // 定义成tailq的元素
    TAILQ_ENTRY(rd_kafka_itopic_s) rkt_link;

        //  引用计数
    rd_refcnt_t        rkt_refcnt;

    rwlock_t           rkt_lock;

        // 所属topic的名字
    rd_kafkap_str_t   *rkt_topic;

        // 表示一个未assigned的partition
    shptr_rd_kafka_toppar_t  *rkt_ua;  /* unassigned partition */

        // 拥有的partition列表
    shptr_rd_kafka_toppar_t **rkt_p;
    int32_t            rkt_partition_cnt;

        // 期望操作的partition, 但还没有从broker获取到其信息的partition列表
        rd_list_t          rkt_desp;              /* Desired partitions
                                                   * that are not yet seen
                                                   * in the cluster. */
        // 最近一次更新metadata的时间
    rd_ts_t            rkt_ts_metadata; /* Timestamp of last metadata
                         * update for this topic. */

        mtx_t              rkt_app_lock;    /* Protects rkt_app_* */
, 
       // 在application层一个rd_kafka_itopic_s对外表现为一个 rd_kafka_topic_t类型的对象, 且有自己的引用计数
        rd_kafka_topic_t *rkt_app_rkt;      /* A shared topic pointer
                                             * to be used for callbacks
                                             * to the application. */
    int               rkt_app_refcnt;   /* Number of active rkt's new()ed
                         * by application. */
        //  topic的三种状态:未知, 已存在, 不存在
    enum {
        RD_KAFKA_TOPIC_S_UNKNOWN,   /* No cluster information yet */
        RD_KAFKA_TOPIC_S_EXISTS,    /* Topic exists in cluster */
        RD_KAFKA_TOPIC_S_NOTEXISTS, /* Topic is not known in cluster */
    } rkt_state;

        int               rkt_flags;
#define RD_KAFKA_TOPIC_F_LEADER_UNAVAIL   0x1 /* Leader lost/unavailable
                                               * for at least one partition. */
        // 所属的rd_kafka_t
    rd_kafka_t       *rkt_rk;

        shptr_rd_kafka_itopic_t *rkt_shptr_app; /* Application's topic_new() */

    rd_kafka_topic_conf_t rkt_conf;
};
  • 创建一个rd_kafka_itopic_s对象rd_kafka_topic_new0, 这是一个内部调用函数
代码语言:javascript
复制
shptr_rd_kafka_itopic_t *rd_kafka_topic_new0 (rd_kafka_t *rk,
                                              const char *topic,
                                              rd_kafka_topic_conf_t *conf,
                                              int *existing,
                                              int do_lock) {
    rd_kafka_itopic_t *rkt;
        shptr_rd_kafka_itopic_t *s_rkt;
        const struct rd_kafka_metadata_cache_entry *rkmce;

       // topic名字check , 长度不能超512
    if (!topic || strlen(topic) > 512) {
        if (conf)
            rd_kafka_topic_conf_destroy(conf);
        rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG,
                    EINVAL);
        return NULL;
    }

    if (do_lock)
                rd_kafka_wrlock(rk);
        // 所有创建的rd_kafka_itopic对象都会加入到对应的topic的rk->rk_topics中, 先从中查找, 如果找到就不再创建
    if ((s_rkt = rd_kafka_topic_find(rk, topic, 0/*no lock*/))) {
                if (do_lock)
                        rd_kafka_wrunlock(rk);
        if (conf)
            rd_kafka_topic_conf_destroy(conf);
                if (existing)
                        *existing = 1;
        return s_rkt;
        }

        if (existing)
                *existing = 0;

        // 分配对应的内存, 设置各属性
    rkt = rd_calloc(1, sizeof(*rkt));

    rkt->rkt_topic     = rd_kafkap_str_new(topic, -1);
    rkt->rkt_rk        = rk;

    if (!conf) {
                if (rk->rk_conf.topic_conf)
                        conf = rd_kafka_topic_conf_dup(rk->rk_conf.topic_conf);
                else
                        conf = rd_kafka_topic_conf_new();
        }
    rkt->rkt_conf = *conf;
    rd_free(conf); /* explicitly not rd_kafka_topic_destroy()
                        * since we dont want to rd_free internal members,
                        * just the placeholder. The internal members
                        * were copied on the line above. */

    /* Default partitioner: consistent_random */
    if (!rkt->rkt_conf.partitioner)
        rkt->rkt_conf.partitioner = rd_kafka_msg_partitioner_consistent_random;

    if (rkt->rkt_conf.compression_codec == RD_KAFKA_COMPRESSION_INHERIT)
        rkt->rkt_conf.compression_codec = rk->rk_conf.compression_codec;

        rd_list_init(&rkt->rkt_desp, 16, NULL);
        rd_refcnt_init(&rkt->rkt_refcnt, 0);

        s_rkt = rd_kafka_topic_keep(rkt);

    rwlock_init(&rkt->rkt_lock);
        mtx_init(&rkt->rkt_app_lock, mtx_plain);

    /* Create unassigned partition */
    rkt->rkt_ua = rd_kafka_toppar_new(rkt, RD_KAFKA_PARTITION_UA);

        // 加入到对应的rk_kafka_t中的topic列表
    TAILQ_INSERT_TAIL(&rk->rk_topics, rkt, rkt_link);
    rk->rk_topic_cnt++;

        /* Populate from metadata cache. */
        // 加入或更新到metadata cache
        if ((rkmce = rd_kafka_metadata_cache_find(rk, topic, 1/*valid*/))) {
                if (existing)
                        *existing = 1;

                rd_kafka_topic_metadata_update(rkt, &rkmce->rkmce_mtopic,
                                               rkmce->rkmce_ts_insert);
        }

        if (do_lock)
                rd_kafka_wrunlock(rk);

    return s_rkt;
  • 创建rd_kafka_topic_t对象, 对外的接口rd_kafka_topic_new
代码语言:javascript
复制
rd_kafka_topic_t *rd_kafka_topic_new (rd_kafka_t *rk, const char *topic,
                                      rd_kafka_topic_conf_t *conf) {
        shptr_rd_kafka_itopic_t *s_rkt;
        rd_kafka_itopic_t *rkt;
        rd_kafka_topic_t *app_rkt;
        int existing;

       // 创建一个`shptr_rd_kafka_itopic_t`对象
        s_rkt = rd_kafka_topic_new0(rk, topic, conf, &existing, 1/*lock*/);
        if (!s_rkt)
                return NULL;

        // 指针转换, 从`shptr_rd_kafka_itopic_t`到`rd_kafka_itopic_t`, 引用计数不变
        rkt = rd_kafka_topic_s2i(s_rkt);

        /* Save a shared pointer to be used in callbacks. */
        // 引数计数加1, 指针转换成一个`rd_kafka_topic_t`
        // app相对应的引用计数也加1
    app_rkt = rd_kafka_topic_keep_app(rkt);

        /* Query for the topic leader (async) */
        if (!existing)
               // 发metadata request, 获取leader等相关信息
                rd_kafka_topic_leader_query(rk, rkt);

        /* Drop our reference since there is already/now a rkt_app_rkt */
        rd_kafka_topic_destroy0(s_rkt);

        return app_rkt;
}
  • 获取当前rd_kafka_t对象持有的所有topic的名字,保存在一个rd_list
代码语言:javascript
复制
void rd_kafka_local_topics_to_list (rd_kafka_t *rk, rd_list_t *topics) {
        rd_kafka_itopic_t *rkt;

        rd_kafka_rdlock(rk);
        rd_list_grow(topics, rk->rk_topic_cnt);
        TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link)
                rd_list_add(topics, rd_strdup(rkt->rkt_topic->str));
        rd_kafka_rdunlock(rk);
}
  • 判断parition是否是有效的,就是判断其leader是否有效
代码语言:javascript
复制
  int rd_kafka_topic_partition_available (const rd_kafka_topic_t *app_rkt,
                    int32_t partition) {
    int avail;
    shptr_rd_kafka_toppar_t *s_rktp;
        rd_kafka_toppar_t *rktp;
        rd_kafka_broker_t *rkb;

    s_rktp = rd_kafka_toppar_get(rd_kafka_topic_a2i(app_rkt),
                                     partition, 0/*no ua-on-miss*/);
    if (unlikely(!s_rktp))
        return 0;

        rktp = rd_kafka_toppar_s2i(s_rktp);
        rkb = rd_kafka_toppar_leader(rktp, 1/*proper broker*/);
        avail = rkb ? 1 : 0;
        if (rkb)
                rd_kafka_broker_destroy(rkb);
    rd_kafka_toppar_destroy(s_rktp);
    return avail;
}
  • 扫描所有topic的patitions:
    1. 筛出 kafka message过期的, 回调application层
    2. 找出需要刷新metadata的, 发送metadata request
代码语言:javascript
复制
int rd_kafka_topic_scan_all (rd_kafka_t *rk, rd_ts_t now) {
    rd_kafka_itopic_t *rkt;
    rd_kafka_toppar_t *rktp;
        shptr_rd_kafka_toppar_t *s_rktp;
    int totcnt = 0;
        rd_list_t query_topics;

        rd_list_init(&query_topics, 0, rd_free);

    rd_kafka_rdlock(rk);
    TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) {
        int p;
                int cnt = 0, tpcnt = 0;
                rd_kafka_msgq_t timedout;
                int query_this = 0;

                rd_kafka_msgq_init(&timedout);

        rd_kafka_topic_wrlock(rkt);

                /* Check if metadata information has timed out. */
               // metadata cache中没有缓存,需要query metadata
                if (rkt->rkt_state != RD_KAFKA_TOPIC_S_UNKNOWN &&
                    !rd_kafka_metadata_cache_topic_get(
                            rk, rkt->rkt_topic->str, 1/*only valid*/)) {
                        rd_kafka_topic_set_state(rkt, RD_KAFKA_TOPIC_S_UNKNOWN);

                        query_this = 1;
                }

                /* Just need a read-lock from here on. */
                rd_kafka_topic_wrunlock(rkt);
                rd_kafka_topic_rdlock(rkt);

                if (rkt->rkt_partition_cnt == 0) {
                        query_this = 1;
                }

        for (p = RD_KAFKA_PARTITION_UA ;
             p < rkt->rkt_partition_cnt ; p++) {
            int did_tmout = 0;

            if (!(s_rktp = rd_kafka_toppar_get(rkt, p, 0)))
                continue;

                        rktp = rd_kafka_toppar_s2i(s_rktp);
            rd_kafka_toppar_lock(rktp);

                        /* Check that partition has a leader that is up,
                         * else add topic to query list. */
                       // partition leader无效时, 要request metadata
                        if (p != RD_KAFKA_PARTITION_UA &&
                            (!rktp->rktp_leader ||
                             rktp->rktp_leader->rkb_source ==
                             RD_KAFKA_INTERNAL ||
                             rd_kafka_broker_get_state(rktp->rktp_leader) <
                             RD_KAFKA_BROKER_STATE_UP)) {
                                query_this = 1;
                        }

            /* Scan toppar's message queues for timeouts */
            if (rd_kafka_msgq_age_scan(&rktp->rktp_xmit_msgq,
                           &timedout, now) > 0)
                did_tmout = 1;

            if (rd_kafka_msgq_age_scan(&rktp->rktp_msgq,
                           &timedout, now) > 0)
                did_tmout = 1;

            tpcnt += did_tmout;

            rd_kafka_toppar_unlock(rktp);
            rd_kafka_toppar_destroy(s_rktp);
        }

                rd_kafka_topic_rdunlock(rkt);

                if ((cnt = rd_atomic32_get(&timedout.rkmq_msg_cnt)) > 0) {
                        totcnt += cnt;
                        // kafka mesage过期, 则需要回调到application层
                        rd_kafka_dr_msgq(rkt, &timedout,
                                         RD_KAFKA_RESP_ERR__MSG_TIMED_OUT);
                }

                /* Need to re-query this topic's leader. */
                if (query_this &&
                    !rd_list_find(&query_topics, rkt->rkt_topic->str,
                                  (void *)strcmp))
                        rd_list_add(&query_topics,
                                    rd_strdup(rkt->rkt_topic->str));

        }
        rd_kafka_rdunlock(rk);

        if (!rd_list_empty(&query_topics))
                // 发送 metadata request
                rd_kafka_metadata_refresh_topics(rk, NULL, &query_topics,
                                                 1/*force even if cached
                                                    * info exists*/,
                                                 "refresh unavailable topics");
        rd_list_destroy(&query_topics);

        return totcnt;
}
  • 更新topic的partition个数, partition个数可能增加, 也可能减少rd_kafka_topic_partition_cnt_update, 简单讲:
    1. 新增的partition, 创建;
    2. 老的partition, 删除;
代码语言:javascript
复制
static int rd_kafka_topic_partition_cnt_update (rd_kafka_itopic_t *rkt,
                        int32_t partition_cnt) {
    rd_kafka_t *rk = rkt->rkt_rk;
    shptr_rd_kafka_toppar_t **rktps;
    shptr_rd_kafka_toppar_t *rktp_ua;
        shptr_rd_kafka_toppar_t *s_rktp;
    rd_kafka_toppar_t *rktp;
    rd_kafka_msgq_t tmpq = RD_KAFKA_MSGQ_INITIALIZER(tmpq);
    int32_t i;

        更新前后partition数量相同的话, 不作任何处理
    if (likely(rkt->rkt_partition_cnt == partition_cnt))
        return 0; /* No change in partition count */

    /* Create and assign new partition list */
       // 创建新的partition list, 分配内存
    if (partition_cnt > 0)
        rktps = rd_calloc(partition_cnt, sizeof(*rktps));
    else
        rktps = NULL;

        // 如果新个数大于老个数
    for (i = 0 ; i < partition_cnt ; i++) {
                // 多出来的都是新扩容的partition
        if (i >= rkt->rkt_partition_cnt) {
            /* New partition. Check if its in the list of
             * desired partitions first. */
                        // 检查是否在desired partition 列表中
                        s_rktp = rd_kafka_toppar_desired_get(rkt, i);

                        rktp = s_rktp ? rd_kafka_toppar_s2i(s_rktp) : NULL;
                        if (rktp) {
                                // 在desired partition 列表中,则移除它
                rd_kafka_toppar_lock(rktp);
                                rktp->rktp_flags &= ~RD_KAFKA_TOPPAR_F_UNKNOWN;

                                /* Remove from desp list since the
                                 * partition is now known. */
                                rd_kafka_toppar_desired_unlink(rktp);
                                rd_kafka_toppar_unlock(rktp);
            } else
                s_rktp = rd_kafka_toppar_new(rkt, i);
                        // 赋值rktps[i]
            rktps[i] = s_rktp;
        } else {
                        // 如果是已经存在的partition, 放到rktps[i], 并且作引用计数的增减
            /* Existing partition, grab our own reference. */
            rktps[i] = rd_kafka_toppar_keep(
                rd_kafka_toppar_s2i(rkt->rkt_p[i]));
            /* Loose previous ref */
            rd_kafka_toppar_destroy(rkt->rkt_p[i]);
        }
    }

    rktp_ua = rd_kafka_toppar_get(rkt, RD_KAFKA_PARTITION_UA, 0);

        /* Propagate notexist errors for desired partitions */
        // 扫描desired partition 列表中, 还余下的都是无主的, 集群中不存在的partition, 回调error
        RD_LIST_FOREACH(s_rktp, &rkt->rkt_desp, i) {
                rd_kafka_toppar_enq_error(rd_kafka_toppar_s2i(s_rktp),
                                          RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION);
        }

    /* Remove excessive partitions */
        // 处理更新后的partition个数小于更新前的情况, 需要删除一部分partition
    for (i = partition_cnt ; i < rkt->rkt_partition_cnt ; i++) {
        s_rktp = rkt->rkt_p[i];
                rktp = rd_kafka_toppar_s2i(s_rktp);
        rd_kafka_toppar_lock(rktp);

        if (rktp->rktp_flags & RD_KAFKA_TOPPAR_F_DESIRED) {
                        rd_kafka_dbg(rkt->rkt_rk, TOPIC, "DESIRED",
                                     "Topic %s [%"PRId32"] is desired "
                                     "but no longer known: "
                                     "moving back on desired list",
                                     rkt->rkt_topic->str, rktp->rktp_partition);
                        // 是DESIRED状态的话, 再放回到desired列表
            rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_UNKNOWN;
                        rd_kafka_toppar_desired_link(rktp);

                        if (!rd_kafka_terminating(rkt->rkt_rk))
                                rd_kafka_toppar_enq_error(
                                        rktp,
                                        RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION);
                        // 解除和broker的联系, 实际上是关联到内部的UA broker
            rd_kafka_toppar_broker_delegate(rktp, NULL, 0);

        } else {
            /* Tell handling broker to let go of the toppar */
            rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_REMOVE;
            rd_kafka_toppar_broker_leave_for_remove(rktp);
        }

        rd_kafka_toppar_unlock(rktp);

        rd_kafka_toppar_destroy(s_rktp);
    }


    if (rkt->rkt_p)
        rd_free(rkt->rkt_p);

    rkt->rkt_p = rktps;
    rkt->rkt_partition_cnt = partition_cnt;

    return 1;
}
  • 将在UA partition上待发送的kafka message重新分配到有效的patition上rd_kafka_topic_assign_uas:
代码语言:javascript
复制
static void rd_kafka_topic_assign_uas (rd_kafka_itopic_t *rkt,
                                       rd_kafka_resp_err_t err) {
    rd_kafka_t *rk = rkt->rkt_rk;
    shptr_rd_kafka_toppar_t *s_rktp_ua;
        rd_kafka_toppar_t *rktp_ua;
    rd_kafka_msg_t *rkm, *tmp;
    rd_kafka_msgq_t uas = RD_KAFKA_MSGQ_INITIALIZER(uas);
    rd_kafka_msgq_t failed = RD_KAFKA_MSGQ_INITIALIZER(failed);
    int cnt;

    if (rkt->rkt_rk->rk_type != RD_KAFKA_PRODUCER)
        return;

        // 没有UA partition,就直接返回了
    s_rktp_ua = rd_kafka_toppar_get(rkt, RD_KAFKA_PARTITION_UA, 0);
    if (unlikely(!s_rktp_ua)) {
        return;
    }

        rktp_ua = rd_kafka_toppar_s2i(s_rktp_ua);

        // 将ua partition上的msg移动到临时队列上
    rd_kafka_toppar_lock(rktp_ua);
    rd_kafka_msgq_move(&uas, &rktp_ua->rktp_msgq);
    cnt = rd_atomic32_get(&uas.rkmq_msg_cnt);
    rd_kafka_toppar_unlock(rktp_ua);

    TAILQ_FOREACH_SAFE(rkm, &uas.rkmq_msgs, rkm_link, tmp) {
        /* Fast-path for failing messages with forced partition */
               // 无效的msg放到failed 队列
        if (rkm->rkm_partition != RD_KAFKA_PARTITION_UA &&
            rkm->rkm_partition >= rkt->rkt_partition_cnt &&
            rkt->rkt_state != RD_KAFKA_TOPIC_S_UNKNOWN) {
            rd_kafka_msgq_enq(&failed, rkm);
            continue;
        }
                
               // 重新路由kafka message到相应的partition, 失败则放入failed 队列
        if (unlikely(rd_kafka_msg_partitioner(rkt, rkm, 0) != 0)) {
            /* Desired partition not available */
            rd_kafka_msgq_enq(&failed, rkm);
        }
    }
       
        // 失败的msg, 都回调给application层
    if (rd_atomic32_get(&failed.rkmq_msg_cnt) > 0) {
        /* Fail the messages */
        rd_kafka_dr_msgq(rkt, &failed,
                 rkt->rkt_state == RD_KAFKA_TOPIC_S_NOTEXISTS ?
                 err :
                 RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION);
    }

    rd_kafka_toppar_destroy(s_rktp_ua); /* from get() */
}
  • 关于metadata相关的操作, 我们介绍metadata时再来分析

Librdkafka源码分析-Content Table

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2018.01.16 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Librdkafka源码分析-Content Table
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档