前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >dpvs源码分析

dpvs源码分析

原创
作者头像
榴莲其实还可以
发布2018-08-08 12:51:58
4.7K1
发布2018-08-08 12:51:58
举报

前言

dpvs是爱奇艺开源的一款基于dpdk的高性能4层负载均衡器,源自LVS和改版后的alibaba/LVS,dpvs即dpdk-lvs。更多关于dpvs的相关原理与特性请参考https://github.com/iqiyi/dpvs。本文主要是对dpvs的部分源码做剖析。

源码剖析

启动

代码语言:javascript
复制
/**
 * DPVS entry point.
 *
 * Initializes each subsystem in a fixed order (any init failure aborts via
 * rte_exit), starts every available port, launches the data plane threads
 * with netif_lcore_start(), and then runs the control plane loop on the
 * calling thread forever.
 *
 * The "end:" teardown path is reached only when writing the pid file fails,
 * because the control loop itself never breaks out.
 *
 * NOTE(review): the teardown path finishes with exit(0) although it is only
 * entered after a pidfile failure, and a netif_ctrl_term() failure calls
 * rte_exit(), which skips the remaining teardown steps — confirm intended.
 */
int main(int argc, char *argv[])
{
    int err, nports;
    portid_t pid;
    struct netif_port *dev;
    struct timeval tv;
    char pql_conf_buf[LCORE_CONF_BUFFER_LEN];
    int pql_conf_buf_len = LCORE_CONF_BUFFER_LEN;
    uint32_t loop_cnt = 0;
    int timer_sched_loop_interval;

    /* check if dpvs is running and remove zombie pidfile */
    if (dpvs_running(DPVS_PIDFILE)) {
        fprintf(stderr, "dpvs is already running\n");
        exit(EXIT_FAILURE);
    }

    dpvs_state_set(DPVS_STATE_INIT);

    /* seed random() from the current time mixed with the process id */
    gettimeofday(&tv, NULL);
    srandom(tv.tv_sec ^ tv.tv_usec ^ getpid());

    if (set_all_thread_affinity() != 0) {
        fprintf(stderr, "set_all_thread_affinity failed\n");
        exit(EXIT_FAILURE);
    }

	/* Initialize the Environment Abstraction Layer (EAL) */
    err = rte_eal_init(argc, argv);
    if (err < 0)
        rte_exit(EXIT_FAILURE, "Invalid EAL parameters\n");
    /* skip the arguments that rte_eal_init() reported as consumed */
    argc -= err, argv += err;

    rte_timer_subsystem_init();

	/* Article author's note: registers the HUP signal, initializes three
	 * lists, and loads the dpvs.conf configuration file via try_reload. */
    if ((err = cfgfile_init()) != EDPVS_OK)
        rte_exit(EXIT_FAILURE, "Fail init configuration file: %s\n",
                 dpvs_strerror(err));

	/* Article author's note: in the test environment DPDK only used the eth0
	 * NIC, so bonding was presumably not exercised. */
    if ((err = netif_virtual_devices_add()) != EDPVS_OK)
        rte_exit(EXIT_FAILURE, "Fail add virtual devices:%s\n",
                 dpvs_strerror(err));

	/* One timer per lcore. */
    if ((err = dpvs_timer_init()) != EDPVS_OK)
        rte_exit(EXIT_FAILURE, "Fail init timer on %s\n", dpvs_strerror(err));

	/* Traffic control (tc) initialization. */
    if ((err = tc_init()) != EDPVS_OK)
        rte_exit(EXIT_FAILURE, "Fail to init traffic control: %s\n",
                 dpvs_strerror(err));

	/* netif_init -> netif_lcore_init registers three NETIF_LCORE_JOB_LOOP jobs, */
	/* namely lcore_job_recv_fwd -> lcore_job_xmit -> lcore_job_timer_manage. */

    if ((err = netif_init(NULL)) != EDPVS_OK)
        rte_exit(EXIT_FAILURE, "Fail to init netif: %s\n", dpvs_strerror(err));
    /* Default lcore conf and port conf are used and may be changed here 
     * with "netif_port_conf_update" and "netif_lcore_conf_set" */

	/* ctrl_init -> msg_init registers one more NETIF_LCORE_JOB_LOOP job: */
	/* slave_lcore_loop_func. */
    if ((err = ctrl_init()) != EDPVS_OK)
        rte_exit(EXIT_FAILURE, "Fail to init ctrl plane: %s\n",
                 dpvs_strerror(err));

	/* tc control plane initialization (socket registration). */
    if ((err = tc_ctrl_init()) != EDPVS_OK)
        rte_exit(EXIT_FAILURE, "Fail to init tc control plane: %s\n",
                 dpvs_strerror(err));

    if ((err = vlan_init()) != EDPVS_OK)
        rte_exit(EXIT_FAILURE, "Fail to init vlan: %s\n", dpvs_strerror(err));

	/* inet_init -> ipv4_init -> ipv4_frag_init registers a NETIF_LCORE_JOB_SLOW job; */
	/* inet_init -> neigh_init -> arp_init registers another NETIF_LCORE_JOB_SLOW job. */
	/* They are ipv4_frag_job -> neigh_process_ring respectively. */
    if ((err = inet_init()) != EDPVS_OK)
        rte_exit(EXIT_FAILURE, "Fail to init inet: %s\n", dpvs_strerror(err));

	/* sa (socket addr) pool */
    if ((err = sa_pool_init()) != EDPVS_OK)
        rte_exit(EXIT_FAILURE, "Fail to init sa_pool: %s\n", dpvs_strerror(err));

	/* dpvs core initialization, which includes installing the IPv4 hooks
	 * dp_vs_in and dp_vs_pre_routing (per the article author). */
    if ((err = dp_vs_init()) != EDPVS_OK)
        rte_exit(EXIT_FAILURE, "Fail to init ipvs: %s\n", dpvs_strerror(err));

    if ((err = netif_ctrl_init()) != EDPVS_OK)
        rte_exit(EXIT_FAILURE, "Fail to init netif_ctrl: %s\n",
                 dpvs_strerror(err));

	/* Look up each NIC device by port id. */
    /* config and start all available dpdk ports */
    nports = rte_eth_dev_count();
    for (pid = 0; pid < nports; pid++) {
        dev = netif_port_get(pid);
        if (!dev) {
            RTE_LOG(WARNING, DPVS, "port %d not found\n", pid);
            continue;
        }

        /* a port that fails to start is only logged; startup continues */
        err = netif_port_start(dev);
        if (err != EDPVS_OK)
            RTE_LOG(WARNING, DPVS, "Start %s failed, skipping ...\n",
                    dev->name);
    }

    /* print port-queue-lcore relation */
    netif_print_lcore_conf(pql_conf_buf, &pql_conf_buf_len, true, 0);
    RTE_LOG(INFO, DPVS, "\nport-queue-lcore relation array: \n%s\n",
            pql_conf_buf);

	/* Data plane threads: they end up in netif_loop() in netif.c, which
	 * handles packet receive and transmit. */
    /* start data plane threads */
    netif_lcore_start();

    /* write pid file */
    if (!pidfile_write(DPVS_PIDFILE, getpid()))
        goto end;

    timer_sched_loop_interval = dpvs_timer_sched_interval_get();
    /* must be positive: it is used as a modulo divisor in the loop below */
    assert(timer_sched_loop_interval > 0);

    dpvs_state_set(DPVS_STATE_NORMAL);

	/* Control plane thread. */
    /* start control plane thread */
    while (1) {
        /* reload configuations if reload flag is set */
        try_reload();
        /* IPC loop */
        sockopt_ctl(NULL);
        /* msg loop */
        msg_master_process();

        /* timer */
        loop_cnt++;
        if (loop_cnt % timer_sched_loop_interval == 0)
            rte_timer_manage();
        /* kni */
        kni_process_on_master();

        /* process mac ring on master */
        neigh_process_ring(NULL);

        dp_vs_service_auto_cleanup();
 
        /* increase loop counts */
        netif_update_master_loop_cnt();
    }

end:
    /* teardown; only reachable via the pidfile_write() failure above */
    dpvs_state_set(DPVS_STATE_FINISH);
    if ((err = netif_ctrl_term()) !=0 )
        rte_exit(EXIT_FAILURE, "Fail to term netif_ctrl: %s\n",
                 dpvs_strerror(err));
    if ((err = dp_vs_term()) != EDPVS_OK)
        RTE_LOG(ERR, DPVS, "Fail to term ipvs: %s\n", dpvs_strerror(err));
    if ((err = sa_pool_term()) != EDPVS_OK)
        RTE_LOG(ERR, DPVS, "Fail to term sa_pool: %s\n", dpvs_strerror(err));
    if ((err = inet_term()) != EDPVS_OK)
        RTE_LOG(ERR, DPVS, "Fail to term inet: %s\n", dpvs_strerror(err));
    if ((err = dpvs_timer_term()) != EDPVS_OK)
        RTE_LOG(ERR, DPVS, "Fail to term timer: %s\n", dpvs_strerror(err));
    if ((err = ctrl_term()) != 0)
        RTE_LOG(ERR, DPVS, "Fail to term ctrl plane\n");
    /* NOTE(review): the message below says "route" but the call is netif_term() */
    if ((err = netif_term()) != 0)
        RTE_LOG(ERR, DPVS, "Fail to term route\n");
    if ((err = cfgfile_term()) != 0)
        RTE_LOG(ERR, DPVS, "Fail to term configuration file: %s\n",
                dpvs_strerror(err));
    pidfile_rm(DPVS_PIDFILE);

    exit(0);
}

