Commit 7c9b2bf6 authored by Paolo Bonzini's avatar Paolo Bonzini Committed by Stefan Weil
Browse files

qemu-thread: add a fast path to the Win32 QemuEvent



QemuEvents are used heavily by call_rcu.  We do not want them to be slow,
but the current implementation does a kernel call on every invocation
of qemu_event_* and won't cut it.

So, wrap a Win32 manual-reset event with a fast userspace path.  The
states and transitions are the same as for the futex and mutex/condvar
implementations, but the slow path is different of course.  The idea
is to reset the Win32 event lazily, as part of a test-reset-test-wait
sequence.  Such a sequence is, indeed, how QemuEvents are used by
RCU and other subsystems!

The patch includes a formal model of the algorithm.

Tested-by: default avatarStefan Weil <sw@weilnetz.de>
Signed-off-by: default avatarPaolo Bonzini <pbonzini@redhat.com>
Signed-off-by: default avatarStefan Weil <sw@weilnetz.de>
parent a246a016
Loading
Loading
Loading
Loading
+98 −0
Original line number Diff line number Diff line
/*
 * This model describes the implementation of QemuEvent in
 * util/qemu-thread-win32.c.
 *
 * Author: Paolo Bonzini <pbonzini@redhat.com>
 *
 * This file is in the public domain.  If you really want a license,
 * the WTFPL will do.
 *
 * To verify it:
 *     spin -a docs/event.promela
 *     gcc -O2 pan.c -DSAFETY
 *     ./a.out
 */

bool event;
int value;

/* Primitives for a Win32 event */
#define RAW_RESET event = false
#define RAW_SET   event = true
#define RAW_WAIT  do :: event -> break; od

#if 0
/* Basic sanity checking: test the Win32 event primitives */
#define RESET     RAW_RESET
#define SET       RAW_SET
#define WAIT      RAW_WAIT
#else
/* Full model: layer a userspace-only fast path on top of the RAW_*
 * primitives.  SET/RESET/WAIT have exactly the same semantics as
 * RAW_SET/RAW_RESET/RAW_WAIT, but try to avoid invoking them.
 */
#define EV_SET 0
#define EV_FREE 1
#define EV_BUSY -1

int state = EV_FREE;

int xchg_result;
#define SET   if :: state != EV_SET ->                                      \
                    atomic { /* xchg_result=xchg(state, EV_SET) */          \
                        xchg_result = state;                                \
                        state = EV_SET;                                     \
                    }                                                       \
                    if :: xchg_result == EV_BUSY -> RAW_SET;                \
                       :: else -> skip;                                     \
                    fi;                                                     \
                 :: else -> skip;                                           \
              fi

#define RESET if :: state == EV_SET -> atomic { state = state | EV_FREE; }  \
                 :: else            -> skip;                                \
              fi

int tmp1, tmp2;
#define WAIT  tmp1 = state;                                                 \
              if :: tmp1 != EV_SET ->                                       \
                    if :: tmp1 == EV_FREE ->                                \
                          RAW_RESET;                                        \
                          atomic { /* tmp2=cas(state, EV_FREE, EV_BUSY) */  \
                              tmp2 = state;                                 \
                              if :: tmp2 == EV_FREE -> state = EV_BUSY;     \
                                 :: else            -> skip;                \
                              fi;                                           \
                          }                                                 \
                          if :: tmp2 == EV_SET -> tmp1 = EV_SET;            \
                             :: else           -> tmp1 = EV_BUSY;           \
                          fi;                                               \
                       :: else -> skip;                                     \
                    fi;                                                     \
                    assert(tmp1 != EV_FREE);                                \
                    if :: tmp1 == EV_BUSY -> RAW_WAIT;                      \
                       :: else -> skip;                                     \
                    fi;                                                     \
                 :: else -> skip;                                           \
              fi
#endif

active proctype waiter()
{
     if
         :: !value ->
             RESET;
             if
                 :: !value -> WAIT;
                 :: else   -> skip;
             fi;
        :: else -> skip;
    fi;
    assert(value);
}

active proctype notifier()
{
    value = true;
    SET;
}
+1 −0
Original line number Diff line number Diff line
@@ -18,6 +18,7 @@ struct QemuSemaphore {
};

struct QemuEvent {
    int value;
    HANDLE event;
};

+62 −4
Original line number Diff line number Diff line
@@ -238,10 +238,34 @@ void qemu_sem_wait(QemuSemaphore *sem)
    }
}

/* Wrap a Win32 manual-reset event with a fast userspace path.  The idea
 * is to reset the Win32 event lazily, as part of a test-reset-test-wait
 * sequence.  Such a sequence is, indeed, how QemuEvents are used by
 * RCU and other subsystems!
 *
 * Valid transitions:
 * - free->set, when setting the event
 * - busy->set, when setting the event, followed by futex_wake
 * - set->free, when resetting the event
 * - free->busy, when waiting
 *
 * set->busy does not happen (it can be observed from the outside but
 * it really is set->free->busy).
 *
 * busy->free provably cannot happen; to enforce it, the set->free transition
 * is done with an OR, which becomes a no-op if the event has concurrently
 * transitioned to free or busy (and is faster than cmpxchg).
 */

#define EV_SET         0
#define EV_FREE        1
#define EV_BUSY       -1

void qemu_event_init(QemuEvent *ev, bool init)
{
    /* Manual reset.  */
    ev->event = CreateEvent(NULL, TRUE, init, NULL);
    ev->event = CreateEvent(NULL, TRUE, TRUE, NULL);
    ev->value = (init ? EV_SET : EV_FREE);
}

void qemu_event_destroy(QemuEvent *ev)
@@ -251,18 +275,52 @@ void qemu_event_destroy(QemuEvent *ev)

void qemu_event_set(QemuEvent *ev)
{
    if (atomic_mb_read(&ev->value) != EV_SET) {
        if (atomic_xchg(&ev->value, EV_SET) == EV_BUSY) {
            /* There were waiters, wake them up.  */
            SetEvent(ev->event);
        }
    }
}

void qemu_event_reset(QemuEvent *ev)
{
    ResetEvent(ev->event);
    if (atomic_mb_read(&ev->value) == EV_SET) {
        /* If there was a concurrent reset (or even reset+wait),
         * do nothing.  Otherwise change EV_SET->EV_FREE.
         */
        atomic_or(&ev->value, EV_FREE);
    }
}

void qemu_event_wait(QemuEvent *ev)
{
    unsigned value;

    value = atomic_mb_read(&ev->value);
    if (value != EV_SET) {
        if (value == EV_FREE) {
            /* qemu_event_set is not yet going to call SetEvent, but we are
             * going to do another check for EV_SET below when setting EV_BUSY.
             * At that point it is safe to call WaitForSingleObject.
             */
            ResetEvent(ev->event);

            /* Tell qemu_event_set that there are waiters.  No need to retry
             * because there cannot be a concurent busy->free transition.
             * After the CAS, the event will be either set or busy.
             */
            if (atomic_cmpxchg(&ev->value, EV_FREE, EV_BUSY) == EV_SET) {
                value = EV_SET;
            } else {
                value = EV_BUSY;
            }
        }
        if (value == EV_BUSY) {
            WaitForSingleObject(ev->event, INFINITE);
        }
    }
}

struct QemuThreadData {
    /* Passed to win32_start_routine.  */