前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >虚拟化iothread特性

虚拟化iothread特性

原创
作者头像
sidzhan
修改2019-07-31 18:01:58
6K0
修改2019-07-31 18:01:58
举报

背景

在现代虚拟化大环境下,主机逐渐向多核多磁盘高性能计算机发展,为了更好的利用多CPU并行能力,磁盘的高速读写能力,如何使虚拟机更好的使用宿舍主机的硬件资源,成了一个不变的话题。性能优化需要更好的在QEMU-KVM虚拟化技术下达到资源隔离,线程专用。本文仅以Hypervisor为QEMU-KVM、libvrit软件集以及操作系统为centos7.2作为分析的前提。

Libvirt下专用I/O线程配置

Libvirt管理下的qemu-kvm虚拟机中配置xml文件的iothread属性,是qemu新版本下自带的功能配置,附属的native特性也是为了配合打开iothread特性。其具体的使用参考libvirt官方文档

代码语言:html
复制
<domain>
 	 ...
  	<iothreads>4</iothreads>
  ...
</domain>
<domain>
  ...
  <iothreadids>
  	  <iothread id="2"/>
  	  <iothread id="4"/>
 	    <iothread id="6"/>
  	  <iothread id="8"/>
  </iothreadids>
  ...
</domain>

iothreads:分配给域以供受支持的目标存储设备使用的IOThread数,每个主机CPU应该只有1或2个IOThread,每个IOThread可分配多个设备。

iothreadids:提供了专门为域定义IOThread ID的功能。

iothread架构是什么

早先的qemu版本,只存在一个主线程,同时负载客户虚拟机的指令执行和运行事件循两个任务。线程执行客户机指令时,通过异常产生和信号量机制收走qemu线程控制权。接着通过执行非阻塞的select(2)进行一次循环的迭代,之后就返回客户机指令的执行,并不停重复以上过程直到QEMU关闭。这样的架构被称为non-iothread架构。它会存在着诸多问题,例如不能利用宿主机的多核能力、在运行SMP客户机的情况下会表现不佳、无法同时异步执行多个事件处理等。

之后qemu在新版本中变换了新的架构,为每一个vCPU分配一个QEMU线程,以及一个专用的事件处理循环线程。这个模型称为iothread。各个vCPU线程可以并行的执行客户机指令,进而提供真正的SMP支持;iothread则负责运行事件处理循环。通过使用了一个全局的mutex互斥锁来维持线程同步。大多数时间里,vCPU在运行客户机指令,iothread则阻塞在select(2)上。这样使得IO处理能够完全脱离主线程,跑在多个不同的线程里面,充分利用现代多核处理器的能力。

iothread线程.png
iothread线程.png

因此libvirt中配置的iothread属性其实也就是为不同的I/O事件起了专门的I/O处理线程,从上图可以看出,iothread线程代表虚拟机向主机生成I / O请求并处理事件,有一个I / O线程运行select(2)循环来处理事件,运行在Qemu主线程循环中。

定义进程时

通过iothread_info结构->iothread_class_init()->iothread_complete()->iothread_run()

代码语言:javascript
复制
/*获得数据,判断是否从aio内容中索取*/
AioContext *qemu_get_current_aio_context(void)
{
    return my_iothread ? my_iothread->ctx : qemu_get_aio_context();
}

/*iothread运行函数,轮询epoll*/
static void *iothread_run(void *opaque)
{
    IOThread *iothread = opaque;

    rcu_register_thread();

    my_iothread = iothread;
    qemu_mutex_lock(&iothread->init_done_lock);
    iothread->thread_id = qemu_get_thread_id();
    qemu_cond_signal(&iothread->init_done_cond);
    qemu_mutex_unlock(&iothread->init_done_lock);
    
    /*只要AIO thread没有被停掉,线程就会一直被epoll*/
    while (!atomic_read(&iothread->stopping)) {
        aio_poll(iothread->ctx, true);

        if (atomic_read(&iothread->worker_context)) {
            GMainLoop *loop;

            g_main_context_push_thread_default(iothread->worker_context);
            /*定义主循环函数*/
            iothread->main_loop =
                g_main_loop_new(iothread->worker_context, TRUE);
            loop = iothread->main_loop;

            g_main_loop_run(iothread->main_loop);
            iothread->main_loop = NULL;
            g_main_loop_unref(loop);

            g_main_context_pop_thread_default(iothread->worker_context);
        }
    }

    rcu_unregister_thread();
    return NULL;
}

