前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >UCX-UCT统一通信传输层3-服务端和客户端调用栈详解(及相关)_源码解读

UCX-UCT统一通信传输层3-服务端和客户端调用栈详解(及相关)_源码解读

原创
作者头像
晓兵
修改2023-11-19 10:10:29
1.1K0
修改2023-11-19 10:10:29
举报
文章被收录于专栏:DPUdaos

主流程(接上篇)

代码语言:javascript
复制
主流程(服务端或客户端): 
1. 主函数中解析命令行参数(parse_cmd), 设置默认服务端口
2. 初始化上下文(ucs_async_context_create, 异步事件上下文用于管理定时器和FD通知), 在其中, 初始化多生产者/多消费者队列(ucs_mpmc_queue_init), 初始化非阻塞异步轮询器(ucs_async_poll_init), 初始化可重入自旋锁上下文等
3. 创建工人(uct_worker_create), 工人代表着 progress 的引擎。 可以在应用程序中创建多个进度引擎,例如供多个线程使用
4. 根据入参查找期望的传输层(dev_tl_lookup, 由最小延迟决定要使用的设备和传输)
5. 设置回调(uct_iface_set_am_handler), 设置服务端接收到客户端数据后的回调
6. 建立socket连接(connect_common), 服务端监听端口, 等待客户端发起socket连接
7. 客户端连接服务端后, 两边交换地址(sendrecv, 先通过socket发送和接收长度, 然后发送和接收地址, 交换地址)
8. 创建端点(uct_ep_create), 获取端点地址(uct_ep_get_address), 连接对等端点(uct_ep_connect_to_ep, 内部通过 ibv_modify_qp 设置QP状态机建立QP连接)
9. 连接建立后, 客户端调用短消息(do_am_short)/缓冲区(do_am_bcopy)/零拷贝(do_am_zcopy)发送数据
10. 显示驱动工人推进(uct_worker_progress, 该例程显式地处理任何未完成的通信操作和活动消息请求, 底层通过poll网卡完成事件,ibv_poll_cq)
11. 资源销毁(uct_ep_destroy,free其他资源等)

启动服务端源码解读

