本文发自 http://www.binss.me/blog/qemu-note-of-coroutine/,转载请注明出处。

定义

协程是什么?在给其下定义之前,我们先来看看它的“竞争对手”进程和线程是如何在不同任务间进行切换的。这种切换被定义为上下文切换(context switch),进程和线程在做这件事的时候总是要进到内核中,然后内核啪啪啪地就帮我们把当前进程/线程的寄存器保存起来,最后加载下一个进程/线程的寄存器,开始运行。

仔细想想,既然只是状态的保存,为何一定要进到内核态,我能否把保存的代码挪到用户态,从而节省切换到内核态的开销嘛?这种思想的产物就是协程,因此协程又被称为用户态线程。

协程在思想上的亮点是让协同式调度重出江湖。进程和线程都是抢占式调度,一旦时间片耗尽,即使你还有一行代码就执行完了,不好意思,轮到下一个进程/线程执行了,于是当前进行/线程只能含恨而退。而对于协程来说,不存在抢占的说法,一个协程一旦得到执行,那么它能够一直执行下去,直到其通过yield主动释放执行权为止。因此协程的调度依赖于协程之间的相互协作。

QEMU 在主线程使用了一个事件循环,当有事件发生时,进行相应处理,然后进入下一轮循环的等待。对于需要长时间阻塞的任务(如IO),我们往往采用异步的方式进行处理:为其设置一个回调函数后直接返回,当任务完成时调用回调函数进行相应处理。但实践中证明,回调并不是那么好用,每一次执行耗时IO时总是需要为其设置一个回调函数,并传递相应的上下文状态。如果有多个这样的操作,则代码会被割裂得支离破碎,变得非常恶心。

为了解决这个问题,QEMU 中把所有的 block I/O 函数都做成了协程。开发者可以很方便地在执行耗时操作时 yield 出去,然后执行完操作后从该位置继续往下执行,保证了逻辑上的连续性。

在 C/C++ 语言下,协程如果不使用某些hack手段的话,一般是通过 glibc 的 ucontext 组件 或 libc 的 setjump 组件来实现。

基础

sigsetjmp / siglongjmp

定义了两个API,利用它们可以实现上下文切换。

  • int sigsetjmp(sigjmp_buf env, int savesigs)

    获取当前的上下文,存到 env 中。

    如果savesigs非零,则同时会将进程当前的信号屏蔽字(mask)保存到 env 中

    主动调用时,返回0。在 siglongjmp 后会切换到调用该函数的上下文,这时返回值为 siglongjmp 的 val 参数

  • void siglongjmp(sigjmp_buf env, int val)

    设置当前的上下文为 env ,即恢复到调用 sigsetjmp 时的上下文。

例子

#include <setjmp.h>
#include <stdio.h>
#include <setjmp.h>
#include <signal.h>

static sigjmp_buf jmpbuf;

void sig_fpe(int signo)
{
   siglongjmp(jmpbuf, 1);
}

int main(int argc, char *argv[])
{
    signal(SIGFPE, sig_fpe);
    if (sigsetjmp(jmpbuf, 1) == 0) {
        int ret = 10 / 0;
    }
    else {
        printf("catch exception\n");
    }
}

这段代码中 sigsetjmp + siglongjmp 的作用相当于 try...catch...

思考

用户级的切换,意味着无须陷入到内核态,性能大大的好,然而 sigsetjmp / siglongjmp 并不是为了协程而设计的,如果单纯用它来实现协程,会出事。

在此提一个问题:上下文切换真的只有切换寄存器吗?

不,还有栈呢。如果不把原进程的栈保存起来,那么新进程在执行过程中可能会破坏掉原进程的栈,从而导致意想不到的结果。这时候就需要用到 ucontext 了。

ucontext

定义了四个API,利用它们同样可以实现上下文切换。

  • int getcontext(ucontext_t *ucp)

    获取当前的上下文,存到参数 ucp 中

  • int setcontext(const ucontext_t *ucp)

    设置当前的上下文为ucp。ucp需要通过 getcontext 来得到

  • void makecontext(ucontext_t ucp, void (func)(), int argc, ...)

    修改上下文ucp,即在ucp上下文中调用函数func,参数个数为argc,参数为后继参数,要求类型为int。

    但在此之前,需要为通过 getcontext 得到的ucp设置 栈空间和后继上下文。栈空间用于对func进行调用,后继上下文用于func执行后切换。

  • int swapcontext(ucontext_t oucp, ucontext_t ucp)

    保存当前上下文到oucp,然后设置当前上下文为ucp