/*停止线程*/
void iothread_stop(IOThread *iothread)
{
    if (!iothread->ctx || iothread->stopping) {
        return;
    }
    iothread->stopping = true;
    aio_notify(iothread->ctx);
    if (atomic_read(&iothread->main_loop)) {
        g_main_loop_quit(iothread->main_loop);
    }
    qemu_thread_join(&iothread->thread);
}

static int iothread_stop_iter(Object *object, void *opaque)
{
    IOThread *iothread;

    iothread = (IOThread *)object_dynamic_cast(object, TYPE_IOTHREAD);
    if (!iothread) {
        return 0;
    }
    iothread_stop(iothread);
    return 0;
}

static void iothread_instance_init(Object *obj)
{
    IOThread *iothread = IOTHREAD(obj);

    iothread->poll_max_ns = IOTHREAD_POLL_MAX_NS_DEFAULT;
}

static void iothread_instance_finalize(Object *obj)
{
    IOThread *iothread = IOTHREAD(obj);

    iothread_stop(iothread);
    if (iothread->worker_context) {
        g_main_context_unref(iothread->worker_context);
        iothread->worker_context = NULL;
    }
    qemu_cond_destroy(&iothread->init_done_cond);
    qemu_mutex_destroy(&iothread->init_done_lock);
    if (!iothread->ctx) {
        return;
    }
    aio_context_unref(iothread->ctx);
}

/*核心函数,创建了iothread_run线程*/
static void iothread_complete(UserCreatable *obj, Error **errp)
{
    Error *local_error = NULL;
    IOThread *iothread = IOTHREAD(obj);
    char *name, *thread_name;

    iothread->stopping = false;
    iothread->thread_id = -1;
    iothread->ctx = aio_context_new(&local_error);
    if (!iothread->ctx) {
        error_propagate(errp, local_error);
        return;
    }

    aio_context_set_poll_params(iothread->ctx,
                                iothread->poll_max_ns,
                                iothread->poll_grow,
                                iothread->poll_shrink,
                                &local_error);
    if (local_error) {
        error_propagate(errp, local_error);
        aio_context_unref(iothread->ctx);
        iothread->ctx = NULL;
        return;
    }

    qemu_mutex_init(&iothread->init_done_lock);
    qemu_cond_init(&iothread->init_done_cond);
    iothread->once = (GOnce) G_ONCE_INIT;

    /* This assumes we are called from a thread with useful CPU affinity for us
     * to inherit.
     */
    name = object_get_canonical_path_component(OBJECT(obj));
    thread_name = g_strdup_printf("IO %s", name);
    qemu_thread_create(&iothread->thread, thread_name, iothread_run,
                       iothread, QEMU_THREAD_JOINABLE);
    g_free(thread_name);
    g_free(name);

    /* Wait for initialization to complete */
    qemu_mutex_lock(&iothread->init_done_lock);
    while (iothread->thread_id == -1) {
        qemu_cond_wait(&iothread->init_done_cond,
                       &iothread->init_done_lock);
    }
    qemu_mutex_unlock(&iothread->init_done_lock);
}

static void iothread_get_poll_param(Object *obj, Visitor *v,
        const char *name, void *opaque, Error **errp)
{
    IOThread *iothread = IOTHREAD(obj);
    PollParamInfo *info = opaque;
    int64_t *field = (void *)iothread + info->offset;

    visit_type_int64(v, name, field, errp);
}