先看main函数,main主要是做一些初始化操作,然后netif_lcore_start启动数据平面线程,while(1)循环开启控制平面线程。差不多每个初始化函数我都做了注释。其中有几个需要重点提示:

1,netif_init

该函数会调用到netif_lcore_init,在netif_lcore_init中会注册job处理函数

代码语言:javascript
复制
/*
 * Set up per-lcore state for netif and register the three built-in
 * NETIF_LCORE_JOB_LOOP jobs (recv_fwd, xmit, timer_manage).
 *
 * Exits the process via rte_exit() if the lcore configuration check fails
 * or if any job cannot be registered.
 *
 * Part of the body is elided by the article author (the dotted line below),
 * so this excerpt is not the complete function.
 */
static void netif_lcore_init(void)
{
    int ii, res;
    lcoreid_t cid;

    .......................
    /* build lcore fast searching table */
    lcore_index_init();

    /* init isolate rxqueue table */
    isol_rxq_init();

    /* check and set lcore config */
    config_lcores(&worker_list);
    if ((res = check_lcore_conf(rte_lcore_count(), lcore_conf)) != EDPVS_OK)
        rte_exit(EXIT_FAILURE, "[%s] bad lcore configuration (err=%d),"
                " exit ...\n", __func__, res);

    /* build port fast searching table */
    port_index_init();

    /* register lcore jobs*/
    /* job 0: recv_fwd, run by lcore_job_recv_fwd.
     * The "sizeof - 1" bound is more conservative than needed: snprintf
     * already reserves room for the terminating NUL within the given size. */
    snprintf(netif_jobs[0].name, sizeof(netif_jobs[0].name) - 1, "%s", "recv_fwd");
    netif_jobs[0].func = lcore_job_recv_fwd;
    netif_jobs[0].data = NULL;
    netif_jobs[0].type = NETIF_LCORE_JOB_LOOP;
    /* job 1: xmit, run by lcore_job_xmit */
    snprintf(netif_jobs[1].name, sizeof(netif_jobs[1].name) - 1, "%s", "xmit");
    netif_jobs[1].func = lcore_job_xmit;
    netif_jobs[1].data = NULL;
    netif_jobs[1].type = NETIF_LCORE_JOB_LOOP;
    /* job 2: timer_manage, run by lcore_job_timer_manage */
    snprintf(netif_jobs[2].name, sizeof(netif_jobs[2].name) - 1, "%s", "timer_manage");
    netif_jobs[2].func = lcore_job_timer_manage;
    netif_jobs[2].data = NULL;
    netif_jobs[2].type = NETIF_LCORE_JOB_LOOP;

    /* register every entry of netif_jobs[]; a negative result is fatal.
     * NOTE(review): assumes NETIF_JOB_COUNT matches the three entries filled
     * in above — confirm against its definition. */
    for (ii = 0; ii < NETIF_JOB_COUNT; ii++) {
        res = netif_lcore_loop_job_register(&netif_jobs[ii]);
        if (res < 0) {
            rte_exit(EXIT_FAILURE, 
                    "[%s] Fail to register netif lcore jobs, exiting ...\n", __func__);
            /* presumably unreachable, since rte_exit() is not expected to return */
            break;
        }
    }
}

