前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >DAOS_分布式存储_大块数据传输_RDMA_BULK_单边读或写_大IO数据通道_RPC大小IO阈值19K_源码分析

DAOS_分布式存储_大块数据传输_RDMA_BULK_单边读或写_大IO数据通道_RPC大小IO阈值19K_源码分析

原创
作者头像
ssbandjl
修改2023-12-14 07:54:33
2650
修改2023-12-14 07:54:33
举报
文章被收录于专栏:daosdaos

DAOS大块数据传输(RDMA单边读/写)流程要点

1. 以设置/获取池属性为例(dmg pool list -v)

2. 发送端把一段不连续的内存封装为sgl, 调用bulk_create分段注册好(crt_bulk_create(ctx, &sgl, CRT_BULK_RW, bulk))

3. 封装RPC发送给服务端(引擎), 参考(daos_rpc_send)

4. 引擎收到RPC后, 根据数据长度, 准备大块内存接收客户端的数据(crt_bulk_get_len, crt_bulk_create)

5. 执行大块数据传输(crt_bulk_transfer), 用RDMA的单边读语义, 将数据拉取(DMA)到引擎指定的数据块

6. 释放资源

源码流程分析

代码语言:javascript
复制
设置/获取池属性 -> src/tests/suite/daos_pool.c -> { "POOL7: set/get/list user-defined pool attributes (sync)" -> pool_attribute
  daos_pool_list_attr(poh, NULL, &total_size, arg->async ? &ev : NULL) -> 查询池属性
    dc_task_create(dc_pool_list_attr, NULL, ev, &task)
    args->buf	= buf
    args->size	= size -> 0
    dc_task_schedule(task, true)
...    
dc_pool_list_attr(tse_task_t *task)
  pool_req_prepare(args->poh, POOL_ATTR_LIST, daos_task2ctx(task), &cb_args); -> 准备RPC请求
    opcode = DAOS_RPC_OPCODE -> 反解析OPC
    crt_req_create(crt_ctx, tgt_ep, opcode, req)
  crt_bulk_create(daos_task2ctx(task), &sgl, CRT_BULK_RW, &in->pali_bulk);
    crt_hg_bulk_create(&ctx->cc_hg_ctx, sgl, bulk_perm, bulk_hdl)
      buf_sizes[i] = sgl->sg_iovs[i].iov_buf_len
      buf_ptrs[i] = sgl->sg_iovs[i].iov_buf
      hg_return_t HG_Bulk_create(hg_class_t *hg_class, hg_uint32_t count, void **buf_ptrs,const hg_size_t *buf_sizes, hg_uint8_t flags, hg_bulk_t *handle)
      "Creating new bulk handle with %u segment(s)", count) -> 打印有多少分段
  cb_args.pra_bulk = in->pali_bulk -> 设置bulk
  return daos_rpc_send(cb_args.pra_rpc, task) -> 发送RPC到服务端
...
ds_pool_attr_list_handler(crt_rpc_t *rpc) -> 服务端收到RPC
  ds_rsvc_list_attr(&svc->ps_rsvc, &tx, &svc->ps_user,in->pali_bulk, rpc, &out->palo_size) -> 拿到池属性列表参数上的bulk(客户端已注册), 服务端解释为远端bulk: remote_bulk
  rc = crt_bulk_get_len(remote_bulk, &bulk_size)
  d_sg_list_t	 sgl -> 将服务端属性设置到sgl上
  rc = crt_bulk_create(rpc->cr_ctx, &sgl, CRT_BULK_RW, &local_bulk); -> 创建本地BULK, 放入sgl内容
  attr_bulk_transfer(rpc, CRT_BULK_PUT, local_bulk, remote_bulk, 0, 0, bulk_size - iter_args.available) -> 服务端调用RDMA写操作将数据DMA给客户端
    crt_bulk_transfer(&bulk_desc, bulk_cb, &eventual, NULL)
  crt_bulk_free(local_bulk) -> 释放服务端的BULK




iv_op_ult(void *arg)
  crt_iv_update_internal
    crt_ivsync_rpc_issue
      crt_bulk_create


