前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Librdkafka的Transport层

Librdkafka的Transport层

作者头像
扫帚的影子
发布2018-09-05 16:47:19
1.4K0
发布2018-09-05 16:47:19
举报
文章被收录于专栏:分布式系统进阶
  • Librdkafka要和kakfa集群通讯, 网络操作肯定是少不了的,这就需要封装transport数据传输层;
  • Librdkafka毕竟是SDK, 作为访问kafka集群的客户端,不需要支持大并发, 在网络IO模型 上选用了 poll;
  • IO模型确定后, 发送和接收数据必不可少的缓冲区buffer, 我们前面已经介绍过, 请参考Librdkafka的基础数据结构 3 -- Buffer相关 ;
  • 以上介绍的librdkafka中的poll模型和buffer, 完全可以独立出来, 用在其他项目上, 作者封装得很好;
  • Librdkafka与kafka broker间是tcp连接, 在接收数据后就涉及到一个拆包组包的问题, 这个就和kafka的协议有关了, kafka的二进制协议:
代码语言:txt
复制
1. 前4个字节是payload长度;
2. 后面紧接着是payload具体内容,这部分又分为header和body两部分;收包时就可以先收4字节,拿到payload长度, 再根据这个长度收够payload内容, 这样一个完整的response就接收到了;下面我们来结合代码分析一下,  其中有一部分是windows下的实现代码,我们将忽略掉

rd_kafka_transport_s
  • 所在文件: src/rdkafka_transport_int.h srd/rdkafka_transport.c(.h)
  • 定义:
代码语言:javascript
复制
struct rd_kafka_transport_s {   
    int rktrans_s; // 与broker通讯的socket
    
    rd_kafka_broker_t *rktrans_rkb; // 与之通讯的broker, 一个broker一个tcp连接,也就对应着一个transport对象

#if WITH_SSL
    SSL *rktrans_ssl; //ssl连接句柄
#endif

        // sasl权限验证相关
    struct {
                void *state;               /* SASL implementation
                                            * state handle */

                int           complete;    /* Auth was completed early
                        * from the client's perspective
                        * (but we might still have to
                                            *  wait for server reply). */

                /* SASL framing buffers */
        struct msghdr msg;
        struct iovec  iov[2];

        char          *recv_buf;
        int            recv_of;    /* Received byte count */
        int            recv_len;   /* Expected receive length for
                        * current frame. */
    } rktrans_sasl;

        //  response接收buffer
    rd_kafka_buf_t *rktrans_recv_buf;  /* Used with framed_recvmsg */

        // poll模型使用
        /* Two pollable fds:
         * - TCP socket
         * - wake-up fd
         */
        struct pollfd rktrans_pfd[2];
        int rktrans_pfd_cnt;

        size_t rktrans_rcvbuf_size;    /**< Socket receive buffer size */
        size_t rktrans_sndbuf_size;    /**< Socket send buffer size */
};

typedef struct rd_kafka_transport_s rd_kafka_transport_t
  • 异步连接到broker rd_kafka_transport_connect:
代码语言:javascript
复制
rd_kafka_transport_t *rd_kafka_transport_connect (rd_kafka_broker_t *rkb,
                          const rd_sockaddr_inx_t *sinx,
                          char *errstr,
                          size_t errstr_size) {
    rd_kafka_transport_t *rktrans;
    int s = -1;
    int on = 1;
        int r;

        rkb->rkb_addr_last = sinx;

        // 建socket, 可以调用socket_cb, 用用户自定义的方式来得到socket, 默认用socket函数
    s = rkb->rkb_rk->rk_conf.socket_cb(sinx->in.sin_family,
                       SOCK_STREAM, IPPROTO_TCP,
                       rkb->rkb_rk->rk_conf.opaque);
    if (s == -1) {
        rd_snprintf(errstr, errstr_size, "Failed to create socket: %s",
                socket_strerror(socket_errno));
        return NULL;
    }


#ifdef SO_NOSIGPIPE
    /* Disable SIGPIPE signalling for this socket on OSX */
    if (setsockopt(s, SOL_SOCKET, SO_NOSIGPIPE, &on, sizeof(on)) == -1) 
        rd_rkb_dbg(rkb, BROKER, "SOCKET",
               "Failed to set SO_NOSIGPIPE: %s",
               socket_strerror(socket_errno));
#endif

    /* Enable TCP keep-alives, if configured. */
    if (rkb->rkb_rk->rk_conf.socket_keepalive) {
#ifdef SO_KEEPALIVE
        if (setsockopt(s, SOL_SOCKET, SO_KEEPALIVE,
                   (void *)&on, sizeof(on)) == SOCKET_ERROR)
            rd_rkb_dbg(rkb, BROKER, "SOCKET",
                   "Failed to set SO_KEEPALIVE: %s",
                   socket_strerror(socket_errno));
#else
        rd_rkb_dbg(rkb, BROKER, "SOCKET",
               "System does not support "
               "socket.keepalive.enable (SO_KEEPALIVE)");
#endif
    }

        /* Set the socket to non-blocking */
        if ((r = rd_fd_set_nonblocking(s))) {
                rd_snprintf(errstr, errstr_size,
                            "Failed to set socket non-blocking: %s",
                            socket_strerror(r));
                goto err;
        }

    /* Connect to broker */
       // 可以用用户自定义的连接方式connect, 回调没set的话调用connect
        if (rkb->rkb_rk->rk_conf.connect_cb) {
                r = rkb->rkb_rk->rk_conf.connect_cb(
                        s, (struct sockaddr *)sinx, RD_SOCKADDR_INX_LEN(sinx),
                        rkb->rkb_name, rkb->rkb_rk->rk_conf.opaque);
        } else {
                if (connect(s, (struct sockaddr *)sinx,
                            RD_SOCKADDR_INX_LEN(sinx)) == SOCKET_ERROR &&
                    (socket_errno != EINPROGRESS
                            ))
                        r = socket_errno;
                else
                        r = 0;
        }

        if (r != 0) {
        rd_snprintf(errstr, errstr_size,
                "Failed to connect to broker at %s: %s",
                rd_sockaddr2str(sinx, RD_SOCKADDR2STR_F_NICE),
                socket_strerror(r));
        goto err;
    }

    /* Create transport handle */
        // 创建并初始化rd_kafka_transport_s
    rktrans = rd_calloc(1, sizeof(*rktrans));
    rktrans->rktrans_rkb = rkb;
    rktrans->rktrans_s = s;
    rktrans->rktrans_pfd[rktrans->rktrans_pfd_cnt++].fd = s;

        // 添加poll事件
        if (rkb->rkb_wakeup_fd[0] != -1) {
                rktrans->rktrans_pfd[rktrans->rktrans_pfd_cnt].events = POLLIN;
                rktrans->rktrans_pfd[rktrans->rktrans_pfd_cnt++].fd = rkb->rkb_wakeup_fd[0];
        }

    /* Poll writability to trigger on connection success/failure. */
    rd_kafka_transport_poll_set(rktrans, POLLOUT);

    return rktrans;

 err:
    if (s != -1)
                rd_kafka_transport_close0(rkb->rkb_rk, s);

    return NULL;
}
  • 对 poll方法的封装 rd_kafka_transport_poll:
代码语言:javascript
复制
int rd_kafka_transport_poll(rd_kafka_transport_t *rktrans, int tmout) {
        int r;
        // 调用poll, 失败直接return
    r = poll(rktrans->rktrans_pfd, rktrans->rktrans_pfd_cnt, tmout);
    if (r <= 0)
        return r;

        rd_atomic64_add(&rktrans->rktrans_rkb->rkb_c.wakeups, 1);
        // 没什么实际用处
        if (rktrans->rktrans_pfd[1].revents & POLLIN) {
                /* Read wake-up fd data and throw away, just used for wake-ups*/
                char buf[512];
                if (rd_read((int)rktrans->rktrans_pfd[1].fd,
                            buf, sizeof(buf)) == -1) {
                        /* Ignore warning */
                }
        }
         // 返回poll到的有效事件
         return rktrans->rktrans_pfd[0].revents;
}
  • transport io 事件循环rd_kafka_transport_io_serve:
代码语言:javascript
复制
void rd_kafka_transport_io_serve (rd_kafka_transport_t *rktrans,
                                  int timeout_ms) {
    rd_kafka_broker_t *rkb = rktrans->rktrans_rkb;
    int events;

        // 如果没收到response的请求个数没超过最大限制, 并且有需要发送的buf, 就把POLLOUT事件加入
    if (rd_kafka_bufq_cnt(&rkb->rkb_waitresps) < rkb->rkb_max_inflight &&
        rd_kafka_bufq_cnt(&rkb->rkb_outbufs) > 0)
        rd_kafka_transport_poll_set(rkb->rkb_transport, POLLOUT);

        // poll啊poll
    if ((events = rd_kafka_transport_poll(rktrans, timeout_ms)) <= 0)
                return;
        // 暂时去掉POLLOUT
        rd_kafka_transport_poll_clear(rktrans, POLLOUT);

         //  处理poll到的io events
    rd_kafka_transport_io_event(rktrans, events);
}
  • IO事件处理 rd_kafka_transport_io_event: broker有一个简单的状态机转换,有如下几个状态:
代码语言:javascript
复制
enum {
        RD_KAFKA_BROKER_STATE_INIT,
        RD_KAFKA_BROKER_STATE_DOWN,
        RD_KAFKA_BROKER_STATE_CONNECT,
        RD_KAFKA_BROKER_STATE_AUTH,

        /* Any state >= STATE_UP means the Kafka protocol layer
         * is operational (to some degree). */
        RD_KAFKA_BROKER_STATE_UP,
                RD_KAFKA_BROKER_STATE_UPDATE,
        RD_KAFKA_BROKER_STATE_APIVERSION_QUERY,
        RD_KAFKA_BROKER_STATE_AUTH_HANDSHAKE
    } rkb_state;

根据这个broker当前的状态,有不同的操作和状态转换

