前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Librdkafka对Kafka Message的封装和相关操作

Librdkafka对Kafka Message的封装和相关操作

作者头像
扫帚的影子
发布2018-09-05 16:50:46
2K0
发布2018-09-05 16:50:46
举报
  • struct rd_kafka_message_t
  • struct rd_kafka_msg_t
  • struct rd_kafka_msgq_t
  • kafka message的协议格式可参考 官网

struct rd_kafka_message_s
  • 所在文件: src/rdkafka.h
  • 生产的数据在application层调用接口后最终会将数据封装成这个结构, 从broker消费下来的数据回调给application层时也会封装成这个结构;
  • 定义:
代码语言:javascript
复制
/**
 * Message container exchanged with the application: it carries either a
 * message (err == 0) or an error event (err != 0). Several fields change
 * meaning depending on \c err and on whether the message is seen on the
 * consumer side or in the producer delivery report callback (dr_msg_cb).
 */
typedef struct rd_kafka_message_s {
    rd_kafka_resp_err_t err;   /**< Non-zero for error signaling. */
    rd_kafka_topic_t *rkt;     /**< Topic */
    int32_t partition;         /**< Partition */
    void   *payload;           /**< Producer: original message payload.
                                *   Consumer: depends on the value of \c err :
                                *   - \c err==0: Message payload.
                                *   - \c err!=0: Error string */
    size_t  len;               /**< Depends on the value of \c err :
                                *   - \c err==0: Message payload length
                                *   - \c err!=0: Error string length */
    void   *key;               /**< Depends on the value of \c err :
                                *   - \c err==0: Optional message key */
    size_t  key_len;           /**< Depends on the value of \c err :
                                *   - \c err==0: Optional message key length */
    int64_t offset;            /**< Consumer:
                                *   - Message offset (or offset for error
                                *     if \c err!=0 if applicable).
                                *   dr_msg_cb:
                                *   - Message offset assigned by broker.
                                *     If \c produce.offset.report is set then
                                *     each message will have this field set,
                                *     otherwise only the last message in
                                *     each produced internal batch will
                                *     have this field set, otherwise 0. */
    void  *_private;           /**< Consumer:
                                *   - rdkafka private pointer: DO NOT MODIFY
                                *   dr_msg_cb:
                                *   - msg_opaque from produce() call */
} rd_kafka_message_t;
struct rd_kafka_msg_t
  • 所在文件: src/rdkafka_msg.h
  • 封装了上面的 struct rd_kafka_message_s
  • 定义:
代码语言:javascript
复制
/**
 * Internal message representation: wraps the application-visible
 * rd_kafka_message_t and adds queue linkage, flags, timestamp and
 * producer-side timing information.
 */
typedef struct rd_kafka_msg_s {
    /* Embedded public message. It must stay the first member so that a
     * pointer to it can be cast to a pointer to the enclosing struct. */
    rd_kafka_message_t rkm_rkmessage;  /* MUST be first field */

    /* Linkage that makes this struct an element of a TAILQ. */
    TAILQ_ENTRY(rd_kafka_msg_s)  rkm_link;

    int        rkm_flags;

    /* Message timestamp. Two kinds exist: the time the client created the
     * message, or the time the broker appended it to its log; rkm_tstype
     * tells which one this is. */
    int64_t    rkm_timestamp;  
    rd_kafka_timestamp_type_t rkm_tstype; /* rkm_timestamp type */

        /* Role-specific state; only the producer variant is defined here. */
        union {
                struct {
                        rd_ts_t ts_timeout; /* Message timeout */
                        rd_ts_t ts_enq;     /* Enqueue/Produce time */
                } producer;
        } rkm_u;
} rd_kafka_msg_t;
  • rd_kafka_message_t类型转化为rd_kafka_msg_t: 由于rd_kafka_message_t rkm_rkmessage是struct rd_kafka_msg_s结构的第一个字段, 两者的首地址相同, 因此可以直接对指针作强制类型转换:
代码语言:javascript
复制
/**
 * Convert a pointer to a public message into a pointer to the internal
 * message type by a plain pointer cast.
 *
 * The cast relies on rkm_rkmessage being the first member of
 * rd_kafka_msg_t, so both objects start at the same address.
 */
