
Nvidia NCCL: GPU Collective Communication Interface Overview (Source Code Notes)

Original post by 晓兵 (ssbandjl) · Published 2023-11-24 · Column: daos
Terminology

NCCL: NVIDIA Collective Communications Library, the collective communication interface for GPUs

Useful Links

NCCL developer page: https://developer.nvidia.com/nccl

User documentation: https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/index.html

Project homepage: https://github.com/NVIDIA/nccl

Introduction

Optimized primitives for inter-GPU communication

NCCL (pronounced "Nickel") is a standalone library of standard communication routines for GPUs, implementing all-reduce, all-gather, reduce, broadcast, reduce-scatter, and any send/receive-based communication pattern. It is optimized for high bandwidth on platforms using PCIe, NVLink, and NVSwitch, as well as over networks using InfiniBand verbs or TCP/IP sockets. NCCL supports any number of GPUs in a single node or across multiple nodes, and can be used in single-process or multi-process (e.g. MPI) applications.
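To make the collective semantics concrete, here is a minimal sketch against the public NCCL API: a single-process all-reduce across two GPUs. The two-GPU setup and buffer size are illustrative assumptions, and error checking is omitted.

#include <nccl.h>
#include <cuda_runtime.h>

int main() {
  const int nDev = 2;                       // assumes two visible GPUs
  const size_t count = 1 << 20;             // 1M floats per rank (illustrative)
  int devs[nDev] = {0, 1};
  ncclComm_t comms[nDev];
  float* sendbuff[nDev]; float* recvbuff[nDev];
  cudaStream_t streams[nDev];

  for (int i = 0; i < nDev; ++i) {
    cudaSetDevice(devs[i]);
    cudaMalloc(&sendbuff[i], count * sizeof(float));
    cudaMalloc(&recvbuff[i], count * sizeof(float));
    cudaStreamCreate(&streams[i]);
  }
  ncclCommInitAll(comms, nDev, devs);       // one communicator per GPU

  ncclGroupStart();                         // batch the per-GPU calls
  for (int i = 0; i < nDev; ++i)
    ncclAllReduce(sendbuff[i], recvbuff[i], count, ncclFloat, ncclSum,
                  comms[i], streams[i]);
  ncclGroupEnd();

  for (int i = 0; i < nDev; ++i) {
    cudaSetDevice(devs[i]);
    cudaStreamSynchronize(streams[i]);      // wait for the collective to finish
    ncclCommDestroy(comms[i]);
  }
  return 0;
}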

Source Code Reading Notes

Git repository: https://github.com/ssbandjl/nccl.git

Summary:
NCCL uses the native RDMA verbs interface to implement one-sided read/write and two-sided send/receive semantics in a minimal way (compared with UCX, Libfabric, or other verbs-based communication libraries), delivering the high-performance, high-bandwidth GPU-to-GPU communication needed for data transfer in large-model AI training.

RDMA call stack: QP connection establishment
ncclResult_t ncclIbCreateQp
    qpInitAttr.qp_type = IBV_QPT_RC
    qpInitAttr.cap.max_send_wr = 2*MAX_REQUESTS -> 128
    qpInitAttr.cap.max_recv_wr = MAX_REQUESTS -> 64
    wrap_ibv_modify_qp
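A hedged sketch of what ncclIbCreateQp does with the raw verbs API. The pd/cq arguments are assumed to come from earlier ibv_alloc_pd/ibv_create_cq calls, MAX_REQUESTS mirrors the value noted above, and only the first (INIT) transition of the INIT -> RTR -> RTS sequence is shown.

#include <infiniband/verbs.h>
#include <stdint.h>
#include <string.h>

#define MAX_REQUESTS 64   /* matches the NCCL constant referenced above */

/* Create a reliable-connected QP sized like NCCL's, then move it to INIT. */
struct ibv_qp* create_rc_qp(struct ibv_pd* pd, struct ibv_cq* cq, uint8_t port) {
  struct ibv_qp_init_attr qpInitAttr;
  memset(&qpInitAttr, 0, sizeof(qpInitAttr));
  qpInitAttr.send_cq = cq;
  qpInitAttr.recv_cq = cq;
  qpInitAttr.qp_type = IBV_QPT_RC;              /* reliable connection */
  qpInitAttr.cap.max_send_wr  = 2*MAX_REQUESTS; /* 128 */
  qpInitAttr.cap.max_recv_wr  = MAX_REQUESTS;   /* 64 */
  qpInitAttr.cap.max_send_sge = 1;
  qpInitAttr.cap.max_recv_sge = 1;
  struct ibv_qp* qp = ibv_create_qp(pd, &qpInitAttr);
  if (qp == NULL) return NULL;

  /* First of the state transitions driven via wrap_ibv_modify_qp. */
  struct ibv_qp_attr qpAttr;
  memset(&qpAttr, 0, sizeof(qpAttr));
  qpAttr.qp_state = IBV_QPS_INIT;
  qpAttr.pkey_index = 0;
  qpAttr.port_num = port;
  qpAttr.qp_access_flags = IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ;
  if (ibv_modify_qp(qp, &qpAttr, IBV_QP_STATE | IBV_QP_PKEY_INDEX |
                    IBV_QP_PORT | IBV_QP_ACCESS_FLAGS) != 0) {
    ibv_destroy_qp(qp);
    return NULL;
  }
  return qp;
}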


RDMA one-sided write call stack (with immediate data)
ncclResult_t ncclIbIsend
    NCCLCHECK(ncclIbMultiSend(comm, slot))
ncclResult_t ncclIbMultiSend
    for (int r=0; r<nreqs; r++)
        struct ibv_sge* sge = comm->sges+r
        wr->opcode = IBV_WR_RDMA_WRITE
        wr->wr.rdma.remote_addr = slots[r].addr
        wr->next = wr+1
        wr_id += (reqs[r] - comm->verbs.reqs) << (r*8)
    immData |= (reqs[r]->send.size ? 1 : 0) << r -> encode the data size into the immediate value; with multiple sends, only a 0/1 bit per request is written as the "size" to indicate whether that request carries data to send or receive
    lastWr->wr_id = wr_id
    lastWr->opcode = IBV_WR_RDMA_WRITE_WITH_IMM
    lastWr->send_flags = IBV_SEND_SIGNALED
    NCCLCHECK(wrap_ibv_post_send(comm->qps[comm->qpIndex], comm->wrs, &bad_wr))
        qp->context->ops.post_send(qp, wr, bad_wr) -> ibv_post_send
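A hedged sketch of the same batched write-with-immediate pattern, stripped of NCCL's request bookkeeping. The 8-entry WR array and the rkey/remote-address parameters are illustrative; only the chaining and the signaled last WR follow the stack above.

#include <infiniband/verbs.h>
#include <stdint.h>
#include <string.h>

/* Post nreqs RDMA writes as one chained list; only the last WR carries the
 * immediate value and requests a completion, so one CQE covers the whole batch. */
int post_write_batch(struct ibv_qp* qp, struct ibv_sge* sges,
                     uint64_t* remote_addrs, uint32_t rkey,
                     int nreqs, uint32_t immData) {
  struct ibv_send_wr wrs[8];                    /* assumes nreqs <= 8 */
  memset(wrs, 0, sizeof(wrs));
  for (int r = 0; r < nreqs; r++) {
    wrs[r].sg_list = &sges[r];
    wrs[r].num_sge = 1;
    wrs[r].opcode  = IBV_WR_RDMA_WRITE;
    wrs[r].wr.rdma.remote_addr = remote_addrs[r];
    wrs[r].wr.rdma.rkey = rkey;
    wrs[r].next = (r + 1 < nreqs) ? &wrs[r+1] : NULL;
  }
  /* Like lastWr above: write-with-immediate, signaled; immData holds the per-request 0/1 bits. */
  wrs[nreqs-1].opcode = IBV_WR_RDMA_WRITE_WITH_IMM;
  wrs[nreqs-1].imm_data = immData;              /* read back from the receiver's CQE */
  wrs[nreqs-1].send_flags = IBV_SEND_SIGNALED;

  struct ibv_send_wr* bad_wr = NULL;
  return ibv_post_send(qp, wrs, &bad_wr);       /* 0 on success */
}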


main: main function
net_type: network type (selects an entry of ncclNets[] below)
ncclNet_t* ncclNets[3] = { nullptr, &ncclNetIb, &ncclNetSocket };
src/transport/net_ib.cc

Interface API implemented by the IB NIC transport:
ncclNet_t ncclNetIb = {
  "IB",
  ncclIbInit,
  ncclIbDevices,
  ncclIbGetProperties,
  ncclIbListen,
  ncclIbConnect,
  ncclIbAccept,
  ncclIbRegMr,
  ncclIbRegMrDmaBuf,
  ncclIbDeregMr,
  ncclIbIsend,
  ncclIbIrecv,
  ncclIbIflush,
  ncclIbTest,
  ncclIbCloseSend,
  ncclIbCloseRecv,
  ncclIbCloseListen
};



Interface a network backend must support:
typedef struct {
  // Name of the network (mainly for logs)
  const char* name;
  // Initialize the network.
  ncclResult_t (*init)(ncclDebugLogger_t logFunction);
  // Return the number of adapters.
  ncclResult_t (*devices)(int* ndev);
  // Get various device properties.
  ncclResult_t (*getProperties)(int dev, ncclNetProperties_v6_t* props);
  // Create a receiving object and provide a handle to connect to it. The
  // handle can be up to NCCL_NET_HANDLE_MAXSIZE bytes and will be exchanged
  // between ranks to create a connection.
  ncclResult_t (*listen)(int dev, void* handle, void** listenComm);
  // Connect to a handle and return a sending comm object for that peer.
  // This call must not block for the connection to be established, and instead
  // should return successfully with sendComm == NULL with the expectation that
  // it will be called again until sendComm != NULL.
  ncclResult_t (*connect)(int dev, void* handle, void** sendComm);
  // Finalize connection establishment after remote peer has called connect.
  // This call must not block for the connection to be established, and instead
  // should return successfully with recvComm == NULL with the expectation that
  // it will be called again until recvComm != NULL.
  ncclResult_t (*accept)(void* listenComm, void** recvComm);
  // Register/Deregister memory. Comm can be either a sendComm or a recvComm.
  // Type is either NCCL_PTR_HOST or NCCL_PTR_CUDA.
  ncclResult_t (*regMr)(void* comm, void* data, int size, int type, void** mhandle);
  /* DMA-BUF support */
  ncclResult_t (*regMrDmaBuf)(void* comm, void* data, size_t size, int type, uint64_t offset, int fd, void** mhandle);
  ncclResult_t (*deregMr)(void* comm, void* mhandle);
  // Asynchronous send to a peer.
  // May return request == NULL if the call cannot be performed (or would block)
  ncclResult_t (*isend)(void* sendComm, void* data, int size, int tag, void* mhandle, void** request);
  // Asynchronous recv from a peer.
  // May return request == NULL if the call cannot be performed (or would block)
  ncclResult_t (*irecv)(void* recvComm, int n, void** data, int* sizes, int* tags, void** mhandles, void** request);
  // Perform a flush/fence to make sure all data received with NCCL_PTR_CUDA is
  // visible to the GPU
  ncclResult_t (*iflush)(void* recvComm, int n, void** data, int* sizes, void** mhandles, void** request);
  // Test whether a request is complete. If size is not NULL, it returns the
  // number of bytes sent/received.
  ncclResult_t (*test)(void* request, int* done, int* sizes);
  // Close and free send/recv comm objects
  ncclResult_t (*closeSend)(void* sendComm);
  ncclResult_t (*closeRecv)(void* recvComm);
  ncclResult_t (*closeListen)(void* listenComm);
} ncclNet_v6_t;
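A hedged sketch of how a caller drives this function-pointer table, assuming NCCL's internal net headers (which define ncclNet_t, ncclNetProperties_t, ncclResult_t, and ncclDebugLogger_t) are on the include path. The logger stub and the print loop are illustrative, not NCCL's actual code.

#include <stdio.h>

/* Minimal no-op logger matching ncclDebugLogger_t; real NCCL passes its own. */
static void demoLogger(ncclDebugLogLevel level, unsigned long flags,
                       const char* file, int line, const char* fmt, ...) { (void)level; (void)flags; (void)file; (void)line; (void)fmt; }

/* Initialize a backend (e.g. &ncclNetIb) and enumerate its adapters. */
static ncclResult_t useNetBackend(ncclNet_t* net) {
  if (net->init(demoLogger) != ncclSuccess) return ncclInternalError;
  int ndev = 0;
  if (net->devices(&ndev) != ncclSuccess) return ncclInternalError;
  for (int d = 0; d < ndev; d++) {
    ncclNetProperties_t props;
    if (net->getProperties(d, &props) != ncclSuccess) continue;
    printf("dev %d: %s, speed %d Mb/s\n", d, props.name, props.speed);
  }
  return ncclSuccess;
}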


ncclIbRegMr -> memory registration
    ncclResult_t ncclIbRegMrDmaBuf
        flags = IBV_ACCESS_LOCAL_WRITE|IBV_ACCESS_REMOTE_WRITE|IBV_ACCESS_REMOTE_READ
        NCCLCHECKGOTO(wrap_ibv_reg_mr_iova2(&mr, verbs->pd, (void*)addr, pages*pageSize, addr, flags), res, returning)
        NCCLCHECKGOTO(wrap_ibv_reg_mr(&mr, verbs->pd, (void*)addr, pages*pageSize, flags), res, returning)
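A hedged sketch of the page-aligned registration that ncclIbRegMr performs: round the buffer to page boundaries and register it with local-write plus remote read/write access so the peer can RDMA into it. pd is assumed to be an existing protection domain, and the fixed 4 KiB page size is a simplification.

#include <infiniband/verbs.h>
#include <stdint.h>

struct ibv_mr* reg_buffer(struct ibv_pd* pd, void* data, size_t size) {
  const size_t pageSize = 4096;                       /* sysconf(_SC_PAGESIZE) in practice */
  uintptr_t addr  = (uintptr_t)data & ~(pageSize-1);  /* align start down to a page */
  size_t    pages = ((uintptr_t)data + size - addr + pageSize-1) / pageSize;
  int flags = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ;
  return ibv_reg_mr(pd, (void*)addr, pages*pageSize, flags);  /* NULL on failure */
}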


Flush (make data received into NCCL_PTR_CUDA buffers visible to the GPU):
ncclResult_t ncclIbIflush(void* recvComm, int n, void** data, int* sizes, void** mhandles, void** request) {
    NCCLCHECK(wrap_ibv_poll_cq(r->verbs->cq, 4, wcs, &wrDone))
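A hedged sketch of the completion polling behind the wrap_ibv_poll_cq call above: drain up to 4 completions per call, matching the batch size used there. How wr_id and imm_data are decoded follows the multi-send encoding described earlier.

#include <infiniband/verbs.h>

int poll_completions(struct ibv_cq* cq) {
  struct ibv_wc wcs[4];
  int n = ibv_poll_cq(cq, 4, wcs);          /* number of completions, <0 on error */
  for (int i = 0; i < n; i++) {
    if (wcs[i].status != IBV_WC_SUCCESS) return -1;  /* e.g. log ibv_wc_status_str(wcs[i].status) */
    /* wcs[i].wr_id encodes which request(s) completed; with RDMA_WRITE_WITH_IMM
     * the receiver also sees wcs[i].imm_data carrying the per-request size bits. */
  }
  return n;
}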



ncclNet_v4_as_v6_init


collNetRegMr

External API: src/include/coll_net.h
// Translation to external API
static const char* collNetName(struct ncclComm* comm) { return comm->ncclCollNet->name; }
static ncclResult_t collNetDevices(struct ncclComm* comm, int* ndev) { NCCLCHECK(comm->ncclCollNet->devices(ndev)); return ncclSuccess; }
static ncclResult_t collNetGetProperties(struct ncclComm* comm, int dev, ncclNetProperties_t* props) { NCCLCHECK(comm->ncclCollNet->getProperties(dev, props)); return ncclSuccess; }
static ncclResult_t collNetListen(struct ncclComm* comm, int dev, void* handle, void** listenComm) { NCCLCHECK(comm->ncclCollNet->listen(dev, handle, listenComm)); return ncclSuccess; }
static ncclResult_t collNetConnect(struct ncclComm* comm, void* handles[], int nranks, int rank, void* listenComm, void** collComm) { NCCLCHECK(comm->ncclCollNet->connect(handles, nranks, rank, listenComm, collComm)); return ncclSuccess; }
static ncclResult_t collNetReduceSupport(struct ncclComm* comm, ncclDataType_t dataType, ncclRedOp_t redOp, int* supported) { NCCLCHECK(comm->ncclCollNet->reduceSupport(dataType, redOp, supported)); return ncclSuccess; }
static ncclResult_t collNetRegMr(struct ncclComm* comm, void* collComm, void* data, int size, int type, void** mhandle) { NCCLCHECK(comm->ncclCollNet->regMr(collComm, data, size, type, mhandle)); return ncclSuccess; }
/* DMA-BUF support */
static ncclResult_t collNetRegMrDmaBuf(struct ncclComm* comm, void* collComm, void* data, int size, int type, uint64_t offset, int fd, void** mhandle) { NCCLCHECK(comm->ncclCollNet->regMrDmaBuf(collComm, data, size, type, offset, fd, mhandle)); return ncclSuccess; }
static ncclResult_t collNetDeregMr(struct ncclComm* comm, void* collComm, void* mhandle) { NCCLCHECK(comm->ncclCollNet->deregMr(collComm, mhandle)); return ncclSuccess; }
static ncclResult_t collNetIallreduce(struct ncclComm* comm, void* collComm, void* sendData, void* recvData, int count, ncclDataType_t dataType, ncclRedOp_t redOp, void* sendMhandle, void* recvMhandle,  void** request) {
  NCCLCHECK(comm->ncclCollNet->iallreduce(collComm, sendData, recvData, count, dataType, redOp, sendMhandle, recvMhandle, request)); return ncclSuccess; }
static ncclResult_t collNetIflush(struct ncclComm* comm, void* collComm, void* data, int size, void* mhandle, void** request) { NCCLCHECK(comm->ncclCollNet->iflush(collComm, data, size, mhandle, request)); return ncclSuccess; }
static ncclResult_t collNetTest(struct ncclComm* comm, void* request, int* done, int* size) { NCCLCHECK(comm->ncclCollNet->test(request, done, size)); return ncclSuccess; }
static ncclResult_t collNetCloseColl(struct ncclComm* comm, void* collComm) { NCCLCHECK(comm->ncclCollNet->closeColl(collComm)); return ncclSuccess; }
static ncclResult_t collNetCloseListen(struct ncclComm* comm, void* listenComm) { NCCLCHECK(comm->ncclCollNet->closeListen(listenComm)); return ncclSuccess; }

static int collNetSupport(struct ncclComm* comm) { return comm->ncclCollNet != nullptr ? 1 : 0; }





struct ncclTransport collNetTransport = {
  "COL",
  canConnect,
  { sendSetup, sendConnect, sendFree, NULL, sendProxySetup, sendProxyConnect, sendProxyFree, sendProxyProgress },
  { recvSetup, recvConnect, recvFree, NULL, recvProxySetup, recvProxyConnect, recvProxyFree, recvProxyProgress }
};



Alignment and common math helpers:
#define DIVUP(x, y) \
    (((x)+(y)-1)/(y))

#define ROUNDUP(x, y) \
    (DIVUP((x), (y))*(y))

#define ALIGN_POWER(x, y) \
    ((x) > (y) ? ROUNDUP(x, y) : ((y)/((y)/(x))))

#define ALIGN_SIZE(size, align) \
  size = ((size + (align) - 1) / (align)) * (align);

#if !__CUDA_ARCH__
  #ifndef __host__
    #define __host__
  #endif
  #ifndef __device__
    #define __device__
  #endif
#endif

template<typename X, typename Y, typename Z = decltype(X()+Y())>
__host__ __device__ constexpr Z divUp(X x, Y y) {
  return (x+y-1)/y;
}

template<typename X, typename Y, typename Z = decltype(X()+Y())>
__host__ __device__ constexpr Z roundUp(X x, Y y) {
  return (x+y-1) - (x+y-1)%y;
}

// assumes second argument is a power of 2
template<typename X, typename Z = decltype(X()+int())>
__host__ __device__ constexpr Z alignUp(X x, int a) {
  return (x+a-1) & Z(-a);
}
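A few concrete values for these helpers, as a small self-check. It assumes the macro and template definitions above are in scope; the inputs are arbitrary.

#include <cstdio>

int main() {
  printf("%d\n", DIVUP(10, 4));          // (10+3)/4      = 3
  printf("%d\n", ROUNDUP(10, 4));        // 3*4           = 12
  printf("%d\n", (int)divUp(10, 4));     // same as DIVUP = 3
  printf("%d\n", (int)roundUp(10, 4));   // 13 - 13%4     = 12
  printf("%d\n", (int)alignUp(10, 8));   // (10+7) & ~7   = 16
  printf("%d\n", ROUNDUP(20, 16));       // branch taken by ALIGN_POWER(20,16) = 32
  return 0;
}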



Common connection states:
enum ncclIbCommState {
  ncclIbCommStateStart = 0,
  ncclIbCommStateConnect = 1,
  ncclIbCommStateAccept = 3,
  ncclIbCommStateSend = 4,
  ncclIbCommStateRecv = 5,
  ncclIbCommStateConnecting = 6,
  ncclIbCommStateConnected = 7,
  ncclIbCommStatePendingReady = 8,
};



ncclResult_t ncclIbConnect(int dev, void* opaqueHandle, void** sendComm)
    if (stage->state == ncclIbCommStateSend)       goto ib_send;
    NCCLCHECK(ncclIbInitVerbs(dev, ctx, &comm->verbs))
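A hedged sketch of the non-blocking contract this follows: per the ncclNet_v6_t comments, connect may return with a NULL sendComm and is simply called again, resuming at the saved ncclIbCommState. The retry loop below is illustrative; real NCCL interleaves other progress instead of spinning.

ncclResult_t connectBlocking(int dev, void* handle, void** sendComm) {
  *sendComm = NULL;
  while (*sendComm == NULL) {
    ncclResult_t res = ncclIbConnect(dev, handle, sendComm);
    if (res != ncclSuccess) return res;   // give up on a real error
  }
  return ncclSuccess;
}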



Initialization
init()
Verbs symbols are loaded lazily at runtime (once, via pthread_once):
ncclResult_t wrap_ibv_symbols(void) {
  pthread_once(&initOnceControl,
               [](){ initResult = buildIbvSymbols(&ibvSymbols); });
  return initResult;
}
ncclResult_t buildIbvSymbols(struct ncclIbvSymbols* ibvSymbols)
    ibvhandle=dlopen("libibverbs.so", RTLD_NOW)
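A hedged sketch of the LOAD_SYM pattern used here: the verbs entry points are resolved with dlopen/dlsym at runtime, so the IB transport can be disabled cleanly when libibverbs is missing. The single symbol shown is illustrative.

#include <dlfcn.h>
#include <stdio.h>
#include <infiniband/verbs.h>

typedef struct ibv_device** (*pfn_ibv_get_device_list)(int* num_devices);
static pfn_ibv_get_device_list p_ibv_get_device_list;

int load_verbs_symbols(void) {
  void* ibvhandle = dlopen("libibverbs.so", RTLD_NOW);
  if (ibvhandle == NULL) {
    fprintf(stderr, "libibverbs not found: %s\n", dlerror());
    return -1;   /* NCCL would then fall back to the socket transport */
  }
  p_ibv_get_device_list =
      (pfn_ibv_get_device_list)dlsym(ibvhandle, "ibv_get_device_list");
  if (p_ibv_get_device_list == NULL) return -1;
  return 0;
}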


ibv verbs interface wrappers and symbol table
LOAD_SYM(ibvhandle, "ibv_get_device_list", ibvSymbols->ibv_internal_get_device_list);
LOAD_SYM(ibvhandle, "ibv_free_device_list", ibvSymbols->ibv_internal_free_device_list);
LOAD_SYM(ibvhandle, "ibv_get_device_name", ibvSymbols->ibv_internal_get_device_name);
LOAD_SYM(ibvhandle, "ibv_open_device", ibvSymbols->ibv_internal_open_device);
LOAD_SYM(ibvhandle, "ibv_close_device", ibvSymbols->ibv_internal_close_device);
LOAD_SYM(ibvhandle, "ibv_get_async_event", ibvSymbols->ibv_internal_get_async_event);
LOAD_SYM(ibvhandle, "ibv_ack_async_event", ibvSymbols->ibv_internal_ack_async_event);
LOAD_SYM(ibvhandle, "ibv_query_device", ibvSymbols->ibv_internal_query_device);
LOAD_SYM(ibvhandle, "ibv_query_port", ibvSymbols->ibv_internal_query_port);
LOAD_SYM(ibvhandle, "ibv_query_gid", ibvSymbols->ibv_internal_query_gid);
LOAD_SYM(ibvhandle, "ibv_query_qp", ibvSymbols->ibv_internal_query_qp);
LOAD_SYM(ibvhandle, "ibv_alloc_pd", ibvSymbols->ibv_internal_alloc_pd);
LOAD_SYM(ibvhandle, "ibv_dealloc_pd", ibvSymbols->ibv_internal_dealloc_pd);
LOAD_SYM(ibvhandle, "ibv_reg_mr", ibvSymbols->ibv_internal_reg_mr);
// Cherry-pick the ibv_reg_mr_iova2 API from IBVERBS 1.8
LOAD_SYM_VERSION(ibvhandle, "ibv_reg_mr_iova2", ibvSymbols->ibv_internal_reg_mr_iova2, "IBVERBS_1.8");
// Cherry-pick the ibv_reg_dmabuf_mr API from IBVERBS 1.12
LOAD_SYM_VERSION(ibvhandle, "ibv_reg_dmabuf_mr", ibvSymbols->ibv_internal_reg_dmabuf_mr, "IBVERBS_1.12");
LOAD_SYM(ibvhandle, "ibv_dereg_mr", ibvSymbols->ibv_internal_dereg_mr);
LOAD_SYM(ibvhandle, "ibv_create_cq", ibvSymbols->ibv_internal_create_cq);
LOAD_SYM(ibvhandle, "ibv_destroy_cq", ibvSymbols->ibv_internal_destroy_cq);
LOAD_SYM(ibvhandle, "ibv_create_qp", ibvSymbols->ibv_internal_create_qp);
LOAD_SYM(ibvhandle, "ibv_modify_qp", ibvSymbols->ibv_internal_modify_qp);
LOAD_SYM(ibvhandle, "ibv_destroy_qp", ibvSymbols->ibv_internal_destroy_qp);
LOAD_SYM(ibvhandle, "ibv_fork_init", ibvSymbols->ibv_internal_fork_init); <- wrap_ibv_fork_init
LOAD_SYM(ibvhandle, "ibv_event_type_str", ibvSymbols->ibv_internal_event_type_str);

RDMA NIC link speeds (Mb/s per lane):
static int ibvSpeeds[] = {
  2500,  /* SDR */
  5000,  /* DDR */
  10000, /* QDR */
  10000, /* QDR */
  14000, /* FDR */
  25000, /* EDR */
  50000, /* HDR */
  100000 /* NDR */ };


Formula used to compute bandwidth:
ncclIbDevs[ncclNIbDevs].speed = ncclIbSpeed(portAttr.active_speed) * ncclIbWidth(portAttr.active_width);
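A worked example of that formula, assuming an HDR port running at 4x width:

#include <stdio.h>
int main() {
  int laneMbps = 50000;                    // ibvSpeeds[] value for HDR
  int lanes    = 4;                        // ncclIbWidth() result for a 4x port
  printf("%d Mb/s\n", laneMbps * lanes);   // 200000 Mb/s = 200 Gb/s, stored in props.speed
  return 0;
}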


IB async event thread:
static void* ncclIbAsyncThreadMain(void* args)
    wrap_ibv_event_type_str(&str, event.event_type)) -> convert the event type to a string
    wrap_ibv_ack_async_event(&event)
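A hedged sketch of that loop with the raw verbs calls (event handling simplified): block on async events from one device context, log them, and always acknowledge so the event queue does not fill up.

#include <infiniband/verbs.h>
#include <stdio.h>

void* async_thread_main(void* arg) {
  struct ibv_context* ctx = (struct ibv_context*)arg;
  while (1) {
    struct ibv_async_event event;
    if (ibv_get_async_event(ctx, &event) != 0) break;   /* blocking call */
    const char* str = ibv_event_type_str(event.event_type);
    fprintf(stderr, "NET/IB async event: %s\n", str);
    ibv_ack_async_event(&event);                         /* events must always be acked */
  }
  return NULL;
}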

Summary

NCCL uses the native RDMA verbs interface to implement one-sided read/write and two-sided send/receive semantics in a minimal way (compared with UCX, Libfabric, or other verbs-based communication libraries), delivering the high-performance, high-bandwidth GPU-to-GPU communication needed for data transfer in large-model AI training.

晓兵(ssbandjl)

Blog: https://cloud.tencent.com/developer/user/5060293/articles | https://logread.cn | https://blog.csdn.net/ssbandjl

DAOS series overview: https://cloud.tencent.com/developer/article/2344030

Xiaobing's technical notes (series: DAOS / RDMA / UCX / Mercury / Libfabric / distributed storage, etc.)

Videos: https://cloud.tencent.com/developer/user/5060293/video

Column: https://cloud.tencent.com/developer/column/99669

Friends interested in high-performance technologies such as DAOS, SPDK, RDMA, and coroutines are welcome to join the DAOS technical discussion group.

Original-work statement: this article was published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

For infringement concerns, please contact cloudcommunity@tencent.com for removal.

