
Librdkafka Fundamental Data Structures 3 -- Buffers, Part 2

扫帚的影子
Published 2018-09-05 16:44:38
  1. struct rd_kafka_buf_s
  2. struct rd_kafka_bufq_t

struct rd_kafka_buf_s
  • Defined in: src/rdkafka_buf.h
  • This struct is involved in many operations; I will supplement and correct this section later as I get deeper into the code
  • Definition:
```c
struct rd_kafka_buf_s { /* rd_kafka_buf_t */
        TAILQ_ENTRY(rd_kafka_buf_s) rkbuf_link; /* This rd_kafka_buf_s is linked
                                                 * as a TAILQ element */

        int32_t rkbuf_corrid;      /* Corresponds to the CorrelationId in the
                                    * Kafka protocol request header */

        rd_ts_t rkbuf_ts_retry;    /* Absolute send retry time */

        int     rkbuf_flags;       /* RD_KAFKA_OP_F */

        rd_buf_t rkbuf_buf;        /**< Send/Recv byte buffer */
        rd_slice_t rkbuf_reader;   /**< Buffer slice reader for rkbuf_buf
                                    *   (read-only view of the buffer above) */

        int     rkbuf_connid;      /* broker connection id (used when buffer
                                    * was partially sent). */
        size_t  rkbuf_totlen;      /* recv: total expected length,
                                    * send: not used.
                                    * When receiving a response the first four
                                    * bytes of the Kafka frame carry the payload
                                    * length; this is that length, i.e. how many
                                    * bytes still remain to be received. */

        rd_crc32_t rkbuf_crc;      /* Current CRC calculation */

        struct rd_kafkap_reqhdr rkbuf_reqhdr;   /* Request header.
                                                 * These fields are encoded
                                                 * and written to output buffer
                                                 * on buffer finalization. */
        struct rd_kafkap_reshdr rkbuf_reshdr;   /* Response header.
                                                 * Decoded fields are copied
                                                 * here from the buffer
                                                 * to provide an ease-of-use
                                                 * interface to the header */

        int32_t rkbuf_expected_size;  /* expected size of message */

        rd_kafka_replyq_t   rkbuf_replyq;       /* Enqueue response on replyq */
        rd_kafka_replyq_t   rkbuf_orig_replyq;  /* Original replyq to be used
                                                 * for retries from inside
                                                 * the rkbuf_cb() callback
                                                 * since rkbuf_replyq will
                                                 * have been reset. */
        rd_kafka_resp_cb_t *rkbuf_cb;           /* Response callback */
        struct rd_kafka_buf_s *rkbuf_response;  /* Response buffer */

        struct rd_kafka_broker_s *rkbuf_rkb;    /* Associated broker */

        rd_refcnt_t rkbuf_refcnt;  /* Reference count */
        void   *rkbuf_opaque;

        int     rkbuf_retries;            /* Retries so far. */
#define RD_KAFKA_BUF_NO_RETRIES  1000000  /* Do not retry */

        int     rkbuf_features;   /* Required feature(s) that must be
                                   * supported by broker. */

        rd_ts_t rkbuf_ts_enq;
        rd_ts_t rkbuf_ts_sent;    /* Initially: Absolute time of transmission,
                                   * after response: RTT. */
        rd_ts_t rkbuf_ts_timeout;

        int64_t rkbuf_offset;     /* Used by OffsetCommit: offset to commit */

        rd_list_t *rkbuf_rktp_vers;    /* Toppar + Op Version map.
                                        * Used by FetchRequest. */

        rd_kafka_msgq_t rkbuf_msgq;

        rd_kafka_resp_err_t rkbuf_err;      /* Buffer parsing error code */

        union {
                struct {
                        rd_list_t *topics;  /* Requested topics (char *) */
                        char *reason;       /* Textual reason */
                        rd_kafka_op_t *rko; /* Originating rko with replyq
                                             * (if any) */
                        int all_topics;     /* Full/All topics requested */

                        int *decr;          /* Decrement this integer by one
                                             * when request is complete:
                                             * typically points to metadata
                                             * cache's full_.._sent.
                                             * Will be performed with
                                             * decr_lock held. */
                        mtx_t *decr_lock;

                } Metadata;
        } rkbuf_u;
};
```
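  • The buffer's lifetime is governed by rkbuf_refcnt. A minimal sketch of the idea, using the real rd_kafka_buf_keep()/rd_kafka_buf_destroy() helpers; the surrounding scenario and function names are assumptions for illustration only:
```c
/* Illustrative only: every queue or in-flight operation that holds on to a
 * buffer takes its own reference and releases it when done. */
static void example_enqueue_somewhere (rd_kafka_buf_t *rkbuf) {
        rd_kafka_buf_keep(rkbuf);     /* take a reference for the queue */
        /* ... put rkbuf on an outbuf/waitresp/retry queue ... */
}

static void example_done_with_buffer (rd_kafka_buf_t *rkbuf) {
        rd_kafka_buf_destroy(rkbuf);  /* drop the reference; the buffer is
                                       * freed when the count reaches zero */
}
```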
  • rd_kafka_buf_new_request: creates an rd_kafka_buf_s for a Kafka request; the Kafka request header is automatically prepended in this function
```c
rd_kafka_buf_t *rd_kafka_buf_new_request (rd_kafka_broker_t *rkb, int16_t ApiKey,
                                          int segcnt, size_t size) {
        rd_kafka_buf_t *rkbuf;

        /* Make room for common protocol request headers */
        // Bump size by the request header size, including the client id
        size += RD_KAFKAP_REQHDR_SIZE +
                RD_KAFKAP_STR_SIZE(rkb->rkb_rk->rk_client_id);

        // Add one to the rd_buf_t segment count to account for the header segment
        segcnt += 1; /* headers */

        // Create the rd_kafka_buf with the requested segment count and size
        rkbuf = rd_kafka_buf_new0(segcnt, size, 0);

        rkbuf->rkbuf_rkb = rkb;
        rd_kafka_broker_keep(rkb);

        rkbuf->rkbuf_reqhdr.ApiKey = ApiKey;

        /* Write request header, will be updated later. */
        /* Length: updated later */
        rd_kafka_buf_write_i32(rkbuf, 0);
        /* ApiKey */
        rd_kafka_buf_write_i16(rkbuf, rkbuf->rkbuf_reqhdr.ApiKey);
        /* ApiVersion: updated later */
        rd_kafka_buf_write_i16(rkbuf, 0);
        /* CorrId: updated later */
        rd_kafka_buf_write_i32(rkbuf, 0);

        /* ClientId */
        rd_kafka_buf_write_kstr(rkbuf, rkb->rkb_rk->rk_client_id);

        return rkbuf;
}
```
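  • A hedged usage sketch of how a request buffer is typically built and handed off. The enqueue call follows the common pattern of the request builders in rdkafka_request.c; the function name, the choice of ApiVersionRequest and the caller-supplied replyq/callback/opaque are assumptions:
```c
/* Sketch: build a request buffer and enqueue it on the broker. */
static void example_send_api_version_request (rd_kafka_broker_t *rkb,
                                              rd_kafka_replyq_t replyq,
                                              rd_kafka_resp_cb_t *resp_cb,
                                              void *opaque) {
        rd_kafka_buf_t *rkbuf;

        rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_ApiVersion,
                                         1 /* one payload segment */, 4);

        /* Request-specific fields would be appended here with the
         * rd_kafka_buf_write_*() helpers; ApiVersionRequest v0 has an
         * empty body. */

        /* Hand over to the broker thread: the Length/ApiVersion/CorrId
         * placeholders are patched before transmission and the response
         * (or error) is delivered to resp_cb on replyq. */
        rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
}
```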
  • Write operation rd_kafka_buf_write: implemented on top of rd_buf_write
```c
static RD_INLINE size_t rd_kafka_buf_write (rd_kafka_buf_t *rkbuf,
                                        const void *data, size_t len) {
        size_t r;

        r = rd_buf_write(&rkbuf->rkbuf_buf, data, len);

        if (rkbuf->rkbuf_flags & RD_KAFKA_OP_F_CRC)
                rkbuf->rkbuf_crc = rd_crc32_update(rkbuf->rkbuf_crc, data, len);

        return r;
}
```
  • Updating data already written to the buffer, rd_kafka_buf_update: implemented via rd_buf_write_update
```c
static RD_INLINE void rd_kafka_buf_update (rd_kafka_buf_t *rkbuf, size_t of,
                                          const void *data, size_t len) {
        rd_kafka_assert(NULL, !(rkbuf->rkbuf_flags & RD_KAFKA_OP_F_CRC));
        rd_buf_write_update(&rkbuf->rkbuf_buf, of, data, len);
}
```
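  • Its main use is patching placeholders written earlier, e.g. the 4-byte Length field at offset 0. A minimal illustration of the mechanism only; librdkafka does this internally when the buffer is finalized for sending, and rd_buf_write_pos() returning the number of bytes written so far is an assumption here:
```c
/* Illustrative only: once the whole request has been written, patch the
 * Length field (frame size excluding the 4 length bytes) at offset 0.
 * Note rd_kafka_buf_update() asserts that CRC calculation is not active. */
static void example_patch_length (rd_kafka_buf_t *rkbuf) {
        int32_t len = (int32_t)(rd_buf_write_pos(&rkbuf->rkbuf_buf) - 4);

        len = htobe32(len);   /* Kafka protocol integers are big-endian */
        rd_kafka_buf_update(rkbuf, 0, &len, sizeof(len));
}
```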
  • Push operation rd_kafka_buf_push0: implemented via rd_buf_push. The pushed data is not copied; only the pointer is stored. rd_buf_push first checks whether the segment currently being written to still has free space; if so, that remaining space is split off into a new segment. Another new segment wrapping the buf to be pushed is then appended to the rd_kafka_buf_t, and finally the split-off segment is appended again after it.
```c
void rd_kafka_buf_push0 (rd_kafka_buf_t *rkbuf, const void *buf, size_t len,
                         int allow_crc_calc, void (*free_cb) (void *)) {
        rd_buf_push(&rkbuf->rkbuf_buf, buf, len, free_cb);

        if (allow_crc_calc && (rkbuf->rkbuf_flags & RD_KAFKA_OP_F_CRC))
                rkbuf->rkbuf_crc = rd_crc32_update(rkbuf->rkbuf_crc, buf, len);
}
```
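  • A brief usage sketch (the function name, the payload and the choice of rd_free as the free callback are made-up illustration; the point is that the bytes are referenced, not copied, and are released when the buffer is destroyed):
```c
/* Illustrative only: push an already-serialized payload into the buffer
 * without copying it; the buffer takes ownership via free_cb. */
static void example_push_payload (rd_kafka_buf_t *rkbuf,
                                  void *payload, size_t size) {
        rd_kafka_buf_push0(rkbuf, payload, size,
                           1 /* allow_crc_calc */,
                           rd_free /* free_cb, called on buffer destroy */);
}
```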
  • Type-specific write helpers rd_kafka_buf_write_*; a sketch of their common pattern follows below
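  • They all follow the same pattern: convert the value to Kafka's on-wire (big-endian) representation and delegate to rd_kafka_buf_write(). A sketch modelled on the i16/i32 variants in src/rdkafka_buf.h (the real file contains many more, e.g. for i64, strings and byte arrays):
```c
static RD_INLINE size_t rd_kafka_buf_write_i16 (rd_kafka_buf_t *rkbuf,
                                                int16_t v) {
        v = htobe16(v);   /* Kafka protocol integers are big-endian */
        return rd_kafka_buf_write(rkbuf, &v, sizeof(v));
}

static RD_INLINE size_t rd_kafka_buf_write_i32 (rd_kafka_buf_t *rkbuf,
                                                int32_t v) {
        v = htobe32(v);
        return rd_kafka_buf_write(rkbuf, &v, sizeof(v));
}
```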
  • Buffer retry: a request that failed to send may be retried
```c
int rd_kafka_buf_retry (rd_kafka_broker_t *rkb, rd_kafka_buf_t *rkbuf) {

        // First decide whether a retry should be attempted at all
        if (unlikely(!rkb ||
                     rkb->rkb_source == RD_KAFKA_INTERNAL ||
                     rd_kafka_terminating(rkb->rkb_rk) ||
                     rkbuf->rkbuf_retries + 1 >
                     rkb->rkb_rk->rk_conf.max_retries))
                return 0;

        /* Try again */
        rkbuf->rkbuf_ts_sent = 0;
        rkbuf->rkbuf_retries++;
        rd_kafka_buf_keep(rkbuf);
        // Put the buffer on the broker's retry queue
        rd_kafka_broker_buf_retry(rkb, rkbuf);
        return 1;
}
```
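  • A condensed sketch of the typical caller pattern in response handlers (simplified: the real handlers route retriable errors through librdkafka's error-action machinery before calling rd_kafka_buf_retry(); the function name here is hypothetical):
```c
/* Simplified caller pattern: on a retriable error try to re-enqueue the
 * request; only report the error if the retry was rejected (retry count
 * exhausted, internal broker, or client terminating). */
static void example_handle_response_err (rd_kafka_broker_t *rkb,
                                         rd_kafka_buf_t *request,
                                         rd_kafka_resp_err_t err) {
        if (err == RD_KAFKA_RESP_ERR__TRANSPORT &&
            rd_kafka_buf_retry(rkb, request))
                return;   /* a new attempt is now queued on the broker */

        /* ... normal error/response handling would continue here ... */
}
```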
  • Handling a buffer of type RD_KAFKA_OP_RECV_BUF (the buffer actually lives inside an rd_kafka_op_t)
```c
void rd_kafka_buf_handle_op (rd_kafka_op_t *rko, rd_kafka_resp_err_t err) {
        rd_kafka_buf_t *request, *response;

        request = rko->rko_u.xbuf.rkbuf;
        rko->rko_u.xbuf.rkbuf = NULL;

        /* NULL on op_destroy() */
        if (request->rkbuf_replyq.q) {
                int32_t version = request->rkbuf_replyq.version;
                /* Current queue usage is done, but retain original replyq for
                 * future retries, stealing
                 * the current reference. */
                request->rkbuf_orig_replyq = request->rkbuf_replyq;
                rd_kafka_replyq_clear(&request->rkbuf_replyq);
                /* Callback might need to version check so we retain the
                 * version across the clear() call which clears it. */
                request->rkbuf_replyq.version = version;
        }

        if (!request->rkbuf_cb) {
                rd_kafka_buf_destroy(request);
                return;
        }

        /* Let buf_callback() do destroy()s */
        response = request->rkbuf_response; /* May be NULL */
        request->rkbuf_response = NULL;

        // With both the request and the response in hand, invoke the callback
        rd_kafka_buf_callback(request->rkbuf_rkb->rkb_rk,
                              request->rkbuf_rkb, err,
                              response, request);
}
```
struct rd_kafka_bufq_t
  • Defined in: src/rdkafka_buf.h
  • Wraps the rd_kafka_buf_s above into a queue
  • Definition:
```c
typedef struct rd_kafka_bufq_s {
        TAILQ_HEAD(, rd_kafka_buf_s) rkbq_bufs;  /* TAILQ of buffers */
        rd_atomic32_t  rkbq_cnt;                 /* Number of buffers in queue */
        rd_atomic32_t  rkbq_msg_cnt;             /* Total number of messages
                                                  * held by the queued buffers */
} rd_kafka_bufq_t;
```
  • Provides enqueue, dequeue, removal and similar operations; see the sketch of the enqueue/dequeue helpers below
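  • A minimal sketch of what the enqueue/dequeue helpers look like, modelled on rd_kafka_bufq_enq()/rd_kafka_bufq_deq() in rdkafka_broker.c (function names here are illustrative and details such as assertions are omitted):
```c
static void example_bufq_enq (rd_kafka_bufq_t *rkbufq, rd_kafka_buf_t *rkbuf) {
        TAILQ_INSERT_TAIL(&rkbufq->rkbq_bufs, rkbuf, rkbuf_link);
        rd_atomic32_add(&rkbufq->rkbq_cnt, 1);
        rd_atomic32_add(&rkbufq->rkbq_msg_cnt,
                        rd_kafka_msgq_len(&rkbuf->rkbuf_msgq));
}

static void example_bufq_deq (rd_kafka_bufq_t *rkbufq, rd_kafka_buf_t *rkbuf) {
        TAILQ_REMOVE(&rkbufq->rkbq_bufs, rkbuf, rkbuf_link);
        rd_atomic32_sub(&rkbufq->rkbq_cnt, 1);
        rd_atomic32_sub(&rkbufq->rkbq_msg_cnt,
                        rd_kafka_msgq_len(&rkbuf->rkbuf_msgq));
}
```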

Librdkafka Source Code Analysis - Content Table