/*初始化iothread,调用iothread_complete*/
static void iothread_class_init(ObjectClass *klass, void *class_data)
{
    UserCreatableClass *ucc = USER_CREATABLE_CLASS(klass);
    ucc->complete = iothread_complete;

    object_class_property_add(klass, "poll-max-ns", "int",
                              iothread_get_poll_param,
                              iothread_set_poll_param,
                              NULL, &poll_max_ns_info, &error_abort);
    object_class_property_add(klass, "poll-grow", "int",
                              iothread_get_poll_param,
                              iothread_set_poll_param,
                              NULL, &poll_grow_info, &error_abort);
    object_class_property_add(klass, "poll-shrink", "int",
                              iothread_get_poll_param,
                              iothread_set_poll_param,
                              NULL, &poll_shrink_info, &error_abort);
}

/*定义iothread结构*/
static const TypeInfo iothread_info = {
    .name = TYPE_IOTHREAD,
    .parent = TYPE_OBJECT,
    .class_init = iothread_class_init,
    .instance_size = sizeof(IOThread),
    .instance_init = iothread_instance_init,
    .instance_finalize = iothread_instance_finalize,
    .interfaces = (InterfaceInfo[]) {
        {TYPE_USER_CREATABLE},
        {}
    },
};

static void iothread_register_types(void)
{
    type_register_static(&iothread_info);
}

type_init(iothread_register_types)

char *iothread_get_id(IOThread *iothread)
{
    return object_get_canonical_path_component(OBJECT(iothread));
}

AioContext *iothread_get_aio_context(IOThread *iothread)
{
    return iothread->ctx;
}



void iothread_stop_all(void)
{
    Object *container = object_get_objects_root();
    BlockDriverState *bs;
    BdrvNextIterator it;

    for (bs = bdrv_first(&it); bs; bs = bdrv_next(&it)) {
        AioContext *ctx = bdrv_get_aio_context(bs);
        if (ctx == qemu_get_aio_context()) {
            continue;
        }
        aio_context_acquire(ctx);
        bdrv_set_aio_context(bs, qemu_get_aio_context());
        aio_context_release(ctx);
    }

    object_child_foreach(container, iothread_stop_iter, NULL);
}

IOThread *iothread_create(const char *id, Error **errp)
{
    Object *obj;

    obj = object_new_with_props(TYPE_IOTHREAD,
                                object_get_internal_root(),
                                id, errp, NULL);

    return IOTHREAD(obj);
}

void iothread_destroy(IOThread *iothread)
{
    object_unparent(OBJECT(iothread));
}

异步I/O机制

linux下有同步与异步I/O两种机制。同步I/O机制在打开Direct I/O方式后,通过调用类似read()/pread()等系统调用来直接同步发起I/O操作,在请求返回用户缓存前都会被阻塞着。而异步I/O(AIO),可以通过向内核发出I/O请求命令,以非阻塞的方式等待I/O操作完成,内核会通过函数回调或者信号机制通知用户进程。这样很大程度提高了系统吞吐量。

aio示意图.png
aio示意图.png

同步I/O需要不断的通过select去轮询查数据,高频率的进行用户态和内核态切换,在大并发不固定频率io环境中很低效,会导致调用线程经常阻塞。为了提高吞吐量,必须开大量线程,因此造成线程切换的巨大开销。所以一般高性能场景下都避免使用大量线程,一般一个虚拟cpu一个线程,尽量避免线程切换,而异步有通知机制,真正的io操作交给操作系统/其他线程执行,主线程可以直接顺延运行。

qemu调用epoll机制轮询利用eventfd时