rd_kafka_msg_t *rd_kafka_message2msg (rd_kafka_message_t *rkmessage) {
        rd_kafka_msg_t *enclosing = (rd_kafka_msg_t *)rkmessage;

        return enclosing;
}
  • 发送kafka message前的检查: librdkafka支持异步发送, 本地有发送缓冲区, 因此在发送前需要作check, 看发送队列是否已满; 如果设置了block发送, 在发送队列满的情况下要一直阻塞wait, 直到被signal
代码语言:javascript
复制
/**
 * Account for \p cnt new messages totalling \p size bytes in the
 * rk_curr_msgs counters of \p rk.
 *
 * Non-producer handles are not accounted and succeed immediately.
 * If adding the messages would exceed max_cnt or max_size:
 *  - with \p block == 0 the call fails with RD_KAFKA_RESP_ERR__QUEUE_FULL;
 *  - otherwise it waits on rk_curr_msgs.cnd and re-checks after each wakeup.
 *
 * @returns RD_KAFKA_RESP_ERR_NO_ERROR once the counters have been updated
 *          (or for non-producers), else RD_KAFKA_RESP_ERR__QUEUE_FULL.
 */
static RD_INLINE RD_UNUSED rd_kafka_resp_err_t
rd_kafka_curr_msgs_add (rd_kafka_t *rk, unsigned int cnt, size_t size,
            int block) {

        if (rk->rk_type != RD_KAFKA_PRODUCER)
                return RD_KAFKA_RESP_ERR_NO_ERROR;

        mtx_lock(&rk->rk_curr_msgs.lock);

        /* Loop for as long as the new messages do not fit. */
        while (unlikely(rk->rk_curr_msgs.cnt + cnt >
                        rk->rk_curr_msgs.max_cnt ||
                        (unsigned long long)(rk->rk_curr_msgs.size + size) >
                        (unsigned long long)rk->rk_curr_msgs.max_size)) {
                if (block) {
                        /* The condition is re-evaluated after every wakeup. */
                        cnd_wait(&rk->rk_curr_msgs.cnd,
                                 &rk->rk_curr_msgs.lock);
                        continue;
                }

                /* Non-blocking caller: give up right away. */
                mtx_unlock(&rk->rk_curr_msgs.lock);
                return RD_KAFKA_RESP_ERR__QUEUE_FULL;
        }

        rk->rk_curr_msgs.size += size;
        rk->rk_curr_msgs.cnt  += cnt;

        mtx_unlock(&rk->rk_curr_msgs.lock);

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}
  • 创建rd_kafka_msg_t, 内部接口 rd_kafka_msg_t *rd_kafka_msg_new00
代码语言:javascript
复制
/**
 * Allocate and initialise a message object for topic \p rkt.
 *
 * The message header, the payload copy (only when RD_KAFKA_MSG_F_COPY is
 * set) and the key copy live in one allocation, laid out as:
 *     [ rd_kafka_msg_t ][ payload copy ][ key copy ]
 *
 * When RD_KAFKA_MSG_F_COPY is set, RD_KAFKA_MSG_F_FREE is cleared from the
 * stored flags. Without RD_KAFKA_MSG_F_COPY the message merely points at
 * the caller's \p payload. The key, if any, is always copied.
 *
 * No limit or queue-size checks are performed here.
 *
 * @returns the new message.
 */