可以看到该函数注册了三个NETIF_LCORE_JOB_LOOP类型的job,注意它们的.func域,后面都会调用到。

2,ctrl_init

该函数里面会调用msg_init,在msg_init中也会注册一个NETIF_LCORE_JOB_LOOP类型的job

代码语言:javascript
复制
/*
 * Initialize the control-plane message subsystem: set up the multicast wait
 * list, create one message ring per lcore slot, register the slave-side
 * control job as a NETIF_LCORE_JOB_LOOP job, and register the built-in
 * message types.
 *
 * Returns EDPVS_OK on success, EDPVS_DPDKAPIFAIL if a ring cannot be created,
 * or the negative result of netif_lcore_loop_job_register() on failure.
 *
 * The start of the body (including local declarations) is elided by the
 * article author (the dotted line below).
 */
static inline int msg_init(void)
{
   
    ..............................
    netif_get_slave_lcores(&slave_lcore_nb, &slave_lcore_mask);
    /* multicast queue init */
    mc_wait_list.free_cnt = msg_mc_qlen;
    INIT_LIST_HEAD(&mc_wait_list.list);

    /* per-lcore msg queue */
    for (ii =0; ii < NETIF_MAX_LCORES; ii++) {
        snprintf(ring_name, sizeof(ring_name), "msg_ring_%d", ii);
        msg_ring[ii] = rte_ring_create(ring_name, msg_ring_size,
                rte_socket_id(), 0/*RING_F_SC_DEQ*/);
        if (unlikely(NULL == msg_ring[ii])) {
            RTE_LOG(ERR, MSGMGR, "Fail to init ctrl !\n");
                    /* NOTE(review): rings created in earlier iterations are
                     * not released on this early return */
                    return EDPVS_DPDKAPIFAIL;
        }
    }

    /* register netif-lcore-loop-job for Slaves */
    snprintf(ctrl_lcore_job.name, sizeof(ctrl_lcore_job.name) - 1, "%s", "slave_ctrl_plane");
    ctrl_lcore_job.func = slave_lcore_loop_func;
    ctrl_lcore_job.data = NULL;
    ctrl_lcore_job.type = NETIF_LCORE_JOB_LOOP;
    if ((ret = netif_lcore_loop_job_register(&ctrl_lcore_job)) < 0) {
        RTE_LOG(ERR, MSGMGR, "%s: fail to register ctrl func on slave lcores\n", __func__);
        return ret;
    }

    /* register built-in msg type */
    register_built_in_msg();
    /* dump the registered message types into buf and log them */
    msg_type_table_print(buf, sizeof(buf));
    RTE_LOG(INFO, MSGMGR, "%s: built-in msg registered:\n%s\n", __func__, buf);

    return EDPVS_OK;
}

