Librdkafka的基础数据结构 3 -- Buffer相关 2

  • 我们在前面的Librdkafka的基础数据结构 3 -- Buffer相关 1介绍了Buffer和其组成segment,只读映射slice的相关定义和操作;
  • 这一篇我们来介绍librdkafka中rd_buf_t的实际使用, 实际上是通过rd_kafka_buf_s的封装来实现的;
  • 包括:
    1. struct rd_kafka_buf_s
    2. struct rd_kafka_bufq_t

struct rd_kafka_buf_s
  • 所在文件: src/rdkafka_buf.h
  • 这个结构涉及的操作很多, 我会在后面随着代码的深入了解来作补充和更正
  • 定义:
struct rd_kafka_buf_s { /* rd_kafka_buf_t */
    TAILQ_ENTRY(rd_kafka_buf_s) rkbuf_link; // 这个rd_kafka_buf_s定义为tailq的元素

    int32_t rkbuf_corrid;  // 对应于kafka协议中request header里的CorrelationId

    rd_ts_t rkbuf_ts_retry;    /* Absolute send retry time */ //发送request的重试的绝对时间

    int     rkbuf_flags; /* RD_KAFKA_OP_F */

        rd_buf_t rkbuf_buf;        /**< Send/Recv byte buffer */  // 发送或接收数据的rd_buf_t
        rd_slice_t rkbuf_reader;   /**< Buffer slice reader for rkbuf_buf */ // 上面rd_buf_t的只读映射

    int     rkbuf_connid;      /* broker connection id (used when buffer
                    * was partially sent). */
        size_t  rkbuf_totlen;      /* recv: total expected length,
                                    * send: not used */   // 接收response时, kafka协议的前四个字节表示payload长度, 这个表示payload有多长, 即后面需要再接收的数据长度

    rd_crc32_t rkbuf_crc;      /* Current CRC calculation */ //CRC校验

    struct rd_kafkap_reqhdr rkbuf_reqhdr;   /* Request header.
                                                 * These fields are encoded
                                                 * and written to output buffer
                                                 * on buffer finalization. */
    struct rd_kafkap_reshdr rkbuf_reshdr;   /* Response header.
                                                 * Decoded fields are copied
                                                 * here from the buffer
                                                 * to provide an ease-of-use
                                                 * interface to the header */

    int32_t rkbuf_expected_size;  /* expected size of message */

        // response的入队列
        rd_kafka_replyq_t   rkbuf_replyq;       /* Enqueue response on replyq */
        rd_kafka_replyq_t   rkbuf_orig_replyq;  /* Original replyq to be used
                                                 * for retries from inside
                                                 * the rkbuf_cb() callback
                                                 * since rkbuf_replyq will
                                                 * have been reset. */
        rd_kafka_resp_cb_t *rkbuf_cb;           /* Response callback */
        struct rd_kafka_buf_s *rkbuf_response;  /* Response buffer */

        struct rd_kafka_broker_s *rkbuf_rkb;  // 相关联的broker

    rd_refcnt_t rkbuf_refcnt; // 引用计数
    void   *rkbuf_opaque;

       // 重试次数
    int     rkbuf_retries;            /* Retries so far. */
#define RD_KAFKA_BUF_NO_RETRIES  1000000  /* Do not retry */

        int     rkbuf_features;   /* Required feature(s) that must be
                                   * supported by broker. */

    rd_ts_t rkbuf_ts_enq;
    rd_ts_t rkbuf_ts_sent;    /* Initially: Absolute time of transmission,
                   * after response: RTT. */
    rd_ts_t rkbuf_ts_timeout;

        int64_t rkbuf_offset;     /* Used by OffsetCommit */  // 需要提交的offset

    rd_list_t *rkbuf_rktp_vers;    /* Toppar + Op Version map.
                    * Used by FetchRequest. */

    rd_kafka_msgq_t rkbuf_msgq;

        rd_kafka_resp_err_t rkbuf_err;      /* Buffer parsing error code */

        union {
                struct {
                        rd_list_t *topics;  /* Requested topics (char *) */
                        char *reason;       /* Textual reason */
                        rd_kafka_op_t *rko; /* Originating rko with replyq
                                             * (if any) */
                        int all_topics;     /* Full/All topics requested */

                        int *decr;          /* Decrement this integer by one
                                             * when request is complete:
                                             * typically points to metadata
                                             * cache's full_.._sent.
                                             * Will be performed with
                                             * decr_lock held. */
                        mtx_t *decr_lock;

                } Metadata;
        } rkbuf_u;
};
  • 为kafka request创建rd_kafka_buf_s Kafka request的header在这个函数中被自动附加上