代码语言:javascript
复制
static void rd_kafka_transport_io_event (rd_kafka_transport_t *rktrans,
                     int events) {
    char errstr[512];
    int r;
    rd_kafka_broker_t *rkb = rktrans->rktrans_rkb;

    switch (rkb->rkb_state)
    {
    case RD_KAFKA_BROKER_STATE_CONNECT:
#if WITH_SSL
        if (rktrans->rktrans_ssl) {
            /* Currently setting up SSL connection:
             * perform handshake. */
            rd_kafka_transport_ssl_handshake(rktrans);
            return;
        }
#endif

        /* Asynchronous connect finished, read status. */
        if (!(events & (POLLOUT|POLLERR|POLLHUP)))
            return;

        if (rd_kafka_transport_get_socket_error(rktrans, &r) == -1) {
            rd_kafka_broker_fail(
                                rkb, LOG_ERR, RD_KAFKA_RESP_ERR__TRANSPORT,
                                "Connect to %s failed: "
                                "unable to get status from "
                                "socket %d: %s",
                                rd_sockaddr2str(rkb->rkb_addr_last,
                                                RD_SOCKADDR2STR_F_PORT |
                                                RD_SOCKADDR2STR_F_FAMILY),
                                rktrans->rktrans_s,
                                rd_strerror(socket_errno));
        } else if (r != 0) {
            /* Connect failed */
                        errno = r;
            rd_snprintf(errstr, sizeof(errstr),
                    "Connect to %s failed: %s",
                                    rd_sockaddr2str(rkb->rkb_addr_last,
                                                    RD_SOCKADDR2STR_F_PORT |
                                                    RD_SOCKADDR2STR_F_FAMILY),
                                    rd_strerror(r));

            rd_kafka_transport_connect_done(rktrans, errstr);
        } else {
            /* Connect succeeded */
            rd_kafka_transport_connected(rktrans);
        }
        break;

    case RD_KAFKA_BROKER_STATE_AUTH:
        /* SASL handshake */
        if (rd_kafka_sasl_io_event(rktrans, events,
                       errstr, sizeof(errstr)) == -1) {
            errno = EINVAL;
            rd_kafka_broker_fail(rkb, LOG_ERR,
                         RD_KAFKA_RESP_ERR__AUTHENTICATION,
                         "SASL authentication failure: %s",
                         errstr);
            return;
        }
        break;

    case RD_KAFKA_BROKER_STATE_APIVERSION_QUERY:
    case RD_KAFKA_BROKER_STATE_AUTH_HANDSHAKE:
    case RD_KAFKA_BROKER_STATE_UP:
    case RD_KAFKA_BROKER_STATE_UPDATE:

        if (events & POLLIN) {
            while (rkb->rkb_state >= RD_KAFKA_BROKER_STATE_UP &&
                   rd_kafka_recv(rkb) > 0)
                ;
        }

        if (events & POLLHUP) {
            rd_kafka_broker_fail(rkb,
                                             rkb->rkb_rk->rk_conf.
                                             log_connection_close ?
                                             LOG_NOTICE : LOG_DEBUG,
                                             RD_KAFKA_RESP_ERR__TRANSPORT,
                         "Connection closed");
            return;
        }

        if (events & POLLOUT) {
            while (rd_kafka_send(rkb) > 0)
                ;
        }
        break;

    case RD_KAFKA_BROKER_STATE_INIT:
    case RD_KAFKA_BROKER_STATE_DOWN:
        rd_kafka_assert(rkb->rkb_rk, !*"bad state");
    }
}

最主要的是以下两点:

  1. 有可读事件:
代码语言:javascript
复制
循环读, 读到不能读为止
if (events & POLLIN) {
            while (rkb->rkb_state >= RD_KAFKA_BROKER_STATE_UP &&
                   rd_kafka_recv(rkb) > 0)
                ;
        }

rd_kafka_recv按kafka的协议来收包, 先收4字节,拿到payload长度, 再根据这个长度收够payload内容, 这样一个完整的response就接收到了

  1. 有可写事件:
代码语言:javascript
复制
if (events & POLLOUT) {
            while (rd_kafka_send(rkb) > 0)
                ;
        }

循环写, 写到不能写为止

  • 关闭transport
代码语言:javascript
复制
void rd_kafka_transport_close (rd_kafka_transport_t *rktrans) {
#if WITH_SSL
    if (rktrans->rktrans_ssl) {
        SSL_shutdown(rktrans->rktrans_ssl);
        SSL_free(rktrans->rktrans_ssl);
    }
#endif

        rd_kafka_sasl_close(rktrans);

    if (rktrans->rktrans_recv_buf)
        rd_kafka_buf_destroy(rktrans->rktrans_recv_buf);

    if (rktrans->rktrans_s != -1)
                rd_kafka_transport_close0(rktrans->rktrans_rkb->rkb_rk,
                                          rktrans->rktrans_s);

    rd_free(rktrans);
}
  • 封装sendmsg方法
