前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Librdkafka的基础数据结构 3 -- Buffer相关 1

Librdkafka的基础数据结构 3 -- Buffer相关 1

作者头像
扫帚的影子
发布2018-09-05 16:48:41
7920
发布2018-09-05 16:48:41
举报
  • Temporary buffer
  • Buffer segment
  • Buffer
  • Buffer slice

临时写入缓冲区
  • 所在文件: src/rdkafka_buf.h(c)
  • 写操作缓冲区, 写入时保证了8位内存对齐, 以便提高内存读取效率和跨平台的安全性;
  • 定义:
代码语言:javascript
复制
typedef struct rd_tmpabuf_s {
    size_t size; //buf即内部缓冲区容量
    size_t of; //当前写入位置
    char  *buf; //内部缓冲区
    int    failed; //写入是否发生了错误
    int    assert_on_fail; //写入发生错误时,是否assert
} rd_tmpabuf_t;
  • 初始化:
代码语言:javascript
复制
static RD_UNUSED void
rd_tmpabuf_new (rd_tmpabuf_t *tab, size_t size, int assert_on_fail) {
    tab->buf = rd_malloc(size); //分配内部缓冲区
    tab->size = size; //buf即内部缓冲区容量
    tab->of = 0;
    tab->failed = 0;
    tab->assert_on_fail = assert_on_fail;
}
  • 根据写入大小返回可供写入的内存位置:
代码语言:javascript
复制
static RD_UNUSED void *
rd_tmpabuf_alloc0 (const char *func, int line, rd_tmpabuf_t *tab, size_t size) {
    void *ptr;

    if (unlikely(tab->failed)) //发错标志已设置, 则直接返回NULL
        return NULL;

    if (unlikely(tab->of + size > tab->size)) { //请求的内存size超出缓冲区容量, 作错误处理
        if (tab->assert_on_fail) {
            fprintf(stderr,
                "%s: %s:%d: requested size %zd + %zd > %zd\n",
                __FUNCTION__, func, line, tab->of, size,
                tab->size);
            assert(!*"rd_tmpabuf_alloc: not enough size in buffer");
        }
        return NULL;
    }

        ptr = (void *)(tab->buf + tab->of); //获取可供写入的内存位转置:
    tab->of += RD_ROUNDUP(size, 8); //更新当前写入位置, 并作8位对齐

    return ptr;
}
  • buffer写入:
代码语言:javascript
复制
static RD_UNUSED void *
rd_tmpabuf_write0 (const char *func, int line,
           rd_tmpabuf_t *tab, const void *buf, size_t size) {
    void *ptr = rd_tmpabuf_alloc0(func, line, tab, size); //根据写入大小返回可供写入的内存位置

    if (ptr)
        memcpy(ptr, buf, size); //内存copy

    return ptr;
}
Buffer segment
  • 所在文件: src/rdbuf.c(h)
  • 顾名思义, buffer segmengbuffer的组成部分, buffer是我们下一节要介绍的rd_buf_t;
  • 定义:
代码语言:javascript
复制
typedef struct rd_segment_s {
        TAILQ_ENTRY(rd_segment_s)  seg_link; /*<< rbuf_segments Link */ tailq元素
        char  *seg_p;                /**< Backing-store memory */ 用于存储数据的内存指针
        size_t seg_of;               /**< Current relative write-position 在当前的segment中的写入位置
                                      *   (length of payload in this segment) */
        size_t seg_size;             /**< Allocated size of seg_p */ 存储数据的内存大小
        size_t seg_absof;            /**< Absolute offset of this segment's
                                      *   beginning in the grand rd_buf_t */ 当前segment 在整个buffer中写入的起始地址
        void (*seg_free) (void *p);  /**< Optional free function for seg_p */
        int    seg_flags;            /**< Segment flags */
#define RD_SEGMENT_F_RDONLY   0x1    /**< Read-only segment */
#define RD_SEGMENT_F_FREE     0x2    /**< Free segment on destroy,
                                      *   e.g, not a fixed segment. */
} rd_segment_t;
  • 初始化:
代码语言:javascript
复制
static void rd_segment_init (rd_segment_t *seg, void *mem, size_t size) {
        memset(seg, 0, sizeof(*seg));
        seg->seg_p     = mem;
        seg->seg_size  = size;
}
  • 销毁:
代码语言:javascript
复制
static void rd_segment_destroy (rd_segment_t *seg) {
        /* Free payload */
        //如果提供了seg_free, 也destroy seg-
        if (seg->seg_free && seg->seg_p)
                seg->seg_free(seg->seg_p);

        if (seg->seg_flags & RD_SEGMENT_F_FREE)
                rd_free(seg);
}
  • 判断当前segment是否还有剩余空间, 并返回当前写指针, 返回值表示当前还可写入的大小
代码语言:javascript
复制
static RD_INLINE RD_UNUSED size_t
rd_segment_write_remains (const rd_segment_t *seg, void **p) {
        if (unlikely((seg->seg_flags & RD_SEGMENT_F_RDONLY)))
                return 0;
        if (p)
                *p = (void *)(seg->seg_p + seg->seg_of);
        return seg->seg_size - seg->seg_of;
}
主角 Buffter
  • 所在文件: src/rdbuf.c(h)
  • 数据Buffer, 内包一个rd_segment_t的list
  • 定义:
代码语言:javascript
复制
typedef struct rd_buf_s {
        struct rd_segment_head rbuf_segments; /**< TAILQ list of segments */ segment tailq的头指针
        size_t            rbuf_segment_cnt;   /**< Number of segments */ segment个数

        rd_segment_t     *rbuf_wpos;          /**< Current write position seg */ 当前正在写入的segment
        size_t            rbuf_len;           /**< Current (written) length */ 当前写入数据的总大小
        size_t            rbuf_size;          /**< Total allocated size of 
                                               *   all segments. */所有segment的存储空间大小

        char             *rbuf_extra;         /* Extra memory allocated for
                                               * use by segment structs,
                                               * buffer memory, etc. */ 可以理解成一个预分配的内存池
        size_t            rbuf_extra_len;     /* Current extra memory used */
        size_t            rbuf_extra_size;    /* Total size of extra memory */
} rd_buf_t;
  • 确保当前写入的segment可以写入size大小的数据:
代码语言:javascript
复制
void rd_buf_write_ensure_contig (rd_buf_t *rbuf, size_t size) {
        rd_segment_t *seg = rbuf->rbuf_wpos;

        if (seg) {
                void *p;
                size_t remains = rd_segment_write_remains(seg, &p);

                // 如果当前正在写入的segment足够的剩余空间,那么就写入当前这个segment
                if (remains >= size)
                        return; /* Existing segment has enough space. */

                /* Future optimization:
                 * If existing segment has enough remaining space to warrant
                 * a split, do it, before allocating a new one. */
        }

        /* Allocate new segment */
        // 分配新的segment作为新的当前写入的segment
        rbuf->rbuf_wpos = rd_buf_alloc_segment(rbuf, size, size);
}
  • 根据指定的offset来获取其所在的segment:
代码语言:javascript
复制
rd_segment_t *
rd_buf_get_segment_at_offset (const rd_buf_t *rbuf, const rd_segment_t *hint,
                              size_t absof) {
        // hint是为了少遍历segment list, 给定一个offset可以命中的segment
        const rd_segment_t *seg = hint;

        if (unlikely(absof > rbuf->rbuf_len))
                return NULL;

        /* Only use current write position if possible and if it helps */
        if (!seg || absof < seg->seg_absof)
                seg = TAILQ_FIRST(&rbuf->rbuf_segments);

        // 遍历查找
        do {
                if (absof >= seg->seg_absof &&
                    absof < seg->seg_absof + seg->seg_of) {
                        rd_dassert(seg->seg_absof <= rd_buf_len(rbuf));
                        return (rd_segment_t *)seg;
                }
        } while ((seg = TAILQ_NEXT(seg, seg_link)));

        return NULL;
}
  • 将segment按指定的offset分离成一个单独的segment, 这是一个内部函数,按源码里的注释目前只用于最后一个segment的空闲存储空间分离
代码语言:javascript
复制
static rd_segment_t *rd_segment_split (rd_buf_t *rbuf, rd_segment_t *seg,
                                       size_t absof) {
        rd_segment_t *newseg;
        size_t relof;

        rd_assert(seg == rbuf->rbuf_wpos);
        rd_assert(absof >= seg->seg_absof &&
                  absof <= seg->seg_absof + seg->seg_of);

       // 确定在segment中的分离位置
        relof = absof - seg->seg_absof;

      //  创建新的segment
        newseg = rd_buf_alloc_segment0(rbuf, 0);

        /* Add later part of split bytes to new segment */
        newseg->seg_p      = seg->seg_p+relof;
        newseg->seg_of     = seg->seg_of-relof;
        newseg->seg_size   = seg->seg_size-relof;
        newseg->seg_absof  = SIZE_MAX; /* Invalid */
        newseg->seg_flags |= seg->seg_flags;

        /* Remove earlier part of split bytes from previous segment */
        // 原有segment更新
        seg->seg_of        = relof;
        seg->seg_size      = relof;

        /* newseg's length will be added to rbuf_len in append_segment(),
         * so shave it off here from seg's perspective. */
        // 分离后不再属于当前buffer, buffer相关值更新
        rbuf->rbuf_len   -= newseg->seg_of;
        rbuf->rbuf_size  -= newseg->seg_size;

        return newseg;
}
  • buffer 初始化:
代码语言:javascript
复制
void rd_buf_init (rd_buf_t *rbuf, size_t fixed_seg_cnt, size_t buf_size) {
        size_t totalloc = 0;

        // 各成员清0
        memset(rbuf, 0, sizeof(*rbuf));

        // segment list初始化
        TAILQ_INIT(&rbuf->rbuf_segments);

        if (!fixed_seg_cnt) {
                assert(!buf_size);
                return;
        }

        /* Pre-allocate memory for a fixed set of segments that are known
         * before-hand, to minimize the number of extra allocations
         * needed for well-known layouts (such as headers, etc) */
        totalloc += RD_ROUNDUP(sizeof(rd_segment_t), 8) * fixed_seg_cnt;

        /* Pre-allocate extra space for the backing buffer. */
        totalloc += buf_size;

        rbuf->rbuf_extra_size = totalloc;

        // 预分配内存
        rbuf->rbuf_extra = rd_malloc(rbuf->rbuf_extra_size);
}
  • 获取当前可用于写入数据的内存指针, 返回值是获取的写入指针所在segment的剩余空间
代码语言:javascript
复制
static size_t
rd_buf_get_writable0 (rd_buf_t *rbuf, rd_segment_t **segp, void **p) {
        rd_segment_t *seg;

        for (seg = rbuf->rbuf_wpos ; seg ; seg = TAILQ_NEXT(seg, seg_link)) {
                size_t len = rd_segment_write_remains(seg, p);

                /* Even though the write offset hasn't changed we
                 * avoid future segment scans by adjusting the
                 * wpos here to the first writable segment. */
               // 更新buffer的rbuf_wpos
                rbuf->rbuf_wpos = seg;
                if (segp)
                        *segp = seg;

                if (unlikely(len == 0))
                        continue;

                /* Also adjust absof if the segment was allocated
                 * before the previous segment's memory was exhausted
                 * and thus now might have a lower absolute offset
                 * than the previos segment's now higher relative offset. */
                // 更新获取的segment的seg_absof
                if (seg->seg_of == 0 && seg->seg_absof < rbuf->rbuf_len)
                        seg->seg_absof = rbuf->rbuf_len;

                return len;
        }

        return 0;
}
  • 写payload到buffer:
代码语言:javascript
复制
size_t rd_buf_write (rd_buf_t *rbuf, const void *payload, size_t size) {
        size_t remains = size;
        size_t initial_absof;
        const char *psrc = (const char *)payload;

        initial_absof = rbuf->rbuf_len;

        /* Ensure enough space by pre-allocating segments. */
        rd_buf_write_ensure(rbuf, size, 0);

       // 循环写,直到payload全部写入
        while (remains > 0) {
                void *p;
                rd_segment_t *seg;
                // 获取当前可以用于写入的segment, 这个segment对应的内存指针和剩余空间
                size_t segremains = rd_buf_get_writable0(rbuf, &seg, &p);
                size_t wlen = RD_MIN(remains, segremains);

                rd_dassert(seg == rbuf->rbuf_wpos);
                rd_dassert(wlen > 0);
                rd_dassert(seg->seg_p+seg->seg_of <= (char *)p &&
                           (char *)p < seg->seg_p+seg->seg_size);

                if (payload) {
                        // payload内存copy
                        memcpy(p, psrc, wlen);
                        psrc += wlen;
                }

                // 更新各offset
                seg->seg_of    += wlen;
                rbuf->rbuf_len += wlen;
                remains        -= wlen;
        }

        rd_assert(remains == 0);

        return initial_absof;
}
  • 从指定offset开始局部更新buffer