rd_kafka_buf_t *rd_kafka_buf_new_request (rd_kafka_broker_t *rkb, int16_t ApiKey,
                                          int segcnt, size_t size) {
        rd_kafka_buf_t *rkbuf;

        /* Make room for common protocol request headers */
        // 计算size 更新, 加上request header大小, 包括client id
        size += RD_KAFKAP_REQHDR_SIZE +
                RD_KAFKAP_STR_SIZE(rkb->rkb_rk->rk_client_id);

       // rd_buffer_t中的segment个数加上, 包括一个header
        segcnt += 1; /* headers */

       按指定的segment个数和size来创建rd_kafka_buf
        rkbuf = rd_kafka_buf_new0(segcnt, size, 0);

        rkbuf->rkbuf_rkb = rkb;
        rd_kafka_broker_keep(rkb);

        rkbuf->rkbuf_reqhdr.ApiKey = ApiKey;

        // 写kafka request header
        /* Write request header, will be updated later. */
        /* Length: updated later */
        rd_kafka_buf_write_i32(rkbuf, 0);
        /* ApiKey */
        rd_kafka_buf_write_i16(rkbuf, rkbuf->rkbuf_reqhdr.ApiKey);
        /* ApiVersion: updated later */
        rd_kafka_buf_write_i16(rkbuf, 0);
        /* CorrId: updated later */
        rd_kafka_buf_write_i32(rkbuf, 0);

        /* ClientId */
        rd_kafka_buf_write_kstr(rkbuf, rkb->rkb_rk->rk_client_id);

        return rkbuf;
}
  • 写操作 rd_kafka_buf_write: 调用rd_buf_write来实现
static RD_INLINE size_t rd_kafka_buf_write (rd_kafka_buf_t *rkbuf,
                                        const void *data, size_t len) {
        size_t r;

        r = rd_buf_write(&rkbuf->rkbuf_buf, data, len);

        if (rkbuf->rkbuf_flags & RD_KAFKA_OP_F_CRC)
                rkbuf->rkbuf_crc = rd_crc32_update(rkbuf->rkbuf_crc, data, len);

        return r;
}
  • 更新buffer中的局部数据rd_kafka_buf_update: 调用rd_buf_write_update来实现
static RD_INLINE void rd_kafka_buf_update (rd_kafka_buf_t *rkbuf, size_t of,
                                          const void *data, size_t len) {
        rd_kafka_assert(NULL, !(rkbuf->rkbuf_flags & RD_KAFKA_OP_F_CRC));
        rd_buf_write_update(&rkbuf->rkbuf_buf, of, data, len);
}
  • push操作 rd_kafka_buf_push0 调用rd_buf_push来实现, buf的数据不会被copy, 只作简单的指针赋值, rd_buf_push会先看当前现在写入的segment还没有没剩余空间, 如果有的话将剩余空间单拆出来, 生成一个新的segment, 然后再产生一个新的segment来放需要写入的buf, append到rd_kafka_buf_t上, 最后再把拆出来的segment再append
void rd_kafka_buf_push0 (rd_kafka_buf_t *rkbuf, const void *buf, size_t len,
                         int allow_crc_calc, void (*free_cb) (void *)) {
        rd_buf_push(&rkbuf->rkbuf_buf, buf, len, free_cb);

        if (allow_crc_calc && (rkbuf->rkbuf_flags & RD_KAFKA_OP_F_CRC))
                rkbuf->rkbuf_crc = rd_crc32_update(rkbuf->rkbuf_crc, buf, len);
}
  • 针对不同类型的写入操作 rd_kafka_buf_write_*
  • buffer的retry操作, request发送失败可能会重试
int rd_kafka_buf_retry (rd_kafka_broker_t *rkb, rd_kafka_buf_t *rkbuf) {

       //先判断是否需要重试
        if (unlikely(!rkb ||
             rkb->rkb_source == RD_KAFKA_INTERNAL ||
             rd_kafka_terminating(rkb->rkb_rk) ||
             rkbuf->rkbuf_retries + 1 >
             rkb->rkb_rk->rk_conf.max_retries))
                return 0;

    /* Try again */
    rkbuf->rkbuf_ts_sent = 0;
    rkbuf->rkbuf_retries++;
    rd_kafka_buf_keep(rkbuf);
        // 加入broker的重试队列里
    rd_kafka_broker_buf_retry(rkb, rkbuf);
    return 1;
}
  • 处理 RD_KAFKA_OP_RECV_BUF类型的buffer(实际上是rd_kafka_op_t中的)