dmg pool list -v
func (cmd *poolQueryCmd) Execute(_ []string)
C.daos_pool_query(cmd.cPoolHandle, rlPtr, &pinfo, nil, nil)
dc_pool_query(tse_task_t *task)
  map_bulk_create(daos_task2ctx(task), &in->pqi_map_bulk, &map_buf, pool_buf_nr(pool->dp_map_sz)) -> 池创建bulk
    d_iov_set(&iov, *buf, pool_buf_size((*buf)->pb_nr));
    sgl.sg_nr = 1;
    sgl.sg_nr_out = 0;
    sgl.sg_iovs = &iov;
    rc = crt_bulk_create(ctx, &sgl, CRT_BULK_RW, bulk);
  return daos_rpc_send(rpc, task) 发送RPC
客户端
---------------------------
服务端
ds_pool_query_handler_v5 <- POOL_QUERY -> ds_pool_query_handler(rpc, 5)
ds_pool_query_handler(crt_rpc_t *rpc, int version)
  pool_svc_lookup_leader
  ds_rebuild_query
  rdb_tx_begin(svc->ps_rsvc.s_db, svc->ps_rsvc.s_term, &tx)
  daos_rpc_from_client(crt_rpc_t *rpc)
    crt_req_src_rank_get(rpc, &srcrank)
  pool_prop_read(&tx, svc, DAOS_PO_QUERY_PROP_GLOBAL_VERSION, &prop);
  daos_prop_entry_get
  rc = read_map_buf(&tx, &svc->ps_root, &map_buf, &map_version) -> 读取到map_buf(池映射)
    rc = locate_map_buf(tx, kvs, &b, version)
    size = pool_buf_size(b->pb_nr)
    memcpy(*buf, b, size)
  ds_pool_transfer_map_buf(map_buf, map_version, rpc, in->pqi_map_bulk, &out->pqo_map_buf_size)
    crt_bulk_get_len(remote_bulk, &remote_bulk_size) -> remote_bulk_size = 4096, map_buf_size=128
    d_iov_set(&map_iov, map_buf, map_buf_size) -> 
    map_sgl.sg_nr = 1;
    map_sgl.sg_nr_out = 0;
    map_sgl.sg_iovs = &map_iov;
    crt_bulk_create(rpc->cr_ctx, &map_sgl, CRT_BULK_RO, &bulk) -> 用服务端的map_sgl数据, 服务端创建只读本地BULK
      ...
      hg_bulk = (struct hg_bulk *) calloc(1, sizeof(*hg_bulk))
      hg_bulk->desc.info.segment_count = count -> 1段
      segments = hg_bulk->desc.segments.s
      segments[i].base = (hg_ptr_t) bufs[i] -> 140039780584816
      segments[i].len = lens[i] -> 128
      hg_bulk->desc.info.len += lens[i]
      hg_bulk_create_na_mem_descs(&hg_bulk->na_mem_descs, na_class,
            segments, count, flags, (enum na_mem_type) attrs->mem_type,
            attrs->device)
        na_mem_handles = na_mem_descs->handles.s
        hg_bulk_register(na_class, (void *) segments[i].base,
            segments[i].len, flags, mem_type, device, &na_mem_handles[i],
            &na_mem_serialize_sizes[i])
        ...
    map_desc.bd_bulk_op = CRT_BULK_PUT -> 设置为服务端通过RDMA写操作(wr.opcode = IBV_WR_RDMA_WRITE), 将池map, DMA给客户端
    map_desc.bd_remote_hdl = remote_bulk
    map_desc.bd_local_hdl = bulk
    crt_bulk_transfer(&map_desc, bulk_cb, &eventual, &map_opid) -> 传输BULK, 在: rc = bulk_cbinfo->bci_cb(&crt_bulk_cbinfo) 中执行回调

客户端发送RPC/BULK的阈值(19KB)

1. 在客户端更新对象(写/改对象)的任务函数中(dc_obj_update_task)

2. 如果所有SGL总长度达到19K(1k留给header), 或者对象是EC模式且不是对单个tgt下发的EC_SGL重组请求, 则需要使用BULK传输, 参考: if (sgls_size >= DAOS_BULK_LIMIT || (obj_is_ec(obj) && !obj_auxi->reasb_req.orr_single_tgt))

参考源码:

代码语言:javascript
复制
通过以下对象连接
.cpf_name =
daos_opc_t
dc_funcs[opc].task_func 客户端方法数组