rd_kafka_msg_t *rd_kafka_msg_new00 (rd_kafka_itopic_t *rkt,
                    int32_t partition,
                    int msgflags,
                    char *payload, size_t len,
                    const void *key, size_t keylen,
                    void *msg_opaque) {
        rd_kafka_msg_t *rkm;
        size_t payload_room = 0;
        char *tail;

        /* A copied payload needs room right behind the message header. */
        if (msgflags & RD_KAFKA_MSG_F_COPY) {
                msgflags &= ~RD_KAFKA_MSG_F_FREE;
                payload_room = len;
        }

        /* rd_malloc, not rd_calloc: every field has to be set explicitly
         * below. */
        rkm = rd_malloc(sizeof(*rkm) + payload_room + keylen);

        rkm->rkm_rkmessage.rkt = rd_kafka_topic_keep_a(rkt);
        rkm->rkm_partition  = partition;
        rkm->rkm_offset     = RD_KAFKA_OFFSET_INVALID;
        rkm->rkm_err        = 0;
        rkm->rkm_len        = len;
        rkm->rkm_opaque     = msg_opaque;
        rkm->rkm_flags      = RD_KAFKA_MSG_F_FREE_RKM | msgflags;
        rkm->rkm_tstype     = RD_KAFKA_TIMESTAMP_NOT_AVAILABLE;
        rkm->rkm_timestamp  = 0;

        /* First byte after the message header. */
        tail = (char *)(rkm + 1);

        /* Default: reference the caller's payload as is. */
        rkm->rkm_payload = payload;
        if (payload && (msgflags & RD_KAFKA_MSG_F_COPY)) {
                /* Copy the payload into the space following the header. */
                rkm->rkm_payload = tail;
                memcpy(tail, payload, len);
                tail += len;
        }

        /* The key copy goes after the payload copy (if there was one). */
        rkm->rkm_key     = NULL;
        rkm->rkm_key_len = 0;
        if (key) {
                rkm->rkm_key     = tail;
                rkm->rkm_key_len = keylen;
                memcpy(tail, key, keylen);
        }

        return rkm;
}
  • 创建rd_kafka_msg_t, 创建之前增加check, 内部接口 rd_kafka_msg_t *rd_kafka_msg_new0
代码语言:javascript
复制
/**
 * Validate, account for and create a new producer message.
 *
 * Steps, in order:
 *  1. A NULL payload forces \p len to 0 and a NULL key forces \p keylen to 0.
 *  2. The message is rejected if payload plus key exceed the configured
 *     max_msg_size or if the key is longer than INT32_MAX.
 *  3. The message is accounted for with rd_kafka_curr_msgs_add(), which
 *     either fails or waits (RD_KAFKA_MSG_F_BLOCK) when the limits are hit.
 *  4. The message object is created, time-stamped and handed to the
 *     on_send interceptors.
 *
 * @param errp      Receives the error code; written on size rejection and
 *                  with the result of rd_kafka_curr_msgs_add(). Must not
 *                  be NULL.
 * @param errnop    Optional (may be NULL): receives EMSGSIZE or ENOBUFS on
 *                  failure.
 * @param timestamp Message timestamp, or 0 to use the current
 *                  rd_uclock()/1000.
 * @param now       Current time; used as enqueue time and as the base of
 *                  the message timeout.
 *
 * @returns the new message, or NULL on failure (see \p errp / \p errnop).
 */
static rd_kafka_msg_t *rd_kafka_msg_new0 (rd_kafka_itopic_t *rkt,
                                          int32_t force_partition,
                                          int msgflags,
                                          char *payload, size_t len,
                                          const void *key, size_t keylen,
                                          void *msg_opaque,
                                          rd_kafka_resp_err_t *errp,
                                          int *errnop,
                                          int64_t timestamp,
                                          rd_ts_t now) {
        rd_kafka_msg_t *rkm;
        size_t max_size;

        if (unlikely(!payload))
                len = 0;
        if (!key)
                keylen = 0;

        /* Enforce the configured maximum message size.
         * The check is written as `keylen > max_size - len` rather than
         * `len + keylen > max_size` so that a huge len/keylen pair cannot
         * wrap around size_t and slip through. */
        max_size = (size_t)rkt->rkt_rk->rk_conf.max_msg_size;
        if (unlikely(keylen > INT32_MAX ||
                     len > max_size ||
                     keylen > max_size - len)) {
                *errp = RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE;
                if (errnop)
                        *errnop = EMSGSIZE;
                return NULL;
        }

        /* Account for the message in the producer's current-message
         * counters. With RD_KAFKA_MSG_F_BLOCK this waits while the limits
         * are exceeded, otherwise it fails immediately. */
        *errp = rd_kafka_curr_msgs_add(rkt->rkt_rk, 1, len,
                                       msgflags & RD_KAFKA_MSG_F_BLOCK);
        if (unlikely(*errp)) {
                if (errnop)
                        *errnop = ENOBUFS;
                return NULL;
        }

        /* Create the message object. RD_KAFKA_MSG_F_ACCOUNT records that
         * curr_msgs_add() was called for it. */
        rkm = rd_kafka_msg_new00(rkt, force_partition,
                                 msgflags|RD_KAFKA_MSG_F_ACCOUNT,
                                 payload, len, key, keylen, msg_opaque);

        if (timestamp)
                rkm->rkm_timestamp = timestamp;
        else
                rkm->rkm_timestamp = rd_uclock()/1000;
        rkm->rkm_tstype = RD_KAFKA_TIMESTAMP_CREATE_TIME;

        rkm->rkm_ts_enq = now;

        if (rkt->rkt_conf.message_timeout_ms == 0) {
                /* A timeout of 0 means the message never times out. */
                rkm->rkm_ts_timeout = INT64_MAX;
        } else {
                /* Widen before multiplying so that the conversion by 1000
                 * is carried out in rd_ts_t and cannot overflow the
                 * (possibly narrower) configuration field type. */
                rkm->rkm_ts_timeout = now +
                        (rd_ts_t)rkt->rkt_conf.message_timeout_ms * 1000;
        }

        /* Call interceptor chain for on_send so that interceptors can
         * process rkm_rkmessage before it is enqueued. */
        rd_kafka_interceptors_on_send(rkt->rkt_rk, &rkm->rkm_rkmessage);

        return rkm;
}
  • 创建rd_kafka_msg_t, 并放入选定的topic-partition的队列