void rd_kafka_buf_handle_op (rd_kafka_op_t *rko, rd_kafka_resp_err_t err) {
        rd_kafka_buf_t *request, *response;

        request = rko->rko_u.xbuf.rkbuf;
        rko->rko_u.xbuf.rkbuf = NULL;

        /* NULL on op_destroy() */
    if (request->rkbuf_replyq.q) {
        int32_t version = request->rkbuf_replyq.version;
                /* Current queue usage is done, but retain original replyq for
                 * future retries, stealing
                 * the current reference. */
                request->rkbuf_orig_replyq = request->rkbuf_replyq;
                rd_kafka_replyq_clear(&request->rkbuf_replyq);
        /* Callback might need to version check so we retain the
         * version across the clear() call which clears it. */
        request->rkbuf_replyq.version = version;
    }

    if (!request->rkbuf_cb) {
        rd_kafka_buf_destroy(request);
        return;
    }

        /* Let buf_callback() do destroy()s */
        response = request->rkbuf_response; /* May be NULL */
        request->rkbuf_response = NULL;

       // 获取到reqeust和response后触发回调
        rd_kafka_buf_callback(request->rkbuf_rkb->rkb_rk,
                  request->rkbuf_rkb, err,
                              response, request);
}
struct rd_kafka_bufq_t
  • 所在文件: src/rdkafka_buf.h
  • 将上面的rd_kafka_buf_s封装成队列
  • 定义:
typedef struct rd_kafka_bufq_s {
    TAILQ_HEAD(, rd_kafka_buf_s) rkbq_bufs;
    rd_atomic32_t  rkbq_cnt;
    rd_atomic32_t  rkbq_msg_cnt;
} rd_kafka_bufq_t;
  • 提供出队,入队, 删除等操作

Librdkafka源码分析-Content Table

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏杨建荣的学习笔记

Oracle升级中的参数补充(r9笔记第2天)

数据库升级的时候有一个很重要的环节就是数据库参数审核,对于数据库参数还是有很多的门道,其中一种就是对于过期参数的处理。 我们可以使用如下的SQL来得到一个基本...

31990
来自专栏IT进修之路

原 JAVA的那些事儿

23070
来自专栏有趣的django

Django+Bootstrap+Mysql 搭建个人博客 (六)

86920
来自专栏星回的实验室

打造自己的MapReduce[二]:Hadoop连接MongoDB

在搭建完Hadoop集群后,我们可以基于HDFS做一些离线计算。然而HDFS毕竟是基于文件的系统,所以当我们存储的数据要兼顾一些线上业务访问的时候(如接入层/推...

21420
来自专栏MasiMaro 的技术博文

PE文件解析器的编写(二)——PE文件头的解析

之前在学习PE文件格式的时候,是通过自己查看各个结构,自己一步步计算各个成员在结构中的偏移,然后在计算出其在文件中的偏移,从而找到各个结构的值,但是在使用C语言...

14820
来自专栏抠抠空间

Flask之基本使用与配置

Flask是一个基于Python开发并且依赖jinja2模板和Werkzeug WSGI服务的一个微型框架,对于Werkzeug本质是Socket服务端,其用于...

10920
来自专栏菩提树下的杨过

Oracle中使用Entity Framework 6.x Code-First方式开发

去年写过一篇EF的简单学习笔记,当时EF还不支持Oracle的Code-First开发模式,今天无意又看了下Oracle官网,发现EF6.X已经支持了,并且给出...

28550
来自专栏技术小讲堂

Entity Framework4.3 Code-First基于代码的数据迁移讲解1.建立一个最初的模型和数据库   2.启动Migration(数据迁移)3.第一个数据迁移4.订制的数据迁移4.动态

前段时间一直在研究Entity Framework4,但是苦于没有找到我特别中意的教程,要么就是千篇一律的文章,而且写的特别简单,可以说,糟践了微软这么牛埃克斯...

35480
来自专栏Ryan Miao

Java Web基础入门

前言 语言都是相通的,只要搞清楚概念后就可以编写代码了。而概念是需要学习成本的。 Java基础 不用看《编程思想》,基础语法看 http://www.runo...

43170
来自专栏码匠的流水账

聊聊hikari连接池的idleTimeout及minimumIdle属性

本文主要研究一个hikari连接池的idleTimeout及minimumIdle属性

41510

扫码关注云+社区

领取腾讯云代金券