代码语言:javascript
复制
size_t rd_buf_write_update (rd_buf_t *rbuf, size_t absof,
                            const void *payload, size_t size) {
        rd_segment_t *seg;
        const char *psrc = (const char *)payload;
        size_t of;

        /* Find segment for offset */
       // 根据指定的offsewt找到需要更新的segment
        seg = rd_buf_get_segment_at_offset(rbuf, rbuf->rbuf_wpos, absof);
        rd_assert(seg && *"invalid absolute offset");

        // 可能需要更新多个连续的segment
        for (of = 0 ; of < size ; seg = TAILQ_NEXT(seg, seg_link)) {
                rd_assert(seg->seg_absof <= rd_buf_len(rbuf));
                // 更新
                size_t wlen = rd_segment_write_update(seg, absof+of,
                                                      psrc+of, size-of);
                of += wlen;
        }

        rd_dassert(of == size);

        return of;
}
  • buffer的push操作
代码语言:javascript
复制
void rd_buf_push (rd_buf_t *rbuf, const void *payload, size_t size,
                  void (*free_cb)(void *)) {
        rd_segment_t *prevseg, *seg, *tailseg = NULL;

       // push操作前需要将当前正在写入的segment的剩余空间分离为一个独立的segment
        if ((prevseg = rbuf->rbuf_wpos) &&
            rd_segment_write_remains(prevseg, NULL) > 0) {
                /* If the current segment still has room in it split it
                 * and insert the pushed segment in the middle (below). */
                tailseg = rd_segment_split(rbuf, prevseg,
                                           prevseg->seg_absof +
                                           prevseg->seg_of);
        }

        // 创建一个新的segment, 用payload初始化,然后apeend到segment list
        seg = rd_buf_alloc_segment0(rbuf, 0);
        seg->seg_p      = (char *)payload;
        seg->seg_size   = size;
        seg->seg_of     = size;
        seg->seg_free   = free_cb;
        seg->seg_flags |= RD_SEGMENT_F_RDONLY;

        rd_buf_append_segment(rbuf, seg);

        // 将上面分离出来的独立的segment添加到segment list的末尾
        if (tailseg)
                rd_buf_append_segment(rbuf, tailseg);
}
  • buffer的seek操作, 其实就是一个truncat操作
代码语言:javascript
复制
int rd_buf_write_seek (rd_buf_t *rbuf, size_t absof) {
        rd_segment_t *seg, *next;
        size_t relof;

       // 定位到需要操作的起始segment
        seg = rd_buf_get_segment_at_offset(rbuf, rbuf->rbuf_wpos, absof);
        if (unlikely(!seg))
                return -1;

        relof = absof - seg->seg_absof;
        if (unlikely(relof > seg->seg_of))
                return -1;

        /* Destroy sub-sequent segments in reverse order so that
         * destroy_segment() length checks are correct.
         * Will decrement rbuf_len et.al. */
        // 截断操作,destroy掉不再需要的segment
        for (next = TAILQ_LAST(&rbuf->rbuf_segments, rd_segment_head) ;
             next != seg ; next = TAILQ_PREV(next, rd_segment_head, seg_link))
                rd_buf_destroy_segment(rbuf, next);

        /* Update relative write offset */
        seg->seg_of         = relof;
        rbuf->rbuf_wpos     = seg;
        rbuf->rbuf_len      = seg->seg_absof + seg->seg_of;

        rd_assert(rbuf->rbuf_len == absof);

        return 0;
}
Buffer slice
  • 所在文件: src/rdkafka_buf.h(c)
  • 顾名思义, 表示上面介绍的 rd_buffer_t的一个片断, 只读, 不会修改buffer的内容;
  • 定义:
代码语言:javascript
复制
typedef struct rd_slice_s {
        const rd_buf_t     *buf;    /**< Pointer to buffer */ 映射的rd_buffer_t
        const rd_segment_t *seg;    /**< Current read position segment.
                                     *   Will point to NULL when end of
                                     *   slice is reached. */ 当前正在读取的rd_buffer_t中的segment
        size_t              rof;    /**< Relative read offset in segment */  当前正在读取的rd_buffer_t中的segment的读offset
        size_t              start;  /**< Slice start offset in buffer */
        size_t              end;    /**< Slice end offset in buffer+1 */
} rd_slice_t;
  • 用segment来初始化slice