代码语言:javascript
复制
/**
 * Create a message and enqueue it on the partition chosen by
 * rd_kafka_msg_partitioner().
 *
 * The outcome is also recorded through rd_kafka_set_last_error():
 *  - creation failure:  the error/errno reported by rd_kafka_msg_new0();
 *  - unknown partition: ESRCH;
 *  - unknown topic:     ENOENT;
 *  - success:           (0, 0).
 *
 * On partitioner failure the payload is not freed: RD_KAFKA_MSG_F_FREE is
 * cleared before the message is destroyed.
 *
 * @returns 0 on success, -1 on failure.
 */
int rd_kafka_msg_new (rd_kafka_itopic_t *rkt, int32_t force_partition,
              int msgflags,
              char *payload, size_t len,
              const void *key, size_t keylen,
              void *msg_opaque) {
        rd_kafka_msg_t *rkm;
        rd_kafka_resp_err_t err;
        int errnox;

        /* Create message */
        rkm = rd_kafka_msg_new0(rkt, force_partition, msgflags,
                                payload, len, key, keylen, msg_opaque,
                                &err, &errnox, 0, rd_clock());
        if (unlikely(!rkm)) {
                /* err and errnox were filled in by rd_kafka_msg_new0() */
                rd_kafka_set_last_error(err, errnox);
                return -1;
        }

        /* Partition the message: selects the topic-partition and enqueues
         * the message on it. */
        err = rd_kafka_msg_partitioner(rkt, rkm, 1);
        if (unlikely(err)) {
                /* Interceptor: unroll failing messages by triggering
                 * on_ack.. */
                rkm->rkm_err = err;
                rd_kafka_interceptors_on_acknowledgement(
                        rkt->rkt_rk, &rkm->rkm_rkmessage);

                /* Handle partitioner failures: it only fails when the
                 * application attempts to force a destination partition
                 * that does not exist in the cluster. Note we must clear
                 * the RD_KAFKA_MSG_F_FREE flag since our contract says we
                 * don't free the payload on failure. */
                rkm->rkm_flags &= ~RD_KAFKA_MSG_F_FREE;
                rd_kafka_msg_destroy(rkt->rkt_rk, rkm);

                /* Translate error codes to errnos. */
                switch (err)
                {
                case RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION:
                        errnox = ESRCH;
                        break;
                case RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC:
                        errnox = ENOENT;
                        break;
                default:
                        errnox = EINVAL; /* NOTREACHED */
                        break;
                }
                rd_kafka_set_last_error(err, errnox);

                return -1;
        }

        rd_kafka_set_last_error(0, 0);
        return 0;
}
struct rd_kafka_msgq_t
  • 所在文件: src/rdkafka_msg.h
  • 其实就是简单封装的rd_kafka_msg_t队列
  • 定义:
代码语言:javascript
复制
/**
 * Message queue: a TAILQ of rd_kafka_msg_t together with running totals
 * of the number of queued messages and their size.
 */