代码语言:javascript
复制
ssize_t rd_kafka_transport_socket_sendmsg (rd_kafka_transport_t *rktrans,
                                           rd_slice_t *slice,
                                           char *errstr, size_t errstr_size) {
        struct iovec iov[IOV_MAX];
        struct msghdr msg = { .msg_iov = iov };
        size_t iovlen;
        ssize_t r;

       // 从slice产生iovec数组, 然后再发送
        rd_slice_get_iov(slice, msg.msg_iov, &iovlen, IOV_MAX,
                         /* FIXME: Measure the effects of this */
                         rktrans->rktrans_sndbuf_size);
        msg.msg_iovlen = (typeof(msg.msg_iovlen))iovlen;

#ifdef sun
        /* See recvmsg() comment. Setting it here to be safe. */
        socket_errno = EAGAIN;
#endif

        r = sendmsg(rktrans->rktrans_s, &msg, MSG_DONTWAIT
#ifdef MSG_NOSIGNAL
                    | MSG_NOSIGNAL
#endif
                );

        if (r == -1) {
                if (socket_errno == EAGAIN)
                        return 0;
                rd_snprintf(errstr, errstr_size, "%s", rd_strerror(errno));
        }

        /* Update buffer read position */
        rd_slice_read(slice, NULL, (size_t)r);

        return r;
}
  • 封装 send方法:
代码语言:javascript
复制
rd_kafka_transport_socket_send0 (rd_kafka_transport_t *rktrans,
                                 rd_slice_t *slice,
                                 char *errstr, size_t errstr_size) {
        ssize_t sum = 0;
        const void *p;
        size_t rlen;

       // rd_slice_peeker相当于一个iterator接口,每次返回一个slice中有效的segment
        while ((rlen = rd_slice_peeker(slice, &p))) {
                ssize_t r;
                //实际的发送
                r = send(rktrans->rktrans_s, p, rlen, 0
                );

                // 如果非eagain错误的话,是真的有错误发生了,return -1
                if (unlikely(r <= 0)) {
                        if (r == 0 || errno == EAGAIN)
                                return 0;
                        rd_snprintf(errstr, errstr_size, "%s",
                                    socket_strerror(socket_errno));
                        return -1;
                }

                /* Update buffer read position */
               //更新slice的读位置 
                rd_slice_read(slice, NULL, (size_t)r);

                sum += r;

                /* FIXME: remove this and try again immediately and let
                 *        the next write() call fail instead? */
                if ((size_t)r < rlen)
                        break;
        }

        return sum;
}
  • 封装 recv方法:
代码语言:javascript
复制
rd_kafka_transport_socket_recv0 (rd_kafka_transport_t *rktrans,
                                 rd_buf_t *rbuf,
                                 char *errstr, size_t errstr_size) {
        ssize_t sum = 0;
        void *p;
        size_t len;
        
       // rbuf可写就一直写
        while ((len = rd_buf_get_writable(rbuf, &p))) {
                ssize_t r;

                r = recv(rktrans->rktrans_s, p,
                         len,
                         0);

                if (unlikely(r <= 0)) {
                        if (r == -1 && socket_errno == EAGAIN)
                                return 0;
                        else if (r == 0) {
                                /* Receive 0 after POLLIN event means
                                 * connection closed. */
                                rd_snprintf(errstr, errstr_size,
                                            "Disconnected");
                                return -1;
                        } else if (r == -1) {
                                rd_snprintf(errstr, errstr_size, "%s",
                                            rd_strerror(errno));
                                return -1;
                        }
                }

                /* Update buffer write position */
                // 更新buf的写位置
                rd_buf_write(rbuf, NULL, (size_t)r);

                sum += r;

                /* FIXME: remove this and try again immediately and let
                 *        the next recv() call fail instead? */
                if ((size_t)r < len)
                        break;
        }
        return sum;
}

Librdkafka源码分析-Content Table

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2018.01.17 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Librdkafka源码分析-Content Table
相关产品与服务
腾讯云代码分析
腾讯云代码分析(内部代号CodeDog)是集众多代码分析工具的云原生、分布式、高性能的代码综合分析跟踪管理平台,其主要功能是持续跟踪分析代码,观测项目代码质量,支撑团队传承代码文化。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档