Commit 480cff63 authored by Paolo Bonzini, committed by Stefan Hajnoczi

coroutine-lock: add limited spinning to CoMutex



Running a very small critical section on pthread_mutex_t and CoMutex
shows that pthread_mutex_t is much faster because it doesn't actually
go to sleep.  What happens is that the critical section is shorter
than the latency of entering the kernel and thus FUTEX_WAIT always
fails.  With CoMutex there is no such latency but you still want to
avoid wait and wakeup.  So introduce it artificially.

This only works with one waiter; because CoMutex is fair, it will
always have more waits and wakeups than a pthread_mutex_t.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
Reviewed-by: Fam Zheng <famz@redhat.com>
Message-id: 20170213181244.16297-3-pbonzini@redhat.com
Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
parent fed20a70
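
As background for the diff below, here is a minimal standalone sketch of the bounded-spinning idea described in the commit message, written with C11 atomics and pthreads rather than CoMutex. It is not QEMU code; the SpinThenBlockLock type and the stb_* names are made up for illustration. The lock is first attempted with a bounded spin, and only falls back to a real wait/wakeup when spinning fails.

/* Standalone illustration of bounded spinning before blocking; not QEMU
 * code.  The type and function names are hypothetical.
 */
#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>

typedef struct {
    atomic_uint locked;         /* 0 = free, 1 = held */
    pthread_mutex_t sleep_mu;   /* slow path: really go to sleep */
    pthread_cond_t sleep_cv;
} SpinThenBlockLock;

static void stb_lock(SpinThenBlockLock *l)
{
    /* Fast path: spin a bounded number of times.  If the critical
     * section is shorter than the cost of blocking, this usually
     * succeeds without any wait/wakeup.
     */
    for (int i = 0; i < 1000; i++) {
        if (atomic_load(&l->locked) == 0) {
            unsigned expected = 0;
            if (atomic_compare_exchange_strong(&l->locked, &expected, 1)) {
                return;
            }
        }
        /* a real implementation would insert a cpu_relax()-style pause here */
    }

    /* Slow path: block until the holder wakes us up. */
    pthread_mutex_lock(&l->sleep_mu);
    for (;;) {
        unsigned expected = 0;
        if (atomic_compare_exchange_strong(&l->locked, &expected, 1)) {
            break;
        }
        pthread_cond_wait(&l->sleep_cv, &l->sleep_mu);
    }
    pthread_mutex_unlock(&l->sleep_mu);
}

static void stb_unlock(SpinThenBlockLock *l)
{
    atomic_store(&l->locked, 0);
    pthread_mutex_lock(&l->sleep_mu);   /* serialize with sleepers */
    pthread_cond_signal(&l->sleep_cv);
    pthread_mutex_unlock(&l->sleep_mu);
}

static SpinThenBlockLock demo_lock = {
    0, PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER
};
static long counter;

static void *worker(void *arg)
{
    (void)arg;
    for (int i = 0; i < 100000; i++) {
        stb_lock(&demo_lock);
        counter++;                      /* very small critical section */
        stb_unlock(&demo_lock);
    }
    return NULL;
}

int main(void)
{
    pthread_t t[2];
    for (int i = 0; i < 2; i++) {
        pthread_create(&t[i], NULL, worker, NULL);
    }
    for (int i = 0; i < 2; i++) {
        pthread_join(t[i], NULL);
    }
    printf("counter = %ld\n", counter); /* expect 200000 */
    return 0;
}

Build with e.g. cc -std=c11 -pthread; the 1000-iteration bound plays the same role as the ++i < 1000 loop in qemu_co_mutex_lock() in the patch below.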
include/qemu/coroutine.h  +5 −0
@@ -167,6 +167,11 @@ typedef struct CoMutex {
      */
     unsigned locked;
 
+    /* Context that is holding the lock.  Useful to avoid spinning
+     * when two coroutines on the same AioContext try to get the lock. :)
+     */
+    AioContext *ctx;
+
     /* A queue of waiters.  Elements are added atomically in front of
      * from_push.  to_pop is only populated, and popped from, by whoever
      * is in charge of the next wakeup.  This can be an unlocker or,
util/qemu-coroutine-lock.c  +45 −6
@@ -30,6 +30,7 @@
 #include "qemu-common.h"
 #include "qemu/coroutine.h"
 #include "qemu/coroutine_int.h"
+#include "qemu/processor.h"
 #include "qemu/queue.h"
 #include "block/aio.h"
 #include "trace.h"
@@ -181,7 +182,18 @@ void qemu_co_mutex_init(CoMutex *mutex)
     memset(mutex, 0, sizeof(*mutex));
 }
 
-static void coroutine_fn qemu_co_mutex_lock_slowpath(CoMutex *mutex)
+static void coroutine_fn qemu_co_mutex_wake(CoMutex *mutex, Coroutine *co)
+{
+    /* Read co before co->ctx; pairs with smp_wmb() in
+     * qemu_coroutine_enter().
+     */
+    smp_read_barrier_depends();
+    mutex->ctx = co->ctx;
+    aio_co_wake(co);
+}
+
+static void coroutine_fn qemu_co_mutex_lock_slowpath(AioContext *ctx,
+                                                     CoMutex *mutex)
 {
     Coroutine *self = qemu_coroutine_self();
     CoWaitRecord w;
@@ -206,10 +218,11 @@ static void coroutine_fn qemu_co_mutex_lock_slowpath(CoMutex *mutex)
         if (co == self) {
             /* We got the lock ourselves!  */
             assert(to_wake == &w);
+            mutex->ctx = ctx;
             return;
         }
 
-        aio_co_wake(co);
+        qemu_co_mutex_wake(mutex, co);
     }
 
     qemu_coroutine_yield();
@@ -218,13 +231,39 @@ static void coroutine_fn qemu_co_mutex_lock_slowpath(CoMutex *mutex)
 
 void coroutine_fn qemu_co_mutex_lock(CoMutex *mutex)
 {
+    AioContext *ctx = qemu_get_current_aio_context();
     Coroutine *self = qemu_coroutine_self();
+    int waiters, i;
+
+    /* Running a very small critical section on pthread_mutex_t and CoMutex
+     * shows that pthread_mutex_t is much faster because it doesn't actually
+     * go to sleep.  What happens is that the critical section is shorter
+     * than the latency of entering the kernel and thus FUTEX_WAIT always
+     * fails.  With CoMutex there is no such latency but you still want to
+     * avoid wait and wakeup.  So introduce it artificially.
+     */
+    i = 0;
+retry_fast_path:
+    waiters = atomic_cmpxchg(&mutex->locked, 0, 1);
+    if (waiters != 0) {
+        while (waiters == 1 && ++i < 1000) {
+            if (atomic_read(&mutex->ctx) == ctx) {
+                break;
+            }
+            if (atomic_read(&mutex->locked) == 0) {
+                goto retry_fast_path;
+            }
+            cpu_relax();
+        }
+        waiters = atomic_fetch_inc(&mutex->locked);
+    }
 
-    if (atomic_fetch_inc(&mutex->locked) == 0) {
+    if (waiters == 0) {
         /* Uncontended.  */
         trace_qemu_co_mutex_lock_uncontended(mutex, self);
+        mutex->ctx = ctx;
     } else {
-        qemu_co_mutex_lock_slowpath(mutex);
+        qemu_co_mutex_lock_slowpath(ctx, mutex);
     }
     mutex->holder = self;
     self->locks_held++;
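
For context on how this fast path is reached, the sketch below shows typical CoMutex usage from coroutine context. It is not part of the patch; it assumes a QEMU source tree, and the Counter type and its functions are hypothetical examples.

/* Hypothetical example of CoMutex usage; not part of this patch.
 * Assumes it is compiled inside a QEMU source tree.
 */
#include "qemu/osdep.h"
#include "qemu/coroutine.h"

typedef struct Counter {
    CoMutex lock;
    uint64_t value;
} Counter;

static void counter_init(Counter *c)
{
    qemu_co_mutex_init(&c->lock);
    c->value = 0;
}

/* Must be called from coroutine context: when the mutex is contended,
 * qemu_co_mutex_lock() first spins briefly (the fast path above) and
 * then yields the coroutine instead of blocking the thread.
 */
static void coroutine_fn counter_add(Counter *c, uint64_t n)
{
    qemu_co_mutex_lock(&c->lock);
    c->value += n;                  /* very small critical section */
    qemu_co_mutex_unlock(&c->lock);
}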
@@ -240,6 +279,7 @@ void coroutine_fn qemu_co_mutex_unlock(CoMutex *mutex)
     assert(mutex->holder == self);
     assert(qemu_in_coroutine());
 
+    mutex->ctx = NULL;
     mutex->holder = NULL;
     self->locks_held--;
     if (atomic_fetch_dec(&mutex->locked) == 1) {
@@ -252,8 +292,7 @@ void coroutine_fn qemu_co_mutex_unlock(CoMutex *mutex)
         unsigned our_handoff;
 
         if (to_wake) {
-            Coroutine *co = to_wake->co;
-            aio_co_wake(co);
+            qemu_co_mutex_wake(mutex, to_wake->co);
             break;
         }
 
util/qemu-coroutine.c  +1 −1
@@ -118,7 +118,7 @@ void qemu_coroutine_enter(Coroutine *co)
     co->ctx = qemu_get_current_aio_context();
 
     /* Store co->ctx before anything that stores co.  Matches
-     * barrier in aio_co_wake.
+     * barrier in aio_co_wake and qemu_co_mutex_wake.
      */
     smp_wmb();
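
The smp_wmb() above pairs with the smp_read_barrier_depends() added in qemu_co_mutex_wake(): qemu_coroutine_enter() stores co->ctx before the coroutine pointer becomes visible to a waker, and the waker must not read co->ctx before it has read the pointer itself. A rough standalone C11 analogue of that publish/consume pairing (not QEMU code) is sketched below.

/* Standalone C11 analogue of the barrier pairing above; not QEMU code.
 * The publisher fills in a structure, then publishes a pointer to it with
 * a release store (QEMU: smp_wmb() before the store of "co").  The
 * consumer loads the pointer and only then dereferences it (QEMU:
 * smp_read_barrier_depends() before reading co->ctx).
 */
#include <stdatomic.h>
#include <stdio.h>

typedef struct Work {
    int ctx;                 /* stands in for co->ctx */
} Work;

static _Atomic(Work *) published;

static void publish(Work *w, int ctx)
{
    w->ctx = ctx;
    /* Release: order the w->ctx store before the pointer store. */
    atomic_store_explicit(&published, w, memory_order_release);
}

static int consume(void)
{
    Work *w;
    /* Consume ordering: the dependent load of w->ctx cannot be
     * reordered before the load of the pointer.
     */
    while (!(w = atomic_load_explicit(&published, memory_order_consume))) {
        /* spin until published */
    }
    return w->ctx;
}

int main(void)
{
    static Work w;
    publish(&w, 42);
    printf("%d\n", consume());   /* prints 42 */
    return 0;
}

In practice compilers strengthen memory_order_consume to memory_order_acquire; the dependency-only barrier QEMU uses here only needs to be a real barrier on Alpha-style hosts.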