typedef struct rd_kafka_msgq_s {
    /* Queue head; messages are linked through their rkm_link member. */
    TAILQ_HEAD(, rd_kafka_msg_s) rkmq_msgs;

    /* Number of messages in the queue. */
    rd_atomic32_t rkmq_msg_cnt;

    /* Total size of the queued messages. */
    rd_atomic64_t rkmq_msg_bytes;
} rd_kafka_msgq_t;
  • 合并两个rd_kafka_msgq_t:
代码语言:javascript
复制
void rd_kafka_msgq_concat (rd_kafka_msgq_t *dst,
                           rd_kafka_msgq_t *src)
  • 使用一个rd_kafka_msgq_t覆盖另一个rd_kafka_msgq_t
代码语言:javascript
复制
void rd_kafka_msgq_move (rd_kafka_msgq_t *dst,
                         rd_kafka_msgq_t *src)
  • 从队列里删除一个rd_kafka_msg_t
代码语言:javascript
复制
rd_kafka_msg_t *rd_kafka_msgq_deq (rd_kafka_msgq_t *rkmq,
                   rd_kafka_msg_t *rkm,
                   int do_count)
  • 出队列
代码语言:javascript
复制
/**
 * Remove the first message from \p rkmq.
 *
 * @returns the first message after passing it to rd_kafka_msgq_deq(),
 *          or NULL if the queue is empty.
 */
rd_kafka_msg_t *rd_kafka_msgq_pop (rd_kafka_msgq_t *rkmq) {
        rd_kafka_msg_t *head = TAILQ_FIRST(&rkmq->rkmq_msgs);

        if (!head)
                return NULL;

        rd_kafka_msgq_deq(rkmq, head, 1);
        return head;
}
  • 入队列, 插入到队尾
代码语言:javascript
复制
/**
 * Append \p rkm to the tail of \p rkmq and update the queue totals:
 * the message count grows by one and the byte count by the message's
 * payload length plus key length.
 */
static RD_INLINE RD_UNUSED void rd_kafka_msgq_enq (rd_kafka_msgq_t *rkmq,
                        rd_kafka_msg_t *rkm) {
        const size_t msg_bytes = rkm->rkm_len + rkm->rkm_key_len;

        TAILQ_INSERT_TAIL(&rkmq->rkmq_msgs, rkm, rkm_link);

        rd_atomic32_add(&rkmq->rkmq_msg_cnt, 1);
        rd_atomic64_add(&rkmq->rkmq_msg_bytes, msg_bytes);
}
  • 扫描队列, 将超时的加入到超时队列
代码语言:javascript
复制
/**
 * Move messages whose timeout is not later than \p now from the front of
 * \p rkmq to the tail of \p timedout.
 *
 * The scan stops at the first message that has not timed out yet, which
 * assumes messages were enqueued in time-sequential order.
 *
 * @returns the growth of \p timedout's message count during the scan.
 */
int rd_kafka_msgq_age_scan (rd_kafka_msgq_t *rkmq,
                rd_kafka_msgq_t *timedout,
                rd_ts_t now) {
        rd_kafka_msg_t *cur, *next;
        const int cnt_before = rd_atomic32_get(&timedout->rkmq_msg_cnt);
        int cnt_after;

        TAILQ_FOREACH_SAFE(cur, &rkmq->rkmq_msgs, rkm_link, next) {
                /* First still-valid message: everything after it is
                 * assumed to be newer, so stop here. */
                if (likely(cur->rkm_ts_timeout > now))
                        break;

                /* Transfer the expired message to the timed-out queue. */
                rd_kafka_msgq_deq(rkmq, cur, 1);
                rd_kafka_msgq_enq(timedout, cur);
        }

        cnt_after = rd_atomic32_get(&timedout->rkmq_msg_cnt);

        return cnt_after - cnt_before;
}
  • rd_kafka_msg_partitioner是很重要的一个函数, 作两件事: 确定一个topic-partition, 然后把这个rd_kafka_msg_t放到这个topic-partition对应的队列里