代码语言:javascript
复制
启动服务端: 
gdb: examples/uct_hello_world.c -> int main(int argc, char **argv)
  if (parse_cmd(argc, argv, &cmd_args)) -> int parse_cmd(int argc, char * const argv[], cmd_args_t *args)
    args->server_port   = 13337
  status = ucs_async_context_create(UCS_ASYNC_MODE_THREAD_SPINLOCK, &async) -> 创建异步执行上下文, 分配并初始化异步执行上下文。 这可用于确保安全的事件传递, 模式为线程自旋锁(可重入), 在异步对象上, 初始化多生产者和多消费者队列
    async = ucs_malloc(sizeof(*async), "async context") -> 内部分配, 并记录内存分配信息
      void *ptr = malloc(size)
    status = ucs_async_context_init(async, mode)
      ucs_trace_func("async=%p", async) -> 跟踪方法
      status = ucs_mpmc_queue_init(&async->missed) -> 初始化多生产者/多消费者队列
        ucs_queue_head_init(&mpmc->queue)
        pthread_spin_init(&lock->lock, lock_flags) -> 初始化自旋锁, 标志: PTHREAD_PROCESS_SHARED 或 PTHREAD_PROCESS_PRIVATE
      status = ucs_async_method_call(mode, context_init, async) -> .context_init       = ucs_async_poll_init -> 异步轮询初始化, 不阻塞
      async->last_wakeup = ucs_get_time() -> asm volatile("rdtsc" : "=a"(low), "=d"(high)) 汇编获取时间 -> UCS/ARCH/INFO:如果无法从CPU型号中读取x86 TSC值,请不要从/proc/cpuinfo中读取测量的CPU频率,因为它只能代表核心频率而不是TSC频率。 相反,通过一个短循环进行测量,当频率测量收敛或达到 1ms 时间限制时停止
        ucs_async_thread_spinlock_ops.context_init(async) -> ucs_async_thread_spinlock_init -> 可重入自旋锁上下文初始化
  status = uct_worker_create(async, UCS_THREAD_MODE_SINGLE, &if_info.worker) -> 创建工人(独立资源) -> UCS_CLASS_DEFINE_NAMED_NEW_FUNC(uct_worker_create -> 用宏初始化工人(类似面向对象实例化) -> 创建一个工作对象。 工人代表着progress的引擎。 可以在应用程序中创建多个进度引擎,例如供多个线程使用。 Transports 可以为每个 Worker 分配单独的通信资源,以便每个 Worker 都可以独立于其他 Worker 进行操作 -> 声明/定义一个创建类实例的函数, 初始化工人私有worker, 传输层链表
    ucs_class_t *cls = &uct_priv_worker_t_class -> typedef struct uct_priv_worker
    obj = ucs_class_malloc(cls)
    ucs_class_t *_cls = &uct_priv_worker_t_class
    uct_priv_worker_t_init -> static UCS_CLASS_INIT_FUNC(uct_priv_worker_t -> static UCS_CLASS_INIT_FUNC(uct_priv_worker_t -> 初始化父类以及当前类的传输链表
      static UCS_CLASS_INIT_FUNC(uct_worker_t)
        ucs_callbackq_init(&self->progress_q);
        ucs_vfs_obj_add_dir(NULL, self, "uct/worker/%p", self)
  status = dev_tl_lookup(&cmd_args, &if_info) -> 查找期望的传输层, 动态加载, 由最小延迟决定要使用的设备和传输, 实参为地址指针, 形参为指针
    status = uct_query_components(&components, &num_components) -> 查询组件列表。 获取当前系统上可用的传输组件列表, 得到8个组件, 为每个组件添加vfs对象:uct/component/组件名
      UCS_MODULE_FRAMEWORK_DECLARE(uct) -> 声明一个“框架”,它是可加载模块的特定集合的上下文。 通常特定框架中的模块提供相同内部接口的替代实现
        static ucs_init_once_t ucs_framework_init_once_uct = { { { 0, 0, 0, 0, 0, 0, 0, { 0, 0 } } }, 0 } -> 互斥锁+初始化标记
      UCS_MODULE_FRAMEWORK_LOAD(uct, 0) -> void ucs_load_modules 加载所有模块, self, tcp, sysv, posix, ib, rdmacm, cma, knem, ...
      [in] – _flags 模块加载标志,参见 ucs_module_load_flags_t 框架中的模块由 dlopen() 加载。 模块的共享库名称为:“lib<framework>_<module>.so.<version>”,其中: - <framework> 是框架名称 - <module> 是模块名称。 框架中所有模块的列表由自动生成的 config.h 文件中的预处理器宏 <framework>_MODULES 定义,例如:#define foo_MODULES ":bar1:bar2"。 - <version> 是模块的共享库版本,由 libtool 生成。 它是从当前库 (libucs) 的完整路径中提取的。 在以下位置搜索模块共享库(按优先级顺序): 1. 当前共享库 (libucs) 目录内的“ucx”子目录 2. ${libdir}/ucx,其中 ${libdir} 是 库的安装目录 请注意,如果 libucs 是从其安装路径加载的,则 (1) 和 (2) 是同一位置。 仅当 libucs 被移动或从构建目录运行时,路径才会不同,在这种情况下,优先使用“本地库”而不是“已安装的”库。[in] – _name 框架名称(作为令牌)
        ucs_load_modules("uct", uct_MODULES, &ucs_framework_init_once_uct, 0) -> 加载框架, 模块
          ucs_module_loader_init_paths -> 找到路径, 然后动态加载: ucs_module_load_one(framework, module_name, flags) -> ucs_module_init(module_path, dl) -> 找到初始化方法名, 如: module_init_name, 动态打开模块: dl = dlopen(module_path, mode)
            fullpath = realpath(module_path, buffer)
            init_func = (init_func_t)ucs_module_dlsym_shallow module_init_name -> 找到全局初始化函数入口, 一般都没有
          modules_str = ucs_strdup(modules, "modules_list") -> 从内存跟踪表中查找字符串
          module_name = strtok_r(modules_str, ":", &saveptr) -> 每次取冒号分割的第一个字符串分段(字符串分割函数 strtok_r )
          ucs_module_loader_add_dl_dir
            动态库路径: 0x6070b0 "/home/xb/project/ucx/src/ucs/.libs/libucs.so.0"
            dladdr((void*)&ucs_module_loader_state, &dl_info) -> 利用dladdr来获得so自身的路径(ucs_module_loader_state)
            ucs_module_loader_state.srch_path[ucs_module_loader_state.srchpath_cnt++] = path -> 记录动态库位置
          ucs_module_loader_add_install_dir
          ucs_module_global_init -> 查找动态库入库函数地址: addr = dlsym(dl, symbol) -> status = init_func()
            void UCS_F_CTOR uct_ib_init()
              uct_component_register(&uct_ib_component) -> 注册组件
              uct_tl_register(&uct_ib_component, uct_ib_tls[i]) -> 注册所有IB传输层
      ucs_list_for_each uct_components_list 8个组件 -> ucs_vfs_obj_add_dir -> ucs_vfs_node_add 虚拟文件系统
    component_attr.md_resources = alloca -> alloca - 分配自动释放的内存
    status = uct_component_query(components[cmpt_index], &component_attr) -> 查询网卡, 拷贝内存域数据资源
      status = component->query_md_resources(component, &resources, &num_resources); -> uct_md_query_single_md_resource -> 调用每个组件的查询内存域资源接口
      UCS_MODULE_FRAMEWORK_LOAD(uct_ib, 0) -> 调用查询内存域资源接口中会加载对应的动态库
    status = uct_md_config_read(components[cmpt_index], NULL, NULL, &md_config); -> 读取内存域配置
      status = uct_config_read(&bundle, &component->md_config, env_prefix)
        status = ucs_config_parser_fill_opts(config_bundle->data, entry, full_prefix, 0)
          ucs_config_parser_set_default_values(opts, entry->table) -> ucs_config_sscanf_table
          ucs_config_parser_get_sub_prefix(env_prefix, &sub_prefix)
          ucs_config_parse_config_files()
          ucs_config_apply_config_vars -> 应用环境变量, 以及自定义前缀的环境变量
    for 迭代内存域资源
    uct_md_open -> 重要函数, 打开内存域
      status = component->md_open(component, md_name, config, &md) -> ucs_status_t uct_ib_md_open -> IB实现的内存域打开函数
        ib_device_list = ibv_get_device_list(&num_devices) -> 获取所有网卡列表, 获取设备列表, 比如4个网口(网卡设备), 可通过 ibdev2netdev 查询rdma网口映射
        ibv_fork_init -> 核心原理: 通过对所有已注册的MR所在内存页打MADV_DONTFORK标记,创建子进程后,MR所在内存页不会触发COW拷贝,避免了前面所说的COW带来网卡DMA内存地址不一致的问题, 但会引入额外的内存记录和查找开销(降低性能)
        status = uct_ib_ops[i]->ops->open(ib_device, md_config, &md) -> static ucs_status_t uct_ib_mlx5_devx_md_open
          vhca_id -> 虚拟主机通道适配器ID
          ctx = uct_ib_mlx5_devx_open_device(ibv_device) -> 通过创建完成队列和事件通道来检查网卡设备是否支持事件通道
            ctx = mlx5dv_open_device(ibv_device, &dv_attr) -> verbs_open_device -> rdma-core
            ibv_create_cq(ctx, 1, NULL, NULL, 0)
            ibv_destroy_cq(cq)
            event_channel = mlx5dv_devx_create_event_channel
            mlx5dv_devx_destroy_event_channel(event_channel)
          md = ucs_derived_of(uct_ib_md_alloc(sizeof(*md), "ib_mlx5_devx_md", ctx)
          status = uct_ib_mlx5_check_uar(md) -> 用户访问区域
            uct_ib_mlx5_devx_uar_init
              uct_ib_mlx5_devx_alloc_uar
                mlx5dv_devx_alloc_uar
            uct_ib_mlx5_devx_uar_cleanup
          md->mkey_tag = 0; -> 使用间接密钥使 MR 无效
          uct_ib_mlx5_devx_mr_lru_init(md)
          status = uct_ib_device_query(dev, ibv_device)
            uct_ib_query_device
              ibv_get_device_name
              ret = ibv_query_device_ex(ctx, NULL, attr)
                ret = vctx->query_device_ex(context, input, attr, sizeof(*attr))
            ibv_query_port
            ucs_topo_resolve_sysfs_path ->  "/sys/devices/pci0000:15/0000:15:04.0/0000:17:00.0" -> PCI地址
            ucs_topo_get_sysfs_dev
            uct_ib_device_set_pci_id
            ucs_topo_get_pci_bw -> 获取PCI带宽
              effective_bw = (p->bw_gbps * 1e9 / 8.0) * width * ((double)p->encoding / p->decoding) * link_utilization; -> 计算带宽
          ret = mlx5dv_devx_general_cmd(ctx, in, sizeof(in), out, sizeof(out)) -> ucs_status_t uct_ib_mlx5_devx_general_cmd -> rdma-core -> 通过 devx 接口发出通用命令, 介绍 DEVX 对象及其 DV API:创建/修改/读取/销毁。 还添加了 DEVX 通用命令 API,以便能够直接从固件读取 CAP, 参考: https://patchwork.kernel.org/project/linux-rdma/patch/1539190590-31186-2-git-send-email-yishaih@mellanox.com/
          status = uct_ib_mlx5_devx_query_lag(md, &lag_state) UCT_IB_MLX5_CMD_OP_QUERY_LAG -> 链路汇聚(bonding), 参考: https://docs.nvidia.com/networking/display/bluefielddpuosv385/link+aggregation
          md->port_select_mode = uct_ib_mlx5_devx_query_port_select(md)
            uct_ib_mlx5_devx_general_cmd UCT_IB_MLX5_CMD_OP_QUERY_LAG
          uct_ib_mlx5_is_xgvmi_alias_supported(ctx) -> 跨越GVMI (XGVMI) - DPU 可以代表主机常驻内存启动 RDMA 操作,仅当数据源自或目标为 DPU 内存时才涉及 DPU 内存, Guest VM ID 主机_虚机ID
          uct_ib_mlx5_devx_check_odp(md, md_config, cap) -> 按需分页 (ODP) 是一种可以缓解内存注册缺点的技术。 应用程序不再需要确定地址空间的底层物理页,并跟踪映射的有效性。 相反,当页面不存在时,HCA 向操作系统请求最新的转换,并且操作系统使由于不存在页面或映射更改而不再有效的转换无效。 ODP 不支持连续页。ODP 可以进一步分为 2 个子类:显式 ODP 和隐式 ODP。显式 ODP 在显式 ODP 中,应用程序仍然注册内存缓冲区以进行通信,但此操作用于定义 IO 的访问控制而不是 pin-down 页面。 ODP 内存区域 (MR) 在注册时不需要具有有效的映射。 隐式 ODP 在隐式 ODP 中,为应用程序提供了一个特殊的内存密钥,该密钥代表其完整的地址空间。 所有引用该键的 IO 访问(受限于与该键关联的访问权限)不需要注册任何虚拟地址范围。 有关 ODP 的更多信息,请参阅了解按需寻呼 (ODP) 社区帖子
          uct_ib_mlx5_devx_general_cmd
          dev->atomic_align = ucs_rounddown_pow2(arg_size) -> 向下取整对齐 
          uct_ib_md_open_common(&md->super, ibv_device, md_config) -> ucs_status_t uct_ib_md_open_common
            uct_ib_device_init -> 初始化IB设备
              uct_ib_device_get_locality -> 获取cpu位置top
              ucs_sys_fcntl_modfl O_NONBLOCK -> 设置fd为非阻塞模式
                oldfl = fcntl(fd, F_GETFL)
              ucs_async_set_event_handler uct_ib_async_event_handler -> 异步事件处理
                ucs_async_method_call(mode, add_event_fd, async, event_fd, events)       
                  ucs_async_thread_add_event_fd
                    ucs_async_thread_start(&thread)
                    ucs_event_set_add
                      epoll_ctl(event_set->event_fd, EPOLL_CTL_ADD, fd, &raw_event)
                    ucs_async_pipe_push(&thread->wakeup)
                      ret = write(p->write_fd, &dummy, sizeof(dummy)) -> 写0通知对端
            uct_ib_md_parse_subnet_prefix -> 添加UCT_IB_SUBNET_PREFIX=fe80::以按subnet_prefix过滤IB端口
            uct_ib_check_gpudirect_driver /sys/kernel/mm/memory_peers/nv_mem/version
            uct_ib_check_gpudirect_driver /dev/kfd
            uct_ib_md_check_dmabuf(md)                
              ibv_reg_dmabuf_mr(md->pd, 0, ucs_get_page_size(), 0, bad_fd, UCT_IB_MEM_ACCESS_FLAGS)
            uct_ib_md_set_pci_bw(md, md_config) -> 修复: PCI 速度和系统设备 ID 设置不正确, 先从系统获取, 没找到则从底层设备获取
          uct_ib_mlx5_md_port_counter_set_id_init(md)
            ucs_carray_for_each(counter_set_id, md->port_counter_set_ids, sizeof(md->port_counter_set_ids)) -> 遍历c语言数组
          ucs_mpool_params_reset(&mp_params) -> 重置内存池参数
          mp_params.ops             = &uct_ib_mlx5_dbrec_ops -> 设置池操作表
          ucs_mpool_init(&mp_params, &md->dbrec_pool)
            VALGRIND_CREATE_MEMPOOL(mp, 0, 0) -> valgrind内存池
          uct_ib_mlx5_md_buf_alloc(md, ucs_get_page_size(), 0, &md->zero_buf, &md->zero_mem, 0, "zero umem");
            先对齐内存
            madvise(buf, size, MADV_DONTFORK) -> MADV_DONTFORK 在执行fork(2)后,子进程不允许使用此范围的页面。这样是为了避免COW机制导致父进程在写入页面时更改页面的物理位置
            mlx5dv_devx_umem_reg(md->super.dev.ibv_context, buf, size, access_mode) -> 注册或取消注册由 devx 接口使用的用户内存。 寄存器动词公开 UMEM DEVX 对象,用于 DMA 的用户内存注册。 用于注册用户存储器的 API 将用户地址、长度和访问标志作为输入,并向用户提供一个对象作为输出,该对象保存由固件返回到该已注册存储器的 UMEM ID。 用户将在使用此内存而不是物理地址列表的设备直接命令中使用该 UMEM ID,例如在 mlx5dv_devx_obj_create 上创建 QP
          uct_ib_md_parse_relaxed_order(&md->super, md_config, ksm_atomic)
          uct_ib_mlx5_devx_init_flush_mr(md)
            uct_ib_reg_mr(&md->super, md->zero_buf, UCT_IB_MD_FLUSH_REMOTE_LENGTH, &params, UCT_IB_MEM_ACCESS_FLAGS, &md->flush_mr); -> 当 access_flags 包含 IBV_ACCESS_ON_DEMAND 时,ibv_reg_mr() 可能会失败并出现 EAGAIN。 这意味着预取由于与失效冲突而失败
              UCS_PROFILE_CALL_ALWAYS(ibv_reg_mr, md->pd, address, length, access_flags) -> 调用verbs接口注册内存
              ibv_reg_dmabuf_mr -> 注册内存的另一种方式
              uct_ib_md_print_mem_reg_err_msg -> 如果内存为空, 打印错误
                UCS_STRING_BUFFER_ONSTACK(msg, 256) -> 声明一个字符串缓冲区,该缓冲区使用现有字符串作为后备存储。 此类字符串缓冲区不会分配额外的内存,也不必进行清理,并且它还可以用于在作为函数参数传递的现有 C 字符串缓冲区上构建字符串。
                uct_ib_memlock_limit_msg(&msg, err)
                  ucs_sys_get_effective_memlock_rlimit
                    getrlimit(RLIMIT_MEMLOCK, &limit_info) -> 获取内存限制
                ucs_string_buffer_cstr(&msg)
                  c_str = ucs_array_begin(&strb->str) -> 返回字符串数组第一个元素
                    ((&strb->str)->buffer)
            uct_ib_mlx5_devx_reg_ksm_data_addr
              uct_ib_mlx5_alloc_mkey_inbox(list_size, &in)
              uct_ib_mlx5_devx_reg_ksm(md, atomic, iova, length, list_size -> UCT/IB/MLX5:内存注册流程部分重构,使用params struct进行多线程注册(从上层传递),使用params struct注册flush_mr,改进调试日志记录
                UCT_IB_MLX5DV_SET(create_mkey_in, in, opcode, UCT_IB_MLX5_CMD_OP_CREATE_MKEY) -> 往结构体中插入一个值
                mkc = UCT_IB_MLX5DV_ADDR_OF(create_mkey_in, in, memory_key_mkey_entry) -> 限制所有条目的缓冲区大小以提高性能。 将密钥长度的虚拟地址 (KLM) 与固定内存大小关联时使用 KSM
                mlx5dv_devx_obj_create
        static ucs_status_t uct_self_md_open -> self模块实现的打开内存域
      uct_md_vfs_init(component, md, md_name) -> 用vfs节点表示组件和内存域
    uct_config_release(md_config)
    uct_md_query(iface_p->md, &iface_p->md_attr)
      status = uct_md_attr_v2_init(md, &md_attr_v2) -> static ucs_status_t uct_self_md_query -> ucs_status_t    uct_ib_md_query
        md_attr->access_mem_types          = UCS_BIT(UCS_MEMORY_TYPE_HOST) -> 内存访问的类型
        ucs_sys_cpuset_copy(&md_attr->local_cpus, &md->dev.local_cpus) -> for -> CPU_ISSET(c, src) -> 拷贝CPU参数
      uct_md_attr_from_v2(md_attr, &md_attr_v2) -> 用源(参数2)赋值给目的(参数1), 并用内存拷贝(memcpy)cpu和组件名
    uct_md_query_tl_resources(iface_p->md, &tl_resources, &num_tl_resources) -> 查询传输层资源。 该例程查询 uct_md_h“内存域”以获取可用的通信资源
        uct_self_query_tl_devices | uct_dc_mlx5_query_tl_devices
            uct_ib_device_query_ports
            uct_ib_device_port_check
                uct_ib_device_port_attr
        ucs_realloc
    for -> 遍历可用的传输层并找出合适的传输层名        
    status = init_iface(tl_resources[tl_index].dev_name, tl_resources[tl_index].tl_name, cmd_args->func_am_type, iface_p) -> 按传输层名初始化接口, 入参为: 网卡设备名,传输层名,活动消息类型, 接口
      status = uct_md_iface_config_read(iface_p->md, tl_name, NULL, NULL, &config) -> 读取并填充配置
      status = uct_iface_open(iface_p->md, iface_p->worker, &params, config, &iface_p->iface) -> 打开通信接口
        uct_find_tl(md->component, params->mode.device.tl_name) -> 查找传输层
        status = tl->iface_open -> static UCS_CLASS_DEFINE_NEW_FUNC(uct_rc_verbs_iface_t -> static UCS_CLASS_INIT_FUNC(uct_rc_verbs_iface_t <- .iface_open         = UCS_CLASS_NEW_FUNC_NAME(_iface_class) <- UCT_TL_DEFINE_ENTRY -> 宏展开得到(uct_rc_verbs_iface_t_new)
            init_attr.qp_type               = IBV_QPT_RC -> 设置QP属性
            UCS_CLASS_CALL_SUPER_INIT(uct_rc_iface_t -> 执行父类的构造函数(初始化) -> uct_rc_verbs_iface_t_init -> UCS_CLASS_INIT_FUNC(uct_rc_iface_t
                UCS_CLASS_CALL_SUPER_INIT(uct_ib_iface_t -> UCS_CLASS_INIT_FUNC(uct_ib_iface_t
                    preferred_cpu = ucs_cpu_set_find_lcs(&cpu_mask)
                    UCS_CLASS_CALL_SUPER_INIT(uct_base_iface_t
                        uct_base_iface_t_init (&self->super, _myclass->superclass, -> UCS_CLASS_INIT_FUNC(uct_base_iface_t
                            UCS_CLASS_CALL_SUPER_INIT(uct_iface_t, ops)
                                uct_iface_t_init (&self->super, _myclass->superclass -> UCS_CLASS_INIT_FUNC(uct_iface_t, uct_iface_ops_t *ops)
                                    self->ops = *ops;
                            UCT_CB_FLAGS_CHECK((params->field_mask
                            self->internal_ops      = internal_ops -> UCS_CLASS_INIT_FUNC(uct_base_iface_t, uct_iface_ops_t *ops,
                            uct_worker_progress_init(&self->prog)
                            uct_iface_set_stub_am_handler(self, id);
                                iface->am[id].cb    = uct_iface_stub_am_handler
                            UCS_STATS_NODE_ALLOC(&self->stats -> ucs_status_t ucs_stats_node_alloc
                                ucs_stats_node_new(cls, &node)
                                ucs_stats_node_initv(node, cls, name, ap)
                                    ucs_stats_name_check(cls->name)
                                    ucs_vsnprintf_safe(node->name, UCS_STAT_NAME_MAX, name, ap)
                                        vsnprintf(buf, size, fmt, ap)
                                        buf[size - 1] = '\0' -> 字符串数组补0
                                    ucs_list_head_init(&node->children[UCS_STATS_INACTIVE_CHILDREN])
                                ucs_stats_filter_node_new(node->cls, &filter_node)
                                ucs_stats_node_add(node, parent, filter_node)
                                    ucs_list_add_tail(&parent->children[UCS_STATS_ACTIVE_CHILDREN], &node->list)
                                    ucs_stats_add_to_filter(node, filter_node)
                    uct_ib_device_find_port(dev, params->mode.device.dev_name, &port_num)
                    uct_ib_iface_set_path_mtu(self, config)
                        (IBV_DEV_ATTR(dev, vendor_id) == 0x02c9) -> #  define IBV_DEV_ATTR(_dev, _attr)        ((_dev)->dev_attr._attr) -> 常用操作定义为宏
                    uct_ib_iface_init_pkey(self, config)
                        ibv_query_pkey(dev->ibv_context, iface->config.port_num, pkey_index, &port_pkey)
                        pkey = ntohs(port_pkey)
                    uct_ib_iface_init_gid_info(self, config)
                        uct_ib_iface_init_roce_gid_info(iface, cfg_gid_index)
                            uct_ib_device_select_gid(dev, port_num, &iface->gid_info) -> 选择要使用的最佳 gid 并在 RoCE 端口上设置其信息 - gid 索引、RoCE 版本和地址家族
                                uct_ib_device_query_gid_info
                                    ibv_query_gid(ctx, port_num, gid_index, &info->gid)
                                    uct_ib_device_get_addr_family(&info->gid, gid_index)
                                        const uint32_t addr_last_bits = raw->s6_addr32[2] ^ htonl(0x0000ffff)
                                        uct_ib_device_is_addr_ipv4_mcast(raw, addr_last_bits)
                                            return (raw->s6_addr32[0] == htonl(0xff0e0000)) && !(raw->s6_addr32[1] | addr_last_bits) -> 编码IPv4多播地址
                                uct_ib_device_test_roce_gid_index -> UCT/IB:使用 ibv_create_ah() 测试本地 gid 的有效性 在某些情况下,可能无法为 RoCE 设备正确配置网络地址,因此在选择要使用的默认 GID 索引时,我们希望跳过 GID 表中的条目 无法创建AH
                                    ibv_create_ah(ucs_container_of(dev, uct_ib_md_t, dev)->pd, &ah_attr)
                                    ibv_destroy_ah(ah)
                            或 uct_ib_device_query_gid_info
                        uct_ib_iface_init_roce_addr_prefix(iface, config)
                            ucs_sockaddr_inet_addr_size(gid_info->roce_info.addr_family, &addr_size)
                            uct_ib_device_get_roce_ndev_name
                            ucs_netif_get_addr -> 获取给定接口的地址和网络掩码
                            ucs_sockaddr_get_inet_addr
                            ucs_count_ptr_trailing_zero_bits -> 计算缓冲区末尾有多少位等于零
                            ucs_debug failed to detect RoCE subnet mask prefix on -> UCT:使用子网掩码确定 RoCE IP 的可达性
                        uct_ib_device_query_gid
                    uct_ib_iface_init_lmc(self, config)
                        uct_ib_iface_port_attr(iface)->lmc -> LID mask control 当多个LID被分配给当前端口时使用
                    uct_ib_iface_set_num_paths(self, config) -> UCT/IB:为 RoCE LAG(链路聚合,bond相关) 和 IB LMC 实现多路径
                        iface->num_paths = uct_ib_iface_roce_lag_level(iface) -> 计算bond后的实际带宽, 端口数
                            ucs_netif_bond_ad_num_ports(ndev_name) -> 获取绑定设备的活动 802.3ad 端口数。 如果设备不是绑定设备,或者未启用802.3ad,则返回1
                                ucs_read_file_number(&ad_num_ports, 1, "%s/%s", bond_path, UCS_BOND_NUM_PORTS_FILE); -> 
                        (uct_ib_iface_port_attr(iface)->active_speed == UCT_IB_SPEED_NDR))
                    self->comp_channel = ibv_create_comp_channel(dev->ibv_context) -> 支持中断通知
                    ucs_sys_fcntl_modfl(self->comp_channel->fd, O_NONBLOCK, 0) -> 设置事件通道fd为非阻塞模式
                    self->ops->create_cq(self, UCT_IB_DIR_TX -> ucs_status_t uct_ib_verbs_create_cq
                        ibv_cq_ex_to_cq(ibv_create_cq_ex(dev->ibv_context, &cq_attr))
                    self->ops->create_cq(self, UCT_IB_DIR_RX
                    self->addr_size  = uct_ib_iface_address_size(self) -> UCT/GTEST:CM 上的客户端-服务器 - 框架和 CM 基本功能。 重构 IB 地址函数,为在 qp-less 模式下通过 rdmacm 建立连接做准备,添加基本 gtest 来测试基本 CM(连接管理器)功能
                        return uct_ib_address_size(&params) -> 根据地址打包标记判断
                uct_iface_set_async_event_params(params, &self->async.event_cb,&self->async.event_arg)
                self->super.release_desc.cb = uct_ud_iface_release_desc -> 启用异步推进
                ucs_ptr_array_init(&self->eps, "ud_eps") -> 初始化数组, 先清空结构体字段, 然后设置数组名
                uct_ud_iface_create_qp(self, config) -> 创建队列对,设置类型(UD不可靠数据报), 
                    qp_init_attr.qp_type             = IBV_QPT_UD
                    ops->create_qp(&self->super, &qp_init_attr, &self->qp) -> ucs_status_t uct_ib_iface_create_qp
                        ibv_create_qp
                    qp_attr.qp_state   = IBV_QPS_INIT
                    ibv_modify_qp(self->qp, &qp_attr, IBV_QP_STATE | IBV_QP_PKEY_INDEX | IBV_QP_PORT | IBV_QP_QKEY)
                    qp_attr.qp_state = IBV_QPS_RTR
                    ibv_modify_qp(self->qp, &qp_attr, IBV_QP_STATE)
                    qp_attr.qp_state = IBV_QPS_RTS
                    ibv_modify_qp(self->qp, &qp_attr, IBV_QP_STATE | IBV_QP_SQ_PSN)
                uct_ib_iface_recv_mpool_init(&self->super, &config->super, params, "ud_recv_skb", &self->rx.mp) -> 创建接收描述符的内存池
                    grow = 1024
                    uct_iface_param_am_alignment(params, iface->config.seg_size, -> 根据接口参数中提供的用户配置初始化 AM 数据对齐及其偏移量
                        *align        = UCS_SYS_CACHE_LINE_SIZE -> cacheline 缓存行对齐
                    uct_iface_mpool_init uct_ib_iface_recv_desc_init
                        ucs_mpool_params_reset(&mp_params)
                        uct_iface_mpool_config_copy(&mp_params, config)
                        ucs_mpool_init(&mp_params, mp) -> ucs_status_t ucs_mpool_init
                            mp->data = ucs_malloc(sizeof(*mp->data) + params->priv_size, "mpool_data")
                            mp->data->grow_factor     = params->grow_factor -> 增长因子
                            VALGRIND_CREATE_MEMPOOL(mp, 0, 0); -> Memcheck:内存错误检测器, https://valgrind.org/docs/manual/mc-manual.html, 该请求将地址池注册为内存池的锚地址。 它还提供了 rzB 大小,指定放置在从池中分配的块周围的 redzone 应该有多大。 最后,它提供了一个 is_zeroed 参数,用于指定分配时池的块是否归零
                ucs_mpool_grow(&self->rx.mp, self->rx.available) -> UCT/IB:减少接收缓冲区的内存消耗。 仅当接口上启用进度时才分配接收缓冲区。 对于 UD 和 DC,分配少量接收缓冲区,以便能够处理新的传入连接, 给内存池增加指定数量的元素
                    data->ops->chunk_alloc(mp, &chunk_size, &ptr) -> UCS/UCT/TEST:重构和优化内存池基础设施 -> UCS_PROFILE_FUNC_ALWAYS(ucs_status_t, uct_iface_mp_chunk_alloc
                        uct_iface_mem_alloc(&iface->super, length, UCT_MD_MEM_ACCESS_LOCAL_READ  | UCT_MD_MEM_ACCESS_LOCAL_WRITE | UCT_MD_MEM_FLAG_LOCK, ucs_mpool_name(mp), &mem) -> 分配可用于零拷贝通信的内存。 分配可用于特定传输接口上的零复制数据传输或远程访问的内存区域
                            uct_md_query(iface->md, &md_attr)
                            params.mem_type        = UCS_MEMORY_TYPE_HOST
                            uct_mem_alloc(length, alloc_methods, num_alloc_methods, &params, mem) -> 为零拷贝通信和远程访问分配内存。 分配可能已注册的内存, 统一内存分配层?
                                uct_mem_alloc_check_params(length, methods, num_methods, params)
                                uct_md_mem_alloc(md, &alloc_length, &address,
                            uct_md_mem_reg(iface->md, mem->address, mem->length, flags,    
                    ucs_mpool_chunk_elems(mp, chunk)
                    ucs_mpool_num_elems_per_chunk(mp, chunk, chunk_size)
                    for (i = 0; i < chunk->num_elems; ++i)
                        ucs_mpool_add_to_freelist(mp, elem)
                            VALGRIND_MAKE_MEM_DEFINED(tail, sizeof *tail) -> UCS/MPOOL:在调试模式下添加 FIFO 行为选项,而不是 LIFO。FIFO 模式对于调试很有用,因为 mpool 对象不像 LIFO 那样被回收。 LIFO 的性能更好,因为它减少了缓存未命中和 mpool get/put 开销。 设置 UCS_MPOOL_FIFO=y 启用此模式
                    VALGRIND_MAKE_MEM_NOACCESS(chunk + 1, chunk_size - sizeof(*chunk))
                uct_iface_mpool_init(&self->super.super, &self->tx.mp,  uct_ud_iface_send_skb_init
            uct_rc_am_hdr_fill(&self->am_inl_hdr.rc_hdr, 0)
            status = uct_iface_mpool_init -> 创建地址控制器和原子内存池
            uct_rc_verbs_iface_init_inl_wrs -> 初始化可靠连接接口上的工作请求和内联请求,设置RDMA操作码
                iface->inl_am_wr.opcode         = IBV_WR_SEND;
                iface->inl_am_wr.send_flags     = IBV_SEND_INLINE;
                iface->inl_rwrite_wr.opcode     = IBV_WR_RDMA_WRITE;
                iface->inl_rwrite_wr.send_flags = IBV_SEND_SIGNALED | IBV_SEND_INLINE;
            status = uct_rc_init_fc_thresh(&config->super, &self->super)
            status = uct_rc_iface_qp_create
                UCS_PROFILE_CALL_ALWAYS(ibv_create_qp_ex, 或
                UCS_PROFILE_CALL_ALWAYS(ibv_create_qp
            uct_ib_destroy_qp(qp)
        ucs_vfs_obj_add_dir(worker, *iface_p, "iface/%p", *iface_p)
        ucs_vfs_obj_add_sym_link(*iface_p, md, "memory_domain")
        ucs_vfs_obj_set_dirty(*iface_p, uct_iface_vfs_refresh)
      uct_iface_progress_enable(iface_p->iface -> uct_base_iface_progress_enable_cb -> uct_rc_verbs_iface_common_progress_enable
        uct_rc_verbs_iface_common_prepost_recvs(iface)
          uct_rc_verbs_iface_post_recv_common
            uct_rc_verbs_iface_post_recv_always
              uct_ib_iface_prepare_rx_wrs
                UCT_TL_IFACE_GET_RX_DESC(&iface->super, mp, desc, break) -> ucs_mpool_get_inline
                  ucs_mpool_get_grow
                    ucs_mpool_grow -> chunk_alloc -> uct_iface_mp_chunk_alloc
                      uct_iface_mem_alloc
                    ...
                    ibv_reg_mr_iova2
              ibv_post_srq_recv  
      uct_rc_verbs_iface_progress(search) -> 驱动接口运转
        ucs_callbackq_add_safe
      status = uct_iface_query(iface_p->iface, &iface_p->iface_attr) -> uct_rc_verbs_iface_query
        status = uct_rc_iface_query
          status = uct_ib_iface_query -> ucs_status_t uct_ib_iface_query
            static const uint8_t ib_port_widths[] = {[1] = 1, [2] = 4, [4] = 8, [8] = 12, [16] = 2};
            uct_base_iface_query(&iface->super, iface_attr)
            active_width = uct_ib_iface_port_attr(iface)->active_width
            case UCT_IB_SPEED_EDR -> 网卡速度
              iface_attr->latency.c = 600e-9
              signal_rate           = 25.78125e9
              encoding              = 64.0/66.0
            num_path   = uct_ib_iface_is_roce(iface) 
          uct_ib_device_has_pci_atomics(dev))
        iface_attr->latency.m += 1e-9 -> 1ns 纳秒
        iface_attr->overhead   = 75e-9
        iface_attr->ep_addr_len = uct_ib_md_is_flush_rkey_valid
  status = uct_iface_set_am_handler(if_info.iface, id, hello_world, &cmd_args.func_am_type, 0) -> 设置服务端接收到客户端数据后的回调
    iface->am[id].cb    = cb
  connect_common
    ret = getaddrinfo(server, service, &hints, &res)
    sockfd = socket(t->ai_family, t->ai_socktype, t->ai_protocol)
    setsockopt
    bind, listen, accept -> 等待客户端发起socket连接
  status = uct_iface_get_device_address(if_info.iface, own_dev)
  sendrecv(oob_sock, own_dev, if_info.iface_attr.device_addr_len, (void**)&peer_dev) -> 先通过socket发送和接收长度, 然后发送和接收地址, 交换地址
  uct_iface_is_reachable 检查地址是否可达 -> uct_ib_iface_is_reachable_v2 -> static int uct_ib_iface_dev_addr_is_reachable -> ucs_test_all_flags
    device_addr = (const uct_ib_address_t*) UCS_PARAM_VALUE -> 如果设置了字段掩码中的标志,则有条件地返回参数值。 否则,返回默认值
  status = uct_ep_create(&ep_params, &ep) -> 创建新端点 -> UCS_CLASS_DEFINE_NEW_FUNC(uct_rc_verbs_ep_t, uct_ep_t, const uct_ep_params_t *)
  ucs_vfs_obj_set_dirty(params->iface, uct_iface_vfs_refresh)
    ucs_vfs_global_init() -> UCS_INIT_ONCE(&ucs_vfs_init_once) 单例
    ucs_vfs_node_find_by_obj -> ucs_vfs_kh_find -> klib -> kh_get -> 独立且轻量级c库: https://github.com/attractivechaos/klib
  ucs_status_t uct_rc_verbs_ep_get_address
  status = uct_ep_connect_to_ep(ep, peer_dev, peer_ep) -> uct_rc_verbs_ep_connect_to_ep_v2
    status = uct_rc_iface_qp_connect(iface, ep->qp, qp_num, &ah_attr, path_mtu) -> ucs_status_t uct_rc_iface_qp_connect
      qp_attr.qp_state              = IBV_QPS_RTR -> 设备qp
      ret = ibv_modify_qp(qp, &qp_attr, qp_attr_mask); -> 设置qp状态机
      ...
      qp_attr.qp_state              = IBV_QPS_RTS
      ret = ibv_modify_qp(qp, &qp_attr, qp_attr_mask)
      ucs_debug("connected rc qp 0x%x on " -> 打印debug日志 -> connected rc qp 0x1a91b on mlx5_0:1/RoCE to lid 49152(+0) sl 0 remote_qp 0x1a91a mtu 1024 timer 18x7 rnr 13x7 rd_atom 16
  uct_worker_progress(if_info.worker) -> UCT 显示驱动工人推进。 该例程显式地处理任何未完成的通信操作和活动消息请求 -> uct_rc_verbs_iface_progress
    count = uct_rc_verbs_iface_poll_rx_common(iface) -> ibv_poll_cq
      uct_rc_verbs_iface_post_recv_common(iface, 0)
    return count + uct_rc_verbs_iface_poll_tx(iface) -> always_inline
      UCT_RC_VERBS_IFACE_FOREACH_TXWQE(&iface->super, i, wc, num_wcs) -> 遍历宏
      uct_rc_txqp_completion_desc
      ucs_arbiter_group_schedule -> 安排一个小组进行仲裁。 如果该组已经存在,则该操作将无效
      uct_rc_verbs_update_tx_res(&iface->super, ep, count)
        uct_rc_txqp_available_add
        uct_rc_iface_update_reads -> 将 RDMA_READ 积分释放回 RC iface。 RDMA_READ 积分在完成回调中释放,但不会释放到 RC iface 以避免 OOO 发送。 否则,如果读取信用是唯一缺少的资源并在完成回调中释放,则即使挂起队列不为空,下一个完成回调也将能够发送
        uct_rc_iface_add_cq_credits -> UCT/IB:修复错误处理后清除待处理请求的问题
      ucs_arbiter_dispatch(&iface->super.tx.arbiter, 1, uct_rc_ep_process_pending, NULL) -> 在仲裁器中调度工作元素。 对于每个组,只要回调返回 REMOVE_ELEM 或 NEXT_GROUP,最多会调度 per_group 工作元素。 然后,对下一组执行相同的操作,直到仲裁器变空或回调返回 STOP。 如果一个组没有元素,或其回调返回 REMOVE_GROUP,则它将被删除,直到使用 ucs_arbiter_group_schedule() 将其放回到仲裁器上
  if (barrier(oob_sock, progress_worker, if_info.worker)) -> TCP/TEST:修复 ucp_hello_world 同时 ep close 的问题
  out_free_ep -> 释放资源
  ...
​
​
server:
while (desc_holder == NULL) 
uct_worker_progress(if_info.worker) -> static inline unsigned ucs_callbackq_dispatch -> count += cb(elem->arg)
  static unsigned uct_rc_verbs_iface_progress(void *arg) <- self->super.progress
    count = uct_rc_verbs_iface_poll_rx_common(iface)
      uct_rc_verbs_iface_handle_am
        uct_iface_invoke_am
          status = handler->cb(handler->arg, data, length, flags) -> hello_world -> 服务端收到数据后回调
            print_strings
​
​
​
​
uct_iface_progress_enable
  uct_iface_progress -> return iface->ops.iface_progress(iface)
    uct_rc_iface_do_progress <- .iface_progress
      return iface->progress(iface)
      
​
​
​
typedef struct ucs_mpmc_queue, 多生产者多消费者线程安全队列。 在“良好”场景中,每次推/拉都是一个原子操作
​
typedef struct uct_tl_resource_desc, 通信资源描述符。 资源描述符是表示网络资源的对象。 资源描述符可以表示独立的通信资源(例如HCA端口、网络接口)或多个资源(例如多个网络接口或通信端口)。 它还可以表示通过单个物理网络接口定义的虚拟通信资源
​
md_resources, 内存域资源数组。 使用时,应在使用指向数组的指针调用 @ref uct_component_query 之前对其进行初始化,该数组足够大以容纳所有内存域资源条目。 调用后,该数组将填充现有内存域资源的信息。 为了分配该数组,您可以调用@ref uct_component_query两次:第一次将仅通过在field_mask中指定@ref UCT_COMPONENT_ATTR_FIELD_MD_RESOURCE_COUNT来获取所需的条目数量。 然后,可以为数组分配返回的条目数,并传递给对 @ref uct_component_query 的第二次调用,这次将 field_mask 设置为 @ref UCT_COMPONENT_ATTR_FIELD_MD_RESOURCES
​
​
​
connect_common
uct_iface_get_device_address
sendrecv
status = uct_ep_create(&ep_params, &ep)
uct_rc_verbs_ep_connect_to_ep_v2

客户端流程

代码语言:javascript
复制
client: 客户端流程
[root@node63 ucx]# /home/xb/project/ucx/examples/.libs/lt-uct_hello_world -d mlx5_0:1 -t rc_verbs -n 172.17.29.63 -z
...
if (connect(sockfd, t->ai_addr, t->ai_addrlen) == 0)
​
examples/uct_hello_world.c -> int main(int argc, char **argv)
if (parse_cmd(argc, argv, &cmd_args)) -> 解析命令行参数
ucs_async_context_create
uct_worker_create
dev_tl_lookup
status = uct_iface_set_am_handler(if_info.iface, id, hello_world -> 设置接收到数据后的回调
connect_common
uct_iface_get_device_address
uct_iface_get_address
uct_ep_create
sendrecv
char *str = (char *)mem_type_malloc(cmd_args.test_strlen) -> 分配16字节的字符(待发送的数据), malloc | cudaMalloc | cudaMallocManaged
generate_test_string -> 生成测试字符串
  memcpy(dst, src, count) -> 拷贝16字节的字符串到str
do_am_zcopy(&if_info, ep, id, &cmd_args, str) -> str -> char *buf 字符串地址转为buf地址
  uct_md_mem_reg -> uct_ib_mlx5_devx_mem_reg -> 将buf地址进行内存注册得到内存控制器(memh->address = address)
    uct_ib_memh_alloc(&md->super, length,
        memh = ucs_calloc(1, memh_base_size + (mr_size * num_mrs), "ib_memh")
    uct_ib_mlx5_devx_reg_mr(md, memh, address,
        access_flags & IBV_ACCESS_ON_DEMAND -> 按需注册
        uct_ib_reg_mr(&md->super, address, length, params, access_flags,
        *lkey_p = memh->mrs[mr_type].super.ib->lkey
        *rkey_p = memh->mrs[mr_type].super.ib->rkey
        uct_ib_mem_prefetch(&md->super, &memh->super, address, length) -> 支持内存预取
            UCS_PROFILE_CALL(ibv_advise_mr -> 提供有关 MR 中地址范围的建议
    mr = UCS_PROFILE_CALL_ALWAYS(ibv_reg_mr, md->pd, address, length, access_flags) -> 注册该段内存
  iov.buffer = buf -> buf转iov
  iov.memh   = memh -> 空指针
  iov.stride = 0;
  iov.count  = 1;
  ...
if (barrier(oob_sock, progress_worker, if_info.worker))
​
...
ucs_status_t do_am_short
  UCT_INLINE_API ucs_status_t uct_ep_am_short
    ucs_status_t uct_rc_verbs_ep_am_short
      uct_rc_verbs_iface_fill_inl_am_sge
      uct_rc_verbs_ep_post_send
        ibv_post_send

零拷贝调用栈

代码语言:javascript
复制
do_am_bcopy
  uct_ep_am_bcopy
    uct_rc_verbs_ep_am_bcopy
      UCT_RC_IFACE_GET_TX_AM_BCOPY_DESC
      ucs_mpool_get_inline
        status = uct_rc_verbs_ep_post_send_desc(ep, &wr, desc, send_flags | IBV_SEND_SOLICITED, UCT_IB_MAX_ZCOPY_LOG_SGE(&iface->super.super));
        UCT_RC_VERBS_FILL_DESC_WR(wr, desc) -> 将描述填充到工作请求, 新建sge与wr通过sgl关联
        uct_rc_verbs_ep_post_send(iface, ep, wr, send_flags, max_log_sge)
          ucs_assertv(ep->qp->state == IBV_QPS_RTS -> 队列对状态必须是准备发送
          uct_rc_iface_tx_moderation
          uct_rc_ep_fm
          uct_ib_log_post_send
          ret = ibv_post_send(ep->qp, wr, &bad_wr); -> 提交wr到qp的发送队列, 触发服务端收到消息后打印, print_strings
          uct_rc_verbs_txqp_posted -> 为DC(动态连接队列对 Dynamically Connected (DC) QPs)传输类型做准备, cq流控信用, 状态计数器等
        uct_rc_txqp_add_send_op_sn -> 提交发送后, 将io描述, 按序号sn插入outstanding队列, 因为在轮询完成时,我们可获得完成的数量(而不是基于完成的零索引)
          ucs_queue_push(&txqp->outstanding, &op->queue)
​
​
ucs_status_t do_am_zcopy -> 零拷贝
  uct_md_mem_reg -> uct_md_mem_reg_v2 -> md->ops->mem_reg -> UCS_PROFILE_CALL_ALWAYS(ibv_reg_mr -> 注册内存 -> 分析函数调用, 当 access_flags 包含 IBV_ACCESS_ON_DEMAND 时,ibv_reg_mr() 可能会失败并出现 EAGAIN。 这意味着预取由于与失效冲突而失败
    __ibv_reg_mr
  comp.uct_comp.func   = zcopy_completion_cb -> 完成回调
  do ... while (status == UCS_ERR_NO_RESOURCE) -> 没资源时退出循环 
  status = uct_ep_am_zcopy(ep, id, NULL, 0, &iov, 1, 0, (uct_completion_t *)&comp) -> ep->iface->ops.ep_am_zcopy
    ucs_status_t uct_rc_mlx5_ep_am_zcopy | ucs_status_t uct_rc_verbs_ep_am_zcopy -> 可靠连接verbs端点的活动消息零拷贝
      struct ibv_sge sge[UCT_IB_MAX_IOV];
      UCT_CHECK_IOV_SIZE -> 检查向量IO个数, 应该不超过网卡最大发送(SGE-1), 第一个sge给header使用
      UCT_RC_CHECK_AM_ZCOPY -> 检查零拷贝参数
      UCT_RC_CHECK_RES_AND_FC -> 检查资源和流控参数
      UCT_RC_IFACE_GET_TX_AM_ZCOPY_DESC -> 从内存池获取发送端零拷贝内存描述 -> UCT/RC:使用恒定的标头大小,无论 TM 状态如何
      sge_cnt = uct_ib_verbs_sge_fill_iov(sge + 1, iov, iovcnt) -> 填充IO(将iov设置到sge上), 通过 uct_iov_t 中提供的数据填充 ibv_sge 数据结构 该函数避免复制零长度的 IOV
        sge[sge_it].addr   = (uintptr_t)(iov[iov_it].buffer) -> buffer地址转sge地址
        sge[sge_it].lkey = uct_ib_memh_get_lkey(iov[iov_it].memh) -> 设置本地键
      UCT_RC_VERBS_FILL_AM_ZCOPY_WR_IOV -> 准备工作请求wr, 将sge首地址和数量设置到wr, rdma操作码为发送(双边, IBV_WR_SEND)
        wr.sg_list = sge; wr.num_sge = (sge_cnt + 1); wr.opcode = (typeof(wr.opcode))IBV_WR_SEND;
      UCT_TL_EP_STAT_OP -> 统计发送状态
      uct_rc_verbs_ep_post_send_desc | status = uct_rc_mlx5_ep_zcopy_post(ep, MLX5_OPCODE_SEND, iov, iovcnt, 0ul, id, header, header_length, 0, 0, 0ul, 0, 0, MLX5_WQE_CTRL_SOLICITED, uct_rc_ep_send_op_completion_handler, 0, comp);
        ----------- rc_mlx5_iface 迈络思可靠连接接口的提交实现
        uct_rc_mlx5_txqp_dptr_post_iov
          uct_rc_mlx5_am_hdr_fill
          uct_ib_mlx5_inline_copy
          uct_rc_mlx5_common_post_send
            uct_ib_mlx5_post_send -> UCT/IB/MLX5:修复 TX WQ 溢出检查,当 WQE 恰好在 hw_ci+qp_length 结束时,当前检查失败,即使这是有效的情况,将溢出检查重构为特定断言并添加注释来解释它们,移动检查 函数到 C 文件,因为它不是快速路径
              num_bb  = ucs_div_round_up(wqe_size, MLX5_SEND_WQE_BB) -> 向上取大的
              uct_ib_mlx5_txwq_validate
              ucs_memory_cpu_store_fence -> 内存屏障 -> asm volatile(""::: "memory") -> 防止编译器重新排序指令
              *wq->dbrec = htonl(sw_pi += num_bb) -> 写门铃记录, volatile uint32_t           *dbrec;
              ucs_memory_bus_store_fence()
              ucs_likely uct_ib_mlx5_bf_copy
                uct_ib_mlx5_bf_copy_bb -> UCP/UCT/IB:修复UCT中的线程模式,当以“序列化”模式创建 IB 接口时,请确保刷新写入组合缓冲区,以避免在另一个线程使用同一 MMIO 寄存器时数据损坏
                  UCS_WORD_COPY(uint64_t, dst, uint64_t, src, MLX5_SEND_WQE_BB) -> UCT/IB:为 DM 字节复制循环解决无效的 GCC 矢量化问题 GCC 可以生成“movdqa”指令,该指令假定源缓冲区与 16 字节对齐,但是源缓冲区是由用户提供的,并且可能未对齐。 将源缓冲区类型声明为严格未对齐,以防止 GCC 进行无效矢量化
            uct_rc_txqp_posted
        uct_rc_txqp_add_send_comp
      UCT_RC_UPDATE_FC -> 更新流控
  uct_worker_progress
​
​

零拷贝内存示意图

创建队列对调用栈(ibv_create_qp)

代码语言:javascript
复制
(gdb) b ibv_create_qp
#0  0x00007ffff65347b0 in ibv_create_qp () from /lib64/libibverbs.so.1
#1  0x00007ffff6778a58 in uct_rc_verbs_can_create_qp (ctx=<optimized out>, pd=0x60d3f0) at rc/verbs/rc_verbs_iface.c:556
#2  0x00007ffff6778c23 in uct_rc_verbs_query_tl_devices (md=0x60efd0, tl_devices_p=0x7fffffffd630, num_tl_devices_p=0x7fffffffd620) at rc/verbs/rc_verbs_iface.c:581
#3  0x00007ffff792864f in uct_md_query_tl_resources (md=0x60efd0, resources_p=resources_p@entry=0x7fffffffd7b0, num_resources_p=num_resources_p@entry=0x7fffffffd790) at base/uct_md.c:94
#4  0x000000000040248c in dev_tl_lookup (cmd_args=cmd_args@entry=0x7fffffffda00, iface_p=iface_p@entry=0x7fffffffdac0) at uct_hello_world.c:363
#5  0x000000000040193b in main (argc=<optimized out>, argv=<optimized out>) at uct_hello_world.c:611
(gdb) c
(gdb) bt
#0  0x00007ffff65347b0 in ibv_create_qp () from /lib64/libibverbs.so.1
#1  0x00007ffff675db21 in ibv_create_qp_ex (qp_init_attr_ex=0x7fffffffd558, context=<optimized out>) at /usr/include/infiniband/verbs.h:3016
#2  uct_ib_iface_create_qp (iface=iface@entry=0x6160f0, attr=attr@entry=0x7fffffffd520, qp_p=qp_p@entry=0x7fffffffd4e0) at base/ib_iface.c:1024
#3  0x00007ffff677238b in uct_rc_iface_qp_create (iface=iface@entry=0x6160f0, qp_p=qp_p@entry=0x7fffffffd4e0, attr=attr@entry=0x7fffffffd520, max_send_wr=<optimized out>, srq=<optimized out>)
    at rc/base/rc_iface.c:838
#4  0x00007ffff67794ed in uct_rc_verbs_iface_t_init (tl_config=0x60e130, params=<optimized out>, worker=<optimized out>, tl_md=<optimized out>, _init_count=0x7fffffffd4d0, 
    _myclass=0x7ffff69fc760 <uct_rc_verbs_iface_t_class>, self=0x6160f0) at rc/verbs/rc_verbs_iface.c:357
#5  uct_rc_verbs_iface_t_new (arg0=<optimized out>, arg1=<optimized out>, arg2=<optimized out>, arg3=0x60e130, obj_p=0x7fffffffdcd8) at rc/verbs/rc_verbs_iface.c:461
#6  0x00007ffff7928b5b in uct_iface_open (md=0x60efd0, worker=0x607560, params=params@entry=0x7fffffffd820, config=0x60e130, iface_p=iface_p@entry=0x7fffffffdcd8) at base/uct_md.c:250
#7  0x0000000000402738 in init_iface (iface_p=0x7fffffffdac0, func_am_type=FUNC_AM_SHORT, tl_name=0x616010 "rc_verbs", dev_name=0x61601a "mlx5_0:1") at uct_hello_world.c:271
#8  dev_tl_lookup (cmd_args=cmd_args@entry=0x7fffffffda00, iface_p=iface_p@entry=0x7fffffffdac0) at uct_hello_world.c:383
#9  0x000000000040193b in main (argc=<optimized out>, argv=<optimized out>) at uct_hello_world.c:611

创建完成队列(ibv_create_cq)

代码语言:javascript
复制
创建完成队列:
#0  0x00007ffff65345c0 in ibv_create_cq () from /lib64/libibverbs.so.1
#1  0x00007ffff67694dc in uct_ib_mlx5_devx_open_device (ibv_device=ibv_device@entry=0x60de40) at mlx5/dv/ib_mlx5dv_md.c:947
#2  0x00007ffff676cfd8 in uct_ib_mlx5_devx_md_open (ibv_device=0x60de40, md_config=0x607750, p_md=0x7fffffffd5e0) at mlx5/dv/ib_mlx5dv_md.c:1111
#3  0x00007ffff6760d04 in uct_ib_md_open (component=<optimized out>, md_name=0x7fffffffd680 "mlx5_0", uct_md_config=0x607750, md_p=0x7fffffffd640) at base/ib_md.c:1051
#4  0x00007ffff792852d in uct_md_open (component=0x7ffff69faf40 <uct_ib_component>, md_name=0x7fffffffd680 "mlx5_0", config=<optimized out>, md_p=md_p@entry=0x7fffffffddc0) at base/uct_md.c:61
#5  0x0000000000402427 in dev_tl_lookup (cmd_args=cmd_args@entry=0x7fffffffda00, iface_p=iface_p@entry=0x7fffffffdac0) at uct_hello_world.c:352
#6  0x000000000040193b in main (argc=<optimized out>, argv=<optimized out>) at uct_hello_world.c:611
​
#0  0x00007ffff65345c0 in ibv_create_cq () from /lib64/libibverbs.so.1
#1  0x00007ffff6778a39 in uct_rc_verbs_can_create_qp (ctx=<optimized out>, pd=0x60d3f0) at rc/verbs/rc_verbs_iface.c:546
#2  0x00007ffff6778c23 in uct_rc_verbs_query_tl_devices (md=0x60efd0, tl_devices_p=0x7fffffffd630, num_tl_devices_p=0x7fffffffd620) at rc/verbs/rc_verbs_iface.c:581
#3  0x00007ffff792864f in uct_md_query_tl_resources (md=0x60efd0, resources_p=resources_p@entry=0x7fffffffd7b0, num_resources_p=num_resources_p@entry=0x7fffffffd790) at base/uct_md.c:94
#4  0x000000000040248c in dev_tl_lookup (cmd_args=cmd_args@entry=0x7fffffffda00, iface_p=iface_p@entry=0x7fffffffdac0) at uct_hello_world.c:363
#5  0x000000000040193b in main (argc=<optimized out>, argv=<optimized out>) at uct_hello_world.c:611
​
#0  0x00007ffff65347b0 in ibv_create_qp () from /lib64/libibverbs.so.1
#1  0x00007ffff675db21 in ibv_create_qp_ex (qp_init_attr_ex=0x7fffffffd558, context=<optimized out>) at /usr/include/infiniband/verbs.h:3016
#2  uct_ib_iface_create_qp (iface=iface@entry=0x6160f0, attr=attr@entry=0x7fffffffd520, qp_p=qp_p@entry=0x7fffffffd4e0) at base/ib_iface.c:1024
#3  0x00007ffff677238b in uct_rc_iface_qp_create (iface=iface@entry=0x6160f0, qp_p=qp_p@entry=0x7fffffffd4e0, attr=attr@entry=0x7fffffffd520, max_send_wr=<optimized out>, srq=<optimized out>)
    at rc/base/rc_iface.c:838
#4  0x00007ffff67794ed in uct_rc_verbs_iface_t_init (tl_config=0x60e130, params=<optimized out>, worker=<optimized out>, tl_md=<optimized out>, _init_count=0x7fffffffd4d0, 
    _myclass=0x7ffff69fc760 <uct_rc_verbs_iface_t_class>, self=0x6160f0) at rc/verbs/rc_verbs_iface.c:357
#5  uct_rc_verbs_iface_t_new (arg0=<optimized out>, arg1=<optimized out>, arg2=<optimized out>, arg3=0x60e130, obj_p=0x7fffffffdcd8) at rc/verbs/rc_verbs_iface.c:461
#6  0x00007ffff7928b5b in uct_iface_open (md=0x60efd0, worker=0x607560, params=params@entry=0x7fffffffd820, config=0x60e130, iface_p=iface_p@entry=0x7fffffffdcd8) at base/uct_md.c:250
#7  0x0000000000402738 in init_iface (iface_p=0x7fffffffdac0, func_am_type=FUNC_AM_SHORT, tl_name=0x616010 "rc_verbs", dev_name=0x61601a "mlx5_0:1") at uct_hello_world.c:271
#8  dev_tl_lookup (cmd_args=cmd_args@entry=0x7fffffffda00, iface_p=iface_p@entry=0x7fffffffdac0) at uct_hello_world.c:383
#9  0x000000000040193b in main (argc=<optimized out>, argv=<optimized out>) at uct_hello_world.c:611
​

处理异步事件的线程线程函数

代码语言:javascript
复制
处理异步事件的线程线程函数(调用栈)
ucs_async_thread_start
ucs_pthread_create(&thread->thread_id, ucs_async_thread_func, thread, "async"); 
  static void *ucs_async_thread_func(void *arg)
    status = ucs_event_set_wait(thread->event_set ucs_async_thread_ev_handler
      event_set_handler(events[i].data.ptr, io_events, arg)
        status = ucs_async_dispatch_handlers(&fd, 1, events)
          ucs_async_handler_dispatch(handler, events)
            ucs_async_handler_invoke(handler, events)
              handler->cb(handler->id, events, handler->arg)
                uct_ib_async_event_handler 异步事件处理
                  ibv_get_async_event(dev->ibv_context, &ibevent)
                  uct_ib_handle_async_event(dev, &event) -> 处理异步事件
                    switch (event->event_type)
                    ...
                    UCS_STATS_UPDATE_COUNTER(dev->stats, UCT_IB_DEVICE_STAT_ASYNC_EVENT, +1)
                  ibv_ack_async_event(&ibevent)

其他参考

UCT笔记(随时更新)

https://github.com/ssbandjl/ucx/blob/master/category/uct_readme

晓兵

博客: https://logread.cn | https://blog.csdn.net/ssbandjl | https://cloud.tencent.com/developer/user/5060293/articles

DAOS汇总: https://cloud.tencent.com/developer/article/2344030

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 主流程(接上篇)
  • 启动服务端源码解读
  • 客户端流程
  • 零拷贝调用栈
  • 零拷贝内存示意图
  • 创建队列对调用栈(ibv_create_qp)
  • 创建完成队列(ibv_create_cq)
  • 处理异步事件的线程线程函数
  • 其他参考
    • UCT笔记(随时更新)
    • 晓兵
    相关产品与服务
    对象存储
    对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档