代码语言:javascript
复制
int rd_slice_init_seg (rd_slice_t *slice, const rd_buf_t *rbuf,
                           const rd_segment_t *seg, size_t rof, size_t size) {
        /* Verify that \p size bytes are indeed available in the buffer. */
       // 如果需要读取的数据大小已超出buffer写入的数据量, 则返回-1
        if (unlikely(rbuf->rbuf_len < (seg->seg_absof + rof + size)))
                return -1;

        // 初始化过程
        slice->buf    = rbuf;
        slice->seg    = seg;
        slice->rof    = rof;  // 这个地方rof应该判断一下是否小于segment的seg_of
        slice->start  = seg->seg_absof + rof;
        slice->end    = slice->start + size;

        rd_assert(seg->seg_absof+rof >= slice->start &&
                  seg->seg_absof+rof <= slice->end);

        rd_assert(slice->end <= rd_buf_len(rbuf));

        return 0;
}
  • 按指定的offset来初始化slice
代码语言:javascript
复制
int rd_slice_init (rd_slice_t *slice, const rd_buf_t *rbuf,
                   size_t absof, size_t size) {
        //根据offset来定位到对应buffer中的segment
        const rd_segment_t *seg = rd_buf_get_segment_at_offset(rbuf, NULL,
                                                               absof);
        if (unlikely(!seg))
                return -1;

        return rd_slice_init_seg(slice, rbuf, seg,
                                 absof - seg->seg_absof, size);
}
  • 将整个buffer都映射到slice
代码语言:javascript
复制
void rd_slice_init_full (rd_slice_t *slice, const rd_buf_t *rbuf) {
        int r = rd_slice_init(slice, rbuf, 0, rd_buf_len(rbuf));
        rd_assert(r == 0);
}
  • slice的读操作, 这个函数相当于一个slice的iterator遍历接口, 遍历这个slice对应的每一个segment
代码语言:javascript
复制
size_t rd_slice_reader0 (rd_slice_t *slice, const void **p, int update_pos) {
        size_t rof = slice->rof;
        size_t rlen;
        const rd_segment_t *seg;

        /* Find segment with non-zero payload */
        //  获取当前要读取的segment
        for (seg = slice->seg ;
             seg && seg->seg_absof+rof < slice->end && seg->seg_of == rof ;
              seg = TAILQ_NEXT(seg, seg_link))
                rof = 0;

        if (unlikely(!seg || seg->seg_absof+rof >= slice->end))
                return 0;

        rd_assert(seg->seg_absof+rof <= slice->end);


        *p   = (const void *)(seg->seg_p + rof);
        rlen = RD_MIN(seg->seg_of - rof, rd_slice_remains(slice));

        // 更新 slice后当前操作的segment以及rof
        if (update_pos) {
                if (slice->seg != seg) {
                        rd_assert(seg->seg_absof + rof >= slice->start &&
                                  seg->seg_absof + rof+rlen <= slice->end);
                        slice->seg  = seg;
                        slice->rof  = rlen;
                } else {
                        slice->rof += rlen;
                }
        }

        return rlen;
}
  • 读slice到内存buffer
代码语言:javascript
复制
size_t rd_slice_read (rd_slice_t *slice, void *dst, size_t size) {
        size_t remains = size;
        char *d = (char *)dst; /* Possibly NULL */
        size_t rlen;
        const void *p;
        size_t orig_end = slice->end;

        if (unlikely(rd_slice_remains(slice) < size))
                return 0;

        /* Temporarily shrink slice to offset + \p size */
        slice->end = rd_slice_abs_offset(slice) + size;

       // 使用 rd_slice_reader来作iterator式的遍历
        while ((rlen = rd_slice_reader(slice, &p))) {
                rd_dassert(remains >= rlen);
                if (dst) {
                        // 内存copy
                        memcpy(d, p, rlen);
                        d       += rlen;
                }
                remains -= rlen;
        }

        rd_dassert(remains == 0);

        /* Restore original size */
        slice->end = orig_end;

        return size;
}
  • slice的seek操作
代码语言:javascript
复制
int rd_slice_seek (rd_slice_t *slice, size_t offset) {
        const rd_segment_t *seg;
        size_t absof = slice->start + offset;

        if (unlikely(absof >= slice->end))
                return -1;

        // 根据offset来定位segment
        seg = rd_buf_get_segment_at_offset(slice->buf, slice->seg, absof);
        rd_assert(seg);

        // 更新slice
        slice->seg = seg;
        slice->rof = absof - seg->seg_absof;
        rd_assert(seg->seg_absof + slice->rof >= slice->start &&
                  seg->seg_absof + slice->rof <= slice->end);

        return 0;
}

Librdkafka源码分析-Content Table

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018.01.02 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Librdkafka源码分析-Content Table
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档