例子

#include <ucontext.h>
#include <stdio.h>

void func(void *arg)
{
    printf("child\n");
}

void main()
{
    char stack[1024*128];
    ucontext_t child, main;

    getcontext(&child);                             // 获取当前上下文
    child.uc_stack.ss_sp = stack;                   // 设置栈空间
    child.uc_stack.ss_size = sizeof(stack);         // 设置栈空间大小
    child.uc_stack.ss_flags = 0;
    child.uc_link = &main;                          // 设置后继上下文

    makecontext(&child, (void (*)(void))func, 0);   // 设置在child上下文中执行的函数

    swapcontext(&main, &child);                     // 切换到child上下文,执行func。将当前上下文保存到main
    // 由于设置了child的后继上下文为main,func执行完后会返回到此处
    printf("main\n");
}

如果后继上下文设置为 NULL ,则child执行完毕后程序结束。

思考

ucontext 好是好了,但是需要进入到内核态,性能不咋地。

性能好的功能不全,功能全的性能不好,怎么办?当然是把两者结合起来啦,当需要保存栈时,使用 ucontext ,不需要时,用回 sigsetjmp / siglongjmp 。

QEMU 协程实现

QEMU 就是这样做的~

数据结构

struct Coroutine {
    CoroutineEntry *entry;                      // 函数指针
    void *entry_arg;                            // 函数参数
    Coroutine *caller;                          // 上级协程
    QSLIST_ENTRY(Coroutine) pool_next;          // 用于加入到协程池链表
    size_t locks_held;

    /* Coroutines that should be woken up when we yield or terminate */
    QSIMPLEQ_HEAD(, Coroutine) co_queue_wakeup; // 协程唤醒队列
    QSIMPLEQ_ENTRY(Coroutine) co_queue_next;    // 该协程退出时需要唤醒的协程
};

Coroutine 封装了协程。

typedef struct {
    Coroutine base;                                 // 协程
    void *stack;                                    // 当前上下文的进程栈
    size_t stack_size;
    sigjmp_buf env;

#ifdef CONFIG_VALGRIND_H
    unsigned int valgrind_stack_id;
#endif

} CoroutineUContext;

CoroutineUContext 封装了协程上下文,是对coroutine的进一步包装。

创建协程

qemu_coroutine_create 负责创建协程,如果协程池里有可供使用的协程对象,则进行复用:

Coroutine *qemu_coroutine_create(CoroutineEntry *entry, void *opaque)
{
    Coroutine *co = NULL;
    // 如果有协程池,从全局变量中取出第一个
    if (CONFIG_COROUTINE_POOL) {
        co = QSLIST_FIRST(&alloc_pool);
        // 如果不存在,且 release pool 足够大,将 release pool 中的协程拷到 alloc pool
        if (!co) {
            if (release_pool_size > POOL_BATCH_SIZE) {
                /* Slow path; a good place to register the destructor, too.  */
                if (!coroutine_pool_cleanup_notifier.notify) {
                    coroutine_pool_cleanup_notifier.notify = coroutine_pool_cleanup;
                    qemu_thread_atexit_add(&coroutine_pool_cleanup_notifier);
                }

                /* This is not exact; there could be a little skew between
                 * release_pool_size and the actual size of release_pool.  But
                 * it is just a heuristic, it does not need to be perfect.
                 */
                alloc_pool_size = atomic_xchg(&release_pool_size, 0);
                QSLIST_MOVE_ATOMIC(&alloc_pool, &release_pool);
                // 取出第一个进行复用
                co = QSLIST_FIRST(&alloc_pool);
            }
        }
        // 如果存在,将其从alloc pool中取出,池中协程数目减1
        if (co) {
            QSLIST_REMOVE_HEAD(&alloc_pool, pool_next);
            alloc_pool_size--;
        }
    }
    // 如果没取到协程对象,则需要创建一个
    if (!co) {
        co = qemu_coroutine_new();
    }

    // 设置协程要执行的函数和函数参数
    co->entry = entry;
    co->entry_arg = opaque;
    // 初始化协程线性队列
    QSIMPLEQ_INIT(&co->co_queue_wakeup);
    return co;
}

如果没有池或池里没有,会调用 qemu_coroutine_new 创建新协程对象:

Coroutine *qemu_coroutine_new(void)
{
    CoroutineUContext *co;
    ucontext_t old_uc, uc;
    sigjmp_buf old_env;
    union cc_arg arg = {0};

    /* The ucontext functions preserve signal masks which incurs a
     * system call overhead.  sigsetjmp(buf, 0)/siglongjmp() does not
     * preserve signal masks but only works on the current stack.
     * Since we need a way to create and switch to a new stack, use
     * the ucontext functions for that but sigsetjmp()/siglongjmp() for
     * everything else.
     */
    // 保存当前上下文
    if (getcontext(&uc) == -1) {
        abort();
    }

    // 为协程上下文结构分配内存,设置栈大小(1MB)并创建和设置栈
    co = g_malloc0(sizeof(*co));
    co->stack_size = COROUTINE_STACK_SIZE;
    co->stack = qemu_alloc_stack(&co->stack_size);
    // 指向 sigsetjmp 时保存的上下文
    co->base.entry_arg = &old_env; /* stash away our jmp_buf */

    // 为当前上下文设置后继上下文
    uc.uc_link = &old_uc;
    // 设置栈
    uc.uc_stack.ss_sp = co->stack;
    uc.uc_stack.ss_size = co->stack_size;
    uc.uc_stack.ss_flags = 0;

#ifdef CONFIG_VALGRIND_H
    co->valgrind_stack_id =
        VALGRIND_STACK_REGISTER(co->stack, co->stack + co->stack_size);
#endif

    arg.p = co;

    // 设置uc要执行的函数
    // 由于只能传递类型为int的参数,因此把指向协程上下文的指针拆成两个int来传
    makecontext(&uc, (void (*)(void))coroutine_trampoline,
                2, arg.i[0], arg.i[1]);

    /* swapcontext() in, siglongjmp() back out */
    // 保存返回上下文到 co->base.entry_arg 指向的地址
    // 如果是第一次执行,sigsetjmp 返回值为0,则执行 swapcontext 切换到uc,即调用 coroutine_trampoline
    // coroutine_trampoline 在第一次调用时会通过 siglongjmp 返回到这里,此时 sigsetjmp 返回值为1,不执行 swapcontext
    if (!sigsetjmp(old_env, 0)) {
        swapcontext(&old_uc, &uc);
    }

    // 返回协程对象
    return &co->base;
}

从此可以看出 QEMU 的协程用到了 ucontext 和 siglongjmp 。ucontext 和 sigsetjmp 这两套单出拉出来都能实现协程,为什么QEMU中要把它们结合起来,同时使用两套呢?

ucontext 使用起来非常方便,在分配并设置好栈后只需调用一个函数即可保存所有上下文到新的栈中,包括 signal mask 。但它会导致系统调用,开销较大。

而 sigsetjmp(buf, 0) 可以不保存signal mask,开销较小,但只能在当前栈上jmp来jmp去。

因此在需要切换栈时使用 ucontext ,在其他情况下使用 sigsetjmp 。

这里由于需要调用函数 coroutine_trampoline ,需要切换栈,于是使用 ucontext ,通过 swapcontext 进入到 coroutine_trampoline 中:

static void coroutine_trampoline(int i0, int i1)
{
    union cc_arg arg;
    CoroutineUContext *self;
    Coroutine *co;

    arg.i[0] = i0;
    arg.i[1] = i1;
    // 将两个int参数拼成协程上下文对象的指针
    self = arg.p;
    co = &self->base;

    /* Initialize longjmp environment and switch back the caller */
    // 保存上下文,如果是第一次执行,返回值为0,切换回 qemu_coroutine_new 中保存的上下文
    if (!sigsetjmp(self->env, 0)) {
        siglongjmp(*(sigjmp_buf *)co->entry_arg, 1);
    }

    while (true) {
        // 执行传入的协程函数
        co->entry(co->entry_arg);
        // 执行完毕后切换到协程调用者的上下文(caller)
        qemu_coroutine_switch(co, co->caller, COROUTINE_TERMINATE);
    }
}

在通过 qemu_coroutine_new => makecontext 第一次执行 coroutine_trampoline 时,通过 siglongjmp(co->entry_arg) 跳转回 qemu_coroutine_new ,于是返回。

之后通过 qemu_coroutine_enter 重新进入 coroutine_trampoline 时,进入while循环,执行设置的函数,完成后切换到协程调用的上下文。

这符合我们对协程的一般认识——创建后需要 next 一下才能跑起来。

进入协程

在 qemu_coroutine_create 创建/复用 了协程对象,设置了协程要执行的函数和函数参数后,caller 需要通过 qemu_coroutine_enter 让协程跑起来:

void qemu_coroutine_enter(Coroutine *co)
{
    // 获取当前正在执行的协程,对于 ucontext 实现存在全局上下文变量 leader 的base中
    Coroutine *self = qemu_coroutine_self();
    CoroutineAction ret;

    trace_qemu_coroutine_enter(self, co, co->entry_arg);

    // 如果存在caller,表示要执行的协程处于协程内。由于QEMU不允许在协程内再运行协程,因此报错退出
    if (co->caller) {
        fprintf(stderr, "Co-routine re-entered recursively\n");
        abort();
    }
    // 设置要运行协程的caller为当前协程
    co->caller = self;
    // 切换
    ret = qemu_coroutine_switch(self, co, COROUTINE_ENTER);

    qemu_co_queue_run_restart(co);

    switch (ret) {
    case COROUTINE_YIELD:
        return;
    case COROUTINE_TERMINATE:
        assert(!co->locks_held);
        trace_qemu_coroutine_terminate(co);
        coroutine_delete(co);
        return;
    default:
        abort();
    }
}

可以发现除了出现在 coroutine_trampoline 中,qemu_coroutine_switch 还出现在了 qemu_coroutine_enter 中,用于切换上下文:

CoroutineAction __attribute__((noinline))
qemu_coroutine_switch(Coroutine *from_, Coroutine *to_,
                      CoroutineAction action)
{
    CoroutineUContext *from = DO_UPCAST(CoroutineUContext, base, from_);
    CoroutineUContext *to = DO_UPCAST(CoroutineUContext, base, to_);
    int ret;

    current = to_;

    ret = sigsetjmp(from->env, 0);
    if (ret == 0) {
        siglongjmp(to->env, action);
    }
    return ret;
}

qemu_coroutine_switch 本质上是 siglongjmp(co->env) ,跳转到对应协程的上下文保存点。但在此之前,会保存当前协程的上下文。

也就是说,在调用 qemu_coroutine_enter => qemu_coroutine_switch 时,会通过 sigsetjmp 跳转到第一次调用 coroutine_trampoline 的 sigsetjmp 的位置,由于 action 不为0,于是 coroutine_trampoline 中 sigsetjmp 返回值不为0,不执行 if 内的 siglongjmp ,于是继续往下执行进入 while 循环开始执行预先设置的协程函数。

如果协程函数执行完毕返回,会调用 qemu_coroutine_switch(COROUTINE_TERMINATE) ,由于父级协程在上次 qemu_coroutine_switch 时调用 sigsetjmp(from->env, 0) ,此时 from 成为 to,于是 siglongjmp(to->env) 回到上次 qemu_coroutine_switch 的 sigsetjmp 处。由于 action 不为0,则上次 qemu_coroutine_switch 的 ret 不为0,于是 qemu_coroutine_switch 返回。

yield

当协程未执行结束(COROUTINE_TERMINATE)而又需要返回时,需要进行 yield 。虽然在许多编程语言中这种行为被定义成像 return 一样的关键字(如Python),但在C中需要通过函数实现。

void coroutine_fn qemu_coroutine_yield(void)
{
    Coroutine *self = qemu_coroutine_self();
    Coroutine *to = self->caller;

    trace_qemu_coroutine_yield(self, to);

    if (!to) {
        fprintf(stderr, "Co-routine is yielding to no one\n");
        abort();
    }

    self->caller = NULL;
    qemu_coroutine_switch(self, to, COROUTINE_YIELD);
}

yield 本质上是要从当前协程切换回上一级,因此同样可以通过调用 qemu_coroutine_switch 实现,只是参数换成 COROUTINE_YIELD 而已。于是协程停止执行,但其又没有被销毁,下次又可以通过 qemu_coroutine_enter 重新进到协程函数中上次 yield 的地方。

销毁协程

qemu_coroutine_enter 会根据 qemu_coroutine_switch 的返回值判断协程是 yield 还是 terminate。如果是 terminate ,则调用 coroutine_delete 进行销毁。

static void coroutine_delete(Coroutine *co)
{
    co->caller = NULL;

    if (CONFIG_COROUTINE_POOL) {
        if (release_pool_size < POOL_BATCH_SIZE * 2) {
            QSLIST_INSERT_HEAD_ATOMIC(&release_pool, co, pool_next);
            atomic_inc(&release_pool_size);
            return;
        }
        if (alloc_pool_size < POOL_BATCH_SIZE) {
            QSLIST_INSERT_HEAD(&alloc_pool, co, pool_next);
            alloc_pool_size++;
            return;
        }
    }

    qemu_coroutine_delete(co);
}