代码语言:javascript
复制
//轮询
static int aio_epoll(AioContext *ctx, GPollFD *pfds,
                     unsigned npfd, int64_t timeout)
{
    AioHandler *node;
    int i, ret = 0;
    struct epoll_event events[128];

    assert(npfd == 1);
    assert(pfds[0].fd == ctx->epollfd);
    if (timeout > 0) {
        ret = qemu_poll_ns(pfds, npfd, timeout);
    }
    //epoll_wait主要是等待新的event,没有新的event才会等待
    if (timeout <= 0 || ret > 0) {
        ret = epoll_wait(ctx->epollfd, events,
                         ARRAY_SIZE(events),
                         timeout);
        if (ret <= 0) {
            goto out;
        }
        for (i = 0; i < ret; i++) {
            int ev = events[i].events;
            node = events[i].data.ptr;
            node->pfd.revents = (ev & EPOLLIN ? G_IO_IN : 0) |
                (ev & EPOLLOUT ? G_IO_OUT : 0) |
                (ev & EPOLLHUP ? G_IO_HUP : 0) |
                (ev & EPOLLERR ? G_IO_ERR : 0);
        }
    }
out:
    return ret;
}

linux下有两种实现异步I/O的方式,一种是由 glibc 实现的 aio 系列,通过线程+阻塞调用在用户空间模拟 AIO 的功能,不需要内核的支持。另一种是采用原生的 Linux AIO,并由 libaio来封装调用接口,相比来说更底层。由于通过线程模拟异步的方式性能表现较差,因此这里只简单介绍一下后一种的AIO实现方式,其基本原理允许进程发起很多 I/O 操作,而不用阻塞或等待任何操作完成。

Linux AIO将io请求存储于内核中的一个队列中,然后根据不同的磁盘调度来响应请求。其主要的三个阶段

(1)libaio 的初始化;

(2) IO 请求的下发和回收

(3) libaio 销毁

其中最重要的五个个主要 API 函数以及宏定义可以参见libaio

代码语言:c
复制
int io_setup(int maxevents, io_context_t *ctxp);
int io_destroy(io_context_t ctx);
int io_submit(io_context_t ctx, long nr, struct iocb *ios[]);
int io_cancel(io_context_t ctx, struct iocb *iocb, struct io_event *evt);
int io_getevents(io_context_t ctx_id, long min_nr, long nr, struct io_event *events, struct timespec *timeout);
void io_set_callback(struct iocb *iocb, io_callback_t cb);
void io_prep_pwrite(struct iocb *iocb, int fd, void *buf, size_t count, long long offset);
void io_prep_pread(struct iocb *iocb, int fd, void *buf, size_t count, long long offset);
void io_prep_pwritev(struct iocb *iocb, int fd, const struct iovec *iov, int iovcnt, long long offset);
void io_prep_preadv(struct iocb *iocb, int fd, const struct iovec *iov, int iovcnt, long long offset);

io_context_t句柄在内核中对应一个struct kioctx结构,用来给一组异步IO请求提供一个上下文。其主要包含以下字段:

代码语言:javascript
复制
struct kioctx {
struct mm_struct*     mm;            
unsigned long         user_id;       
struct hlist_node     list;          
wait_queue_head_t     wait;          
int                   reqs_active;   
struct list_head      active_reqs;   
unsigned              max_reqs;      
struct list_head      run_list;      
struct delayed_work   wq;            
struct aio_ring_info  ring_info;
}

其中aio_ring_info结构用于存放请求结果io_event结构的ring buffer。它主要包含了如下字段:

代码语言:javascript
复制
unsigned long   mmap_base;      
unsigned long   mmap_size;      
struct page**   ring_pages;     
long            nr_pages;       
unsigned        nr, tail;  

读写前通过 io_prep_pwrite() 和 io_prep_pread() 生成 struct iocb作为 io_submit() 参数来提交异步I/O请求,其中使用io_context_t句柄在内核中对应的struct kioctx结构中aio_ring_info结构,来存放请求结果io_event结构的ring buffer,之后使用 io_getevents() 等待 IO 的结束信号,返回 events[] 数组。

代码语言:javascript
复制
int io_getevents_0_4(io_context_t ctx, long min_nr, long nr, struct io_event * events, struct timespec * timeout){
    struct aio_ring *ring;
    ring = (struct aio_ring*)ctx;
    if (ring==NULL || ring->magic != AIO_RING_MAGIC)
        goto do_syscall;
    if (timeout!=NULL && timeout->tv_sec == 0 && timeout->tv_nsec == 0) {
        if (ring->head == ring->tail)
            return 0;
    }
do_syscall:
    return __io_getevents_0_4(ctx, min_nr, nr, events, timeout);
}