代码语言:javascript
复制
/**
 * Select the destination partition for \p rkm and enqueue the message on
 * that partition's queue.
 *
 * Partition selection depends on the topic state:
 *  - UNKNOWN:   no metadata yet, the message goes to the UA partition;
 *  - NOTEXISTS: fails with RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC;
 *  - EXISTS:    a topic with zero partitions uses the UA partition;
 *               otherwise the message's own partition is used, or the
 *               configured partitioner is called when the message
 *               partition is RD_KAFKA_PARTITION_UA. A partition at or
 *               beyond the partition count fails with
 *               RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION.
 *
 * @param do_lock If non-zero the topic read lock is taken for the duration
 *                of the call and released on every return path.
 *
 * @returns 0 on success, else an rd_kafka_resp_err_t error code.
 */
int rd_kafka_msg_partitioner (rd_kafka_itopic_t *rkt, rd_kafka_msg_t *rkm,
                  int do_lock) {
        int32_t partition;
        rd_kafka_toppar_t *rktp_new;
        shptr_rd_kafka_toppar_t *s_rktp_new;
        rd_kafka_resp_err_t err;

        if (do_lock)
                rd_kafka_topic_rdlock(rkt);

        /* Choose the partition according to the topic's current state. */
        switch (rkt->rkt_state)
        {
        case RD_KAFKA_TOPIC_S_UNKNOWN:
                /* No metadata received from cluster yet.
                 * Put message in UA partition and re-run partitioner when
                 * cluster comes up. */
                partition = RD_KAFKA_PARTITION_UA;
                break;

        case RD_KAFKA_TOPIC_S_NOTEXISTS:
                /* Topic not found in cluster.
                 * Fail message immediately. */
                err = RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC;
                goto fail;

        case RD_KAFKA_TOPIC_S_EXISTS:
                /* Topic exists but has no partitions.
                 * This is usually a transient state following the
                 * auto-creation of a topic. */
                if (unlikely(rkt->rkt_partition_cnt == 0)) {
                        partition = RD_KAFKA_PARTITION_UA;
                        break;
                }

                if (rkm->rkm_partition != RD_KAFKA_PARTITION_UA) {
                        /* The message already names its partition. */
                        partition = rkm->rkm_partition;
                } else {
                        /* Partition not assigned, run the configured
                         * partitioner.
                         * Provide a temporary app_rkt instance to protect
                         * from the case where the application decided to
                         * destroy its topic object prior to delivery
                         * completion (issue #502). */
                        rd_kafka_topic_t *app_rkt =
                                rd_kafka_topic_keep_a(rkt);

                        partition = rkt->rkt_conf.partitioner(
                                app_rkt,
                                rkm->rkm_key,
                                rkm->rkm_key_len,
                                rkt->rkt_partition_cnt,
                                rkt->rkt_conf.opaque,
                                rkm->rkm_opaque);

                        rd_kafka_topic_destroy0(
                                rd_kafka_topic_a2s(app_rkt));
                }

                /* Check that partition exists. */
                if (partition >= rkt->rkt_partition_cnt) {
                        err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
                        goto fail;
                }
                break;

        default:
                rd_kafka_assert(rkt->rkt_rk, !*"NOTREACHED");
                break;
        }

        /* Get new partition */
        s_rktp_new = rd_kafka_toppar_get(rkt, partition, 0);
        if (unlikely(!s_rktp_new)) {
                /* Unknown topic or partition */
                err = (rkt->rkt_state == RD_KAFKA_TOPIC_S_NOTEXISTS) ?
                        RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC :
                        RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
                goto fail;
        }

        rktp_new = rd_kafka_toppar_s2i(s_rktp_new);
        rd_atomic64_add(&rktp_new->rktp_c.msgs, 1);

        /* Update message partition */
        if (rkm->rkm_partition == RD_KAFKA_PARTITION_UA)
                rkm->rkm_partition = partition;

        /* Partition is available: enqueue msg on partition's queue */
        rd_kafka_toppar_enq_msg(rktp_new, rkm);

        if (do_lock)
                rd_kafka_topic_rdunlock(rkt);

        rd_kafka_toppar_destroy(s_rktp_new); /* from _get() */
        return 0;

fail:
        /* Common error exit: release the topic lock and report err. */
        if (do_lock)
                rd_kafka_topic_rdunlock(rkt);

        return err;
}

Librdkafka源码分析-Content Table

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018.01.09 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Librdkafka源码分析-Content Table
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档