在开启了协程池的情况下,如果release pool中的协程数目不够多,则将其加入到release pool中,否则检查alloc pool中的协程数目,如果不够多,则将其加入到alloc pool。如果以上两个pool都满了,则像不支持协程池那样调用 qemu_coroutine_delete 析构协程对象:

void qemu_coroutine_delete(Coroutine *co_)
{
    CoroutineUContext *co = DO_UPCAST(CoroutineUContext, base, co_);

#ifdef CONFIG_VALGRIND_H
    valgrind_stack_deregister(co);
#endif
    // 释放栈空间
    qemu_free_stack(co->stack, co->stack_size);
    // 释放协程上下文的空间
    g_free(co);
}

其他

除了create / enter / yield 这些基础功能外,QEMU 还提供了对它们的包装来进一步方便使用。

co_aio_sleep_ns

yield 并在超出设定时间 保持睡眠。实际上是创建了一个 CoSleepCB 然后调用 qemu_coroutine_yield 。当 CoSleepCB 中的 QEMUTimer 超时时,会调用回调函数 co_sleep_cb ,最终会调到 qemu_coroutine_enter 让协程重新跑起来。

CoQueue

协程队列。

  • qemu_co_queue_init 初始化协程队列。

  • qemu_co_queue_wait 将当前的协程加入到协程队列中,然后 yield

  • qemu_co_queue_do_restart 对co->co_queue_wakeup 中的协程依次调用 qemu_coroutine_enter ,直到队列为空为止。

    在 qemu_coroutine_enter 进入目标协程之前总会调用该函数 。也就是说,在执行新协程之前,始终会把协程队列中的协程先执行一遍。

  • qemu_co_enter_next 从协程队列中取出第一个协程,对其调用 qemu_coroutine_enter 让它跑起来

  • qemu_co_queue_empty 清空协程队列

CoMutex

即使协程应该是相互协作的,但在某些情况下依然需要锁来避免相互影响。

  • qemu_co_mutex_init => qemu_co_queue_init 初始化

  • qemu_co_mutex_lock 尝试加锁。如果锁已被持有,则调用 qemu_co_queue_wait 进行等待,本质上是将当前协程加入到协程队列后调用 qemu_coroutine_yield

  • qemu_co_mutex_unlock 释放锁,然后调用 qemu_co_queue_next 从(处于等待状态的)协程队列中取出第一个协程执行

典型应用

QEMU在迁移VM时,接收方会监听端口,接受发送方的连接:

socket_accept_incoming_migration => migration_channel_process_incoming
=> qemu_fopen_channel_input => qemu_fopen_ops(ioc, &channel_input_ops)   创建 QEMUFile
=> migration_fd_process_incoming => qemu_coroutine_create(process_incoming_migration_co, f) 创建协程
                                 => migrate_decompress_threads_create    创建解压线程decompress
                                 => qemu_file_set_blocking               将对channel设置为非阻塞
                                 => qemu_coroutine_enter                 启动协程

为了处理请求,将函数 process_incoming_migration_co 包装为协程,然后调用 qemu_coroutine_enter 进入协程, process_incoming_migration_co 中有这么一段:

    ...
    if (!ret && migration_incoming_enable_colo()) {
        mis->migration_incoming_co = qemu_coroutine_self();
        qemu_thread_create(&mis->colo_incoming_thread, "COLO incoming",
             colo_process_incoming_thread, mis, QEMU_THREAD_JOINABLE);
        mis->have_colo_incoming_thread = true;
        qemu_coroutine_yield();

        /* Wait checkpoint incoming thread exit before free resource */
        qemu_thread_join(&mis->colo_incoming_thread);
    }

如果开启了 colo ,则创建一个线程,用于等待对方的连接。但此时 process_incoming_migration_co 需要等待该流程的结束,如果在这里傻等,那 COLO incoming 线程不如不开。因此这里通过 qemu_coroutine_yield 返回。

在 colo_do_failover => secondary_vm_do_failover => qemu_coroutine_enter(mis->migration_incoming_co) 中会重新回到这里。

总结

利用协程来写业务代码非常爽,但 debug 起来比回调函数更加痛苦,一旦发生上下文切换,gdb就懵逼了,你会发现刷一声协程就执行完毕返回了,step 什么的根本不起作用。就算你在协程函数打断点停下来,发现调用栈中都是 ?? 。无疑,协程在保持业务代码逻辑的连续性的同时,为调试带来了许多麻烦。

参考

http://qemu.rocks/jtc-kvm2016/

http://blog.csdn.net/qq910894904/article/details/41911175

http://royluo.org/2016/06/24/qemu-coroutine/