整个异步I/O请求的流程如下图所示:

linux aio流转.png
linux aio流转.png

这里需要注意的是linux下的原生态的异步IO也带有aio线程,但它并不是异步处理线程,它只进行处理请求重试逻辑。

native属性优化性能

native.png
native.png

IO:可选io属性控制I / O上的特定策略,qemu支持“threads”和“native"模式。

IO线程是一种专门的事件循环线程,用于提高磁盘Block I/O的scalability(可扩展性),这些线程会分配给支持的磁盘设备。每个物理CPU只有1-2个IO线程,每个IO线程也可能分配给多个磁盘设备。libvirt配置中默认磁盘设备的AIO thread模式,配合I/O事件处理线程的native属性相较于原先的默认threads模式区别在于底层的I/O提交方式有所差异。设置native,QEMU会调用异步io_submit来提交IO;设置为threads,QEMU会最终调用pread/pwrite同步提交IO,pread/pwrite其最大的好处是它们不会改变文件当前的偏移量,对pread()和pwrite()函数而言 ,fd 所指代的文件必须是可定为的(即允许对文件描述符执行了lseek()),多线程应用为这些系统调用提供了用武之地。同时使用pread()和pwrite()系统调用能够避免进程间出现竞争状态,但其本质提交I/O的方式由于还是同步提交,在存在多盘多线程的情况下,依旧会以阻塞的方式等待,浪费时间与资源。native特性开启线程后,由linux内核来管理,利用Libaio实现异步提交IO的目的,以非阻塞的方式来高效充分利用资源,同时也避免了thread特性下大量进程的出现。

结合iothread+native特性,可以使得虚拟机的I/O读写性能在一定程度上提高。避免了多块磁盘设备下,磁盘由于io处理线程大家公用,且使用的是pread/pwrite这种同步提交I/O的方式,导致底层阻塞,造成其他硬盘io飙高甚至不可用的情况。

测试环境:

统一在单虚拟机8c32GB无文件系统测试下。虚拟化方式:kvm(内核版本3.10.0-693)+qemu(2.6.0)

测试工具

FIO、dd

磁盘类型:

SSD-SATA、HDD

result.png
result.png

测试结论:测试指标看,总体上iothread-native性能优于之前的主线程+aiothread模式,有差异的值差异并不大。

由此可见,不开启iothread特性下的qemu流程是在主线程循环中处理I/O事件,这样会导致主线程被多个子机,多个磁盘共用,导致拥塞。默认thread属性下,由于使用的是同步的I/O处理方式,在提交I处理/O请求时,由虚拟机向宿主机内核传达I/O,再到设备层的读取,会导致底层拥塞,多个设备硬盘之间io飙高甚至不可用。而开启了iothread后的qemu,对I/O事件进行了专用事件线程分配,为每一个磁盘都可分配单独的线程处理,这样就避免了共用主线程处理的弊端,多线程的处理流程也可以加速io处理。并且开启了native属性后,相对于threads单纯的开启多线程提交处理I/O,由于是同步的方式,底层一样会等待每一个IO请求返回,这个过程中,线程并做不了其余IO提交,这样就形成了线性流水线拥塞的情况,每一个IO请求得必须返回后才能进行下一个,而异步的方式则有效的避免了这种方式,通过event事件的保存回调,可以在一个请求还未返回时继续下一个IO请求提交。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 背景
  • Libvirt下专用I/O线程配置
  • iothread架构是什么
    • 定义进程时
    • 异步I/O机制
      • qemu调用epoll机制轮询利用eventfd时
      • native属性优化性能
      相关产品与服务
      云服务器
      云服务器(Cloud Virtual Machine,CVM)提供安全可靠的弹性计算服务。 您可以实时扩展或缩减计算资源,适应变化的业务需求,并只需按实际使用的资源计费。使用 CVM 可以极大降低您的软硬件采购成本,简化 IT 运维工作。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档