dc_obj_update_task(tse_task_t *task) DAOS_OPC_OBJ_UPDATE 写 -> 客户端更新对象(写/改对象)的任务
  obj_req_valid(task, args, DAOS_OBJ_RPC_UPDATE
    obj_auxi = tse_task_stack_push(task, sizeof(*obj_auxi)) -> 将任务压栈
      pushed_ptr = dtp->dtp_buf + sizeof(dtp->dtp_buf) - dtp->dtp_stack_top
    ...
    dc_io_epoch_set(epoch, opc)
    tse_task_stack_pop -> 将任务从栈上弹出来
      poped_ptr = dtp->dtp_buf + sizeof(dtp->dtp_buf) - dtp->dtp_stack_top
  dc_tx_attach(args->th, obj, DAOS_OBJ_RPC_UPDATE, task) 如果事务有效(hdl.cookie == 1), 则走dtx
  return dc_obj_update(task, &epoch, map_ver, args, obj) -> 提交对象更新
    obj_task_init(task, DAOS_OBJ_RPC_UPDATE, map_ver, args->th, &obj_auxi, obj)
      obj_task_init_common(task, opc, map_ver, th, auxi, obj)
        tse_task_stack_push
        shard_task_list_init(obj_auxi)
        obj_auxi->is_ec_obj = obj_is_ec(obj) -> 设置EC对象标志
      tse_task_register_comp_cb(task, obj_comp_cb, NULL, 0) -> 为任务注册对象完成回调, 弹出任务参数, 重试, 错误处理等
      ----------------------
    obj_update_sgls_dup(obj_auxi, args) -> 用户可能提供 iov_len < iov_buf_len 的 sql,这可能会给内部处理带来一些麻烦,例如 crt_bulk_create/daos_iov_left() 总是使用 iov_buf_len。 对于这种情况,我们复制 sql 并使其 iov_buf_len = iov_len
    obj_auxi->dkey_hash = obj_dkey2hash(obj->cob_md.omd_id, args->dkey) -> 比如为1
    if (obj_is_ec(obj)) -> 如果是EC对象(对象类属性上的封装方法为 DAOS_RES_EC ), 则重新组装对象写请求
      obj_rw_req_reassemb(obj, args, NULL, obj_auxi) -> 配置了EC的对象需要重新组装, 对象的读写请求
        struct obj_reasb_req	*reasb_req = &obj_auxi->reasb_req -> EC请求, 重新组装 obj 请求。 用户输入的 iod/sgl 可能需要在发送到服务器之前在客户端重新组装,例如:合并相邻的recx,或者对无序的recx进行排序并生成新的sgl与之匹配; 对于EC obj,将iod/recxs拆分到每个目标,生成新的sgl与之匹配,创建oiod/siod以指定每个shard/tgt的IO req
        if (!obj_auxi->req_reasbed)
          obj_reasb_req_init(&obj_auxi->reasb_req, obj, args->iods, args->nr) -> 创建reasb_req并设置iod的值,从输入iod中重用缓冲区,iod_type / iod_size分配为输入iod,iod_kcsum / iod_nr / iod_recx / iod_csums / iod_eprs数组将设置为0 / NULL
          daos_recx_t -> 记录是任意长度的原子 blob,它总是作为一个整体来获取/更新。 记录的大小可能会随着时间的推移而改变。 记录由以下复合键唯一标识: - 分布键(又名 dkey)表示位于同一存储目标上的一组数组。 dkey 具有任意大小。 - 属性键(又名 akey)区分各个数组。 同样,akey 具有任意大小。 - 数组中的索引区分各个记录。 索引是一个范围从零到无穷大的整数。 一系列索引标识称为范围的连续记录集。 范围内的所有记录必须具有相同的大小。 记录范围是数组内相同大小的连续记录范围。 rx_idx 是该范围的第一个数组索引,rx_nr 是该范围覆盖的记录数
            reasb_req->orr_oca = obj_get_oca(obj)
            size_iod = roundup(sizeof(daos_iod_t) * iod_nr, 8)
            ...    
        obj_ec_req_reasb(obj, args->iods, obj_auxi->dkey_hash, args->sgls, reasb_req, args->nr, obj_auxi->opc == DAOS_OBJ_RPC_UPDATE);
            for (i = 0; i < iod_nr; i++)
                obj_ec_singv_req_reasb
                    ec_recx_array->oer_k = oca->u.ec.e_k
                    ec_recx_array->oer_p = oca->u.ec.e_p
                    if (obj_ec_singv_one_tgt(iod->iod_size, sgl, oca))
                        obj_ec_fail_info_parity_get
                        obj_ec_singv_small_idx
                        obj_ec_set_parity_bitmaps
                        obj_ec_parity_tgt_nr
                    obj_ec_singv_cell_bytes
                    obj_io_desc_init
                    codec = codec_get(reasb_req, obj->cob_md.omd_id) -> isal支持
                        reasb_req->orr_codec = obj_ec_codec_get(daos_obj_id2class(oid))
                            daos_array_find(ecc_array, oc_ec_codec_nr, oc_id, &ecc_sort_ops)
                            daos_array_find(ecc_array, oc_ec_codec_nr, oc_id, &ecc_redun_sort_ops)
                    obj_ec_singv_encode(codec, oca, iod, sgl, ec_recx_array) -> DAOS-7539 EC:自定义 EC 单元大小 (#5832),用户可以通过池或容器属性指定 EC 单元大小,DAOS_PROP_PO_EC_CELL_SZ,设置池的默认 EC 单元大小,DAOS_PROP_CO_EC_CELL_SZ,设置容器的默认 EC 单元大小,如果是 EC 为池和容器都设置了单元格大小,然后容器的值会覆盖池的值。 此补丁将 EC 单元大小从属性应用到客户端 EC 堆栈、服务器 I/O 处理程序、数据迁移服务和 EC 聚合服务。 - EC单元大小应为4K的倍数且小于1MB,默认单元大小仍为1MB,后续应更改修补
                        obj_ec_pbufs_init(recxs, c_bytes)
                        obj_ec_recx_encode(codec, oca, iod, sgl, recxs) -> 编码满条带
                            for (i = 0; i < recx_nr; i++)
                                obj_ec_stripe_encode(iod, sgl, iov_idx, iov_off, codec, oca, cell_bytes, parity_buf)
                                    ec_encode_data(cell_bytes, k, p, codec->ec_gftbls, data, parity_bufs) -> ec编码, isa-l 中 ec_init_tables() 的用途: https://blog.csdn.net/choumin/article/details/126898021
                    d_sgl_init(r_sgl, sgl->sg_nr + obj_ec_parity_tgt_nr(oca)) -> 重组sgl
                    d_iov_set(&r_sgl->sg_iovs[iov_nr + idx]
                    obj_reasb_req_dump(reasb_req, sgl, oca, 0, iod_idx) -> 打印EC调试信息
                obj_ec_recx_scan
                    obj_ec_recx_cell_nr
                    ec_partial_tgt_recx_nrs
                    ec_all_tgt_recx_nrs
                    obj_ec_recov_tgt_recx_nrs
                    obj_ec_recxs_init
                    obj_io_desc_init
                    obj_ec_riod_init
                    obj_ec_seg_sorter_init
                    obj_ec_pbufs_init
                obj_ec_recx_reasb -> 为EC重组iod/sgl/recx, 输入iod, sgl, recx_array, 输出riod, rsgl, oiod
                    recx_with_full_stripe
                    ec_recov_recx_seg_add
                    ec_data_recx_add
                    ec_data_seg_add
                    ec_parity_recx_add
                    ec_parity_seg_add
                    obj_ec_seg_pack
            obj_ec_encode
                obj_ec_recx_encode -> 对全条带recx_array中的数据进行编码,结果奇偶校验存储在struct obj_ec_recx_array::oer_pbufs中
                obj_ec_stripe_encode -> 编码一个完整的条带,结果奇偶校验缓冲区将被填满
    obj_update_shards_get
    obj_shards_2_fwtgts -> 根据分片查找转发的目标
      req_tgts->ort_shard_tgts = req_tgts->ort_tgts_inline -> 分片目标数组,包含 (ort_grp_nr * ort_grp_size) 个目标。 如果#targets <= OBJ_TGT_INLINE_NR 那么它指向ort_tgts_inline。 在数组中,[0, ort_grp_size - 1] 表示第一组,[ort_grp_size, ort_grp_size * 2 - 1] 表示第二组,依此类推。 如果 (ort_srv_disp == 1),则在每个组中,第一个目标是领导分片,后面的 (ort_grp_size - 1) 目标是前向非领导分片。 现在只有一种情况 (ort_grp_nr > 1) 用于对象打孔,所有其他情况均为 (ort_grp_nr == 1)
      obj_shard_tgts_query -> 分片目标查询
    obj_csum_update
    -------------------
    obj_req_get_tgts 获取对象对应的目标
      obj_dkey2grpmemb
        obj_dkey2grpidx
          pool_map_ver = pool_map_get_version(pool->dp_map)
          grp_size = obj_get_grp_size(obj)
          grp_idx = d_hash_jump(hash, obj->cob_shards_nr / grp_size) how hash generate? obj with pool
      obj_shards_2_fwtgts
        obj_shard_tgts_query 分片目标查询
          obj_shard_open
            dc_obj_shard_open
              pool_map_find_target 二分查找
                comp_sorter_find_target(sorter, id)
                  daos_array_find
                    array_bin_search
          obj_shard2tgtid
            *tgt_id = obj->cob_shards->do_shards[shard].do_target_id -> dc_obj_layout 客户端对象布局
          obj_shard_close(obj_shard)
        obj_auxi->flags |= ORF_CONTAIN_LEADER -> 要求转发给容器leader
        obj_grp_leader_get
          pl_select_leader obj_get_shard
            array_bin_search 二分查找 daos_obj_classes
    tse_task_register_comp_cb(task, obj_comp_cb, NULL, 0)
    obj_csum_update(obj, args, obj_auxi)
    obj_rw_bulk_prep(obj, args->iods, args->sgls, args->nr, true, obj_auxi->req_tgts.ort_srv_disp, task, obj_auxi) -> 准备读写大块数据
      sgls_size = daos_sgls_packed_size(sgls, nr, NULL) -> 计算sgl大小, 内联提取需要将 sqls 缓冲区打包到 RPC 中,因此使用它来检查是否需要批量传输(bulk)
      if (sgls_size >= DAOS_BULK_LIMIT || (obj_is_ec(obj) && !obj_auxi->reasb_req.orr_single_tgt)) -> 如果所有SGL总长度达到19K(1k留给header), 或者对象是EC模式且不是对单个tgt下发的EC_SGL重组请求, 则需要使用BULK传输
        obj_bulk_prep(sgls, nr, bulk_bind, bulk_perm, task, &obj_auxi->bulks)
            crt_bulk_create(daos_task2ctx(task), &sgls[i], bulk_perm, &bulks[i]) -> 针对每个SGL, 创建1个BULK, 得到bulk的指针数组和bulk个数
            crt_bulk_bind -> 将批量句柄绑定到本地上下文,将本地上下文的源地址与批量句柄关联起来。 它可用于将批量句柄从一台服务器转发/共享到另一台服务器,在这种情况下,批量句柄的原始地址可以即时序列化/反序列化。 示例用法:客户端向服务器 A 发送嵌入批量句柄的 RPC 请求,服务器 A 将客户端批量句柄转发到另一台服务器 B。对于该用法,客户端应调用此 API 将批量句柄与其本地上下文绑定 因此,当服务器B收到服务器A转发的反序列化的批量句柄时,服务器B就可以知道客户端的原始地址来进行批量传输。 用户应注意,绑定批量句柄会增加序列化的额外开销,因此建议谨慎使用。 在源上绑定批量句柄时,应使用 crt_bulk_bind_transfer(),因为源地址信息嵌入在句柄中
    obj_req_fanout(obj, obj_auxi, dkey_hash, map_ver, epoch, shard_rw_prep, dc_obj_shard_rw, task) -> 扇出 shard_io_cb = io_cb = dc_obj_shard_rw

晓兵(ssbandjl)

博客: https://logread.cn | https://blog.csdn.net/ssbandjl | https://cloud.tencent.com/developer/user/5060293/articles

DAOS汇总: https://cloud.tencent.com/developer/article/2344030

晓兵技术杂谈(系列)

https://cloud.tencent.com/developer/user/5060293/video

欢迎对DAOS, SPDK, RDMA等高性能技术感兴趣的朋友加入DAOS技术交流(群)

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • DAOS大块数据传输(RDMA单边读/写)流程要点
  • 源码流程分析
    • 客户端发送RPC/BULK的阈值(19KB)
    • 晓兵(ssbandjl)
    • 晓兵技术杂谈(系列)
    相关产品与服务
    对象存储
    对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档