3,inet_init

该函数里面会调用neigh_init和ipv4_init, 这两个函数又会分别调用arp_init和ipv4_frag_init,从而注册NETIF_LCORE_JOB_SLOW类型的job,具体代码我就不贴了。有兴趣的可以自己去跟踪下。

数据平面线程

上面之所以我会这么强调初始化里面的几个注册函数,主要是因为我们的数据平面线程会依次调用到它们。数据平面线程由netif_lcore_start开始,但是实际的执行函数是netif_loop

代码语言:javascript
复制
/*
 * Per-lcore data plane loop.
 *
 * Runs every NETIF_LCORE_JOB_INIT job once, then loops forever: each
 * iteration runs all NETIF_LCORE_JOB_LOOP jobs, and runs a
 * NETIF_LCORE_JOB_SLOW job only when this lcore's tick counter is a
 * multiple of the job's skip_loops.
 *
 * @param dummy  unused.
 * @return EDPVS_IDLE when the lcore has no ports configured; otherwise the
 *         function does not return (the trailing EDPVS_OK is never reached).
 */
static int netif_loop(void *dummy)
{
    struct netif_lcore_loop_job *job;
    lcoreid_t cid = rte_lcore_id();
#ifdef CONFIG_RECORD_BIG_LOOP
    char buf[512];
    uint32_t loop_time;
    uint64_t loop_start, loop_end;
#endif

    assert(LCORE_ID_ANY != cid && cid < NETIF_MAX_LCORES);

    try_isol_rxq_lcore_loop();
    if (0 == lcore_conf[lcore2index[cid]].nports) {
        RTE_LOG(INFO, NETIF, "[%s] Lcore %d has nothing to do.\n", __func__, cid);
        return EDPVS_IDLE;
    }

    /* One-shot jobs. Article author's note: no job of type
     * NETIF_LCORE_JOB_INIT appears to be registered anywhere. */
    list_for_each_entry(job, &netif_lcore_jobs[NETIF_LCORE_JOB_INIT], list) {
        do_lcore_job(job);
    }

    while (1) {
#ifdef CONFIG_RECORD_BIG_LOOP
        loop_start = rte_get_timer_cycles();
#endif
        /* Run the jobs registered during init, in registration order:
         * lcore_job_recv_fwd -> lcore_job_xmit -> lcore_job_timer_manage
         * -> slave_lcore_loop_func.
         */
        lcore_stats[cid].lcore_loop++;
        list_for_each_entry(job, &netif_lcore_jobs[NETIF_LCORE_JOB_LOOP], list) {
            do_lcore_job(job);
        }

        /* Slow jobs run only once every job->skip_loops iterations.
         * Assumes skip_loops is non-zero — TODO confirm at registration. */
        ++netif_loop_tick[cid];
        list_for_each_entry(job, &netif_lcore_jobs[NETIF_LCORE_JOB_SLOW], list) {
            if (netif_loop_tick[cid] % job->skip_loops == 0) {
                do_lcore_job(job);
            }
        }
#ifdef CONFIG_RECORD_BIG_LOOP
        /* track the longest iteration seen and warn about oversized ones */
        loop_end = rte_get_timer_cycles();
        loop_time = (loop_end - loop_start) * 1E6 / cycles_per_sec;
        if (loop_time > longest_lcore_loop[cid]) {
            RTE_LOG(WARNING, NETIF, "update longest_lcore_loop[%d] = %d (<- %d)\n",
                    cid, loop_time, longest_lcore_loop[cid]);
            longest_lcore_loop[cid] = loop_time;
        }
        if (loop_time > BIG_LOOP_THRESH) {
            print_job_time(buf, sizeof(buf));
            RTE_LOG(WARNING, NETIF, "lcore[%d] loop over %d usecs (actual=%d, max=%d):\n%s\n",
                    cid, BIG_LOOP_THRESH, loop_time, longest_lcore_loop[cid], buf);
        }
#endif
    }
    return EDPVS_OK;
}

这个函数最核心的就是执行NETIF_LCORE_JOB_LOOP和NETIF_LCORE_JOB_SLOW类型的job,而这两个类型的job就是我们之前所说的,在几个初始化函数里面注册的。这里的list_for_each_entry是一个宏

代码语言:javascript
复制
/*
 * Iterate over every entry of a list.
 *   pos    - loop cursor, a pointer to the entry type
 *   head   - the list head; iteration stops when pos's embedded node is head
 *   member - name of the list node field inside the entry type
 */
#define list_for_each_entry(pos, head, member)				\
	for (pos = list_first_entry(head, typeof(*pos), member);	\
	     &pos->member != (head);					\
	     pos = list_next_entry(pos, member))

意思不难理解就是遍历链表,然后调用do_lcore_job(job). do_lcore_job()这个函数呢,那就更简单了

代码语言:javascript
复制
/*
 * Execute a single registered lcore job by calling job->func(job->data).
 *
 * When CONFIG_RECORD_BIG_LOOP is defined, the timer cycles spent in the
 * callback are scaled by 1E6 / cycles_per_sec and stored in
 * job->job_time[] at the index of the current lcore.
 */
static inline void do_lcore_job(struct netif_lcore_loop_job *job)
{
#ifdef CONFIG_RECORD_BIG_LOOP
    uint64_t cycles_before = rte_get_timer_cycles();
    uint64_t cycles_after;
#endif

    job->func(job->data);

#ifdef CONFIG_RECORD_BIG_LOOP
    cycles_after = rte_get_timer_cycles();
    job->job_time[rte_lcore_id()] =
        (cycles_after - cycles_before) * 1E6 / cycles_per_sec;
#endif
}

其核心就是job->func(job->data); 这个.func前面提到过,大体上现在就能理顺了。也就是说,之前注册的函数的调用顺序是:先调用NETIF_LCORE_JOB_LOOP类型的job,lcore_job_recv_fwd -> lcore_job_xmit -> lcore_job_timer_manage -> slave_lcore_loop_func,然后是两个NETIF_LCORE_JOB_SLOW类型的job,ipv4_frag_job -> neigh_process_ring。

结语

因为贴代码,所以文章显得比较长,在续篇中我们再分析下netif_loop 里面的这些job到底做了些什么。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 源码剖析
    • 启动
      • 数据平面线程
      • 结语
      相关产品与服务
      负载均衡
      负载均衡(Cloud Load Balancer,CLB)提供安全快捷的流量分发服务,访问流量经由 CLB 可以自动分配到云中的多台后端服务器上,扩展系统的服务能力并消除单点故障。负载均衡支持亿级连接和千万级并发,可轻松应对大流量访问,满足业务需求。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档