Commit 0ceb849b authored by Paolo Bonzini's avatar Paolo Bonzini Committed by Kevin Wolf
Browse files

AioContext: speed up aio_notify



In many cases, the call to event_notifier_set in aio_notify is unnecessary.
In particular, if we are executing aio_dispatch, or if aio_poll is not
blocking, we know that we will soon get to the next loop iteration (if
necessary); the thread that hosts the AioContext's event loop does not
need any nudging.

The patch includes a Promela formal model that shows that this really
works and does not need any further complication such as generation
counts.  It needs a memory barrier though.

The generation counts are not needed because any change to
ctx->dispatching after the memory barrier is okay for aio_notify.
If it changes from zero to one, it is the right thing to skip
event_notifier_set.  If it changes from one to zero, the
event_notifier_set is unnecessary but harmless.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
Signed-off-by: Kevin Wolf <kwolf@redhat.com>
parent ef508f42
Loading
Loading
Loading
Loading
+33 −1
Original line number Diff line number Diff line
@@ -175,11 +175,38 @@ static bool aio_dispatch(AioContext *ctx)
bool aio_poll(AioContext *ctx, bool blocking)
{
    AioHandler *node;
    bool was_dispatching;
    int ret;
    bool progress;

    was_dispatching = ctx->dispatching;
    progress = false;

    /* aio_notify can avoid the expensive event_notifier_set if
     * everything (file descriptors, bottom halves, timers) will
     * be re-evaluated before the next blocking poll().  This happens
     * in two cases:
     *
     * 1) when aio_poll is called with blocking == false
     *
     * 2) when we are called after poll().  If we are called before
     *    poll(), bottom halves will not be re-evaluated and we need
     *    aio_notify() if blocking == true.
     *
     * The first aio_dispatch() only does something when AioContext is
     * running as a GSource, and in that case aio_poll is used only
     * with blocking == false, so this optimization is already quite
     * effective.  However, the code is ugly and should be restructured
     * to have a single aio_dispatch() call.  To do this, we need to
     * reorganize aio_poll into a prepare/poll/dispatch model like
     * glib's.
     *
     * If we're in a nested event loop, ctx->dispatching might be true.
     * In that case we can restore it just before returning, but we
     * have to clear it now.
     */
    aio_set_dispatching(ctx, !blocking);

    /*
     * If there are callbacks left that have been queued, we need to call them.
     * Do not call select in this case, because it is possible that the caller
@@ -190,12 +217,14 @@ bool aio_poll(AioContext *ctx, bool blocking)
        progress = true;
    }

    /* Re-evaluate condition (1) above.  */
    aio_set_dispatching(ctx, !blocking);
    if (aio_dispatch(ctx)) {
        progress = true;
    }

    if (progress && !blocking) {
        return true;
        goto out;
    }

    ctx->walking_handlers++;
@@ -234,9 +263,12 @@ bool aio_poll(AioContext *ctx, bool blocking)
    }

    /* Run dispatch even if there were no readable fds to run timers */
    aio_set_dispatching(ctx, true);
    if (aio_dispatch(ctx)) {
        progress = true;
    }

out:
    aio_set_dispatching(ctx, was_dispatching);
    return progress;
}
+18 −1
Original line number Diff line number Diff line
@@ -26,6 +26,7 @@
#include "block/aio.h"
#include "block/thread-pool.h"
#include "qemu/main-loop.h"
#include "qemu/atomic.h"

/***********************************************************/
/* bottom halves (can be seen as timers which expire ASAP) */
@@ -247,10 +248,26 @@ ThreadPool *aio_get_thread_pool(AioContext *ctx)
    return ctx->thread_pool;
}

/* Record whether the AioContext's event loop is currently inside its
 * dispatch phase.  Used internally to synchronize aio_poll against
 * qemu_bh_schedule: while ctx->dispatching is true, aio_notify may
 * skip the expensive event_notifier_set, because the loop will
 * re-evaluate all sources before blocking again.
 *
 * @ctx: the AioContext whose state is being updated
 * @dispatching: true on entry to the dispatch phase, false on exit
 */
void aio_set_dispatching(AioContext *ctx, bool dispatching)
{
    ctx->dispatching = dispatching;
    if (!dispatching) {
        /* Write ctx->dispatching before reading e.g. bh->scheduled.
         * Pairs with the smp_mb() in aio_notify().
         * Optimization: this is only needed when we're entering the "unsafe"
         * phase where other threads must call event_notifier_set.
         */
        smp_mb();
    }
}

/* Wake the thread running @ctx's event loop, if it needs waking.
 *
 * The caller has already published its work (e.g. set bh->scheduled);
 * the barrier below orders that write before the read of
 * ctx->dispatching, pairing with the smp_mb() in aio_set_dispatching.
 * If the loop is dispatching (or polling non-blocking) it will notice
 * the new work on its own, so event_notifier_set can be skipped;
 * otherwise the notifier must be set to break out of a blocking poll.
 *
 * NOTE(review): the safety argument is formalized in
 * docs/aio_notify.promela; confirm the model still matches this code
 * before relying on the fast path.
 */
void aio_notify(AioContext *ctx)
{
    /* Write e.g. bh->scheduled before reading ctx->dispatching.  */
    smp_mb();
    if (!ctx->dispatching) {
        event_notifier_set(&ctx->notifier);
    }
}

static void aio_timerlist_notify(void *opaque)
{
+104 −0
Original line number Diff line number Diff line
/*
 * This model describes the interaction between aio_set_dispatching()
 * and aio_notify().
 *
 * Author: Paolo Bonzini <pbonzini@redhat.com>
 *
 * This file is in the public domain.  If you really want a license,
 * the WTFPL will do.
 *
 * To simulate it:
 *     spin -p docs/aio_notify.promela
 *
 * To verify it:
 *     spin -a docs/aio_notify.promela
 *     gcc -O2 pan.c
 *     ./a.out -a
 */

#define MAX   4
#define LAST  (1 << (MAX - 1))
#define FINAL ((LAST << 1) - 1)

bool dispatching;
bool event;

int req, done;

// Models the thread hosting the AioContext event loop.  Each iteration
// mirrors one pass of aio_poll(): decide whether the poll would block
// (no pending request), consume any pending request bits into "done"
// (the "bottom half"), wait for the event flag only when blocking, and
// raise "dispatching" around the dispatch window so the notifier may
// take its fast path.  Terminates once every request bit is in "done".
active proctype waiter()
{
     int fetch, blocking;

     do
        :: done != FINAL -> {
            // Computing "blocking" is separate from execution of the
            // "bottom half"
            blocking = (req == 0);

            // This is our "bottom half"
            atomic { fetch = req; req = 0; }
            done = done | fetch;

            // Wait for a nudge from the other side
            do
                :: event == 1 -> { event = 0; break; }
                :: !blocking  -> break;
            od;

            dispatching = 1;

            // If you are simulating this model, you may want to add
            // something like this here:
            //
            //      int foo; foo++; foo++; foo++;
            //
            // This only wastes some time and makes it more likely
            // that the notifier process hits the "fast path".

            dispatching = 0;
        }
        :: else -> break;
    od
}

// Models a thread calling aio_notify(): issues MAX requests, one bit at
// a time, and after each request either sets the event flag or -- when
// it observes dispatching == 1 -- skips it (the fast path being
// verified).  "sets" counts the notifications actually delivered, so
// the final printf reports how many event_notifier_set calls were
// elided.  The nondeterministic choice at the bottom exercises both
// synchronous delivery (spin until the waiter consumed the request)
// and asynchronous delivery (fire and continue).
active proctype notifier()
{
    int next = 1;
    int sets = 0;

    do
        :: next <= LAST -> {
            // generate a request
            req = req | next;
            next = next << 1;

            // aio_notify
            if
                :: dispatching == 0 -> sets++; event = 1;
                :: else             -> skip;
            fi;

            // Test both synchronous and asynchronous delivery
            if
                :: 1 -> do
                            :: req == 0 -> break;
                        od;
                :: 1 -> skip;
            fi;
        }
        :: else -> break;
    od;
    printf("Skipped %d event_notifier_set\n", MAX - sets);
}

// Liveness property: eventually every request is processed and stays
// processed.  The never claim describes the BAD behaviour -- p (all
// requests done) becomes true at some point and later turns false
// again; the verifier reports an error iff such a run exists.
#define p (done == FINAL)

never  {
    do
        :: 1                      // after an arbitrarily long prefix
        :: p -> break             // p becomes true
    od;
    do
        :: !p -> accept: break    // it then must remain true forever after
    od
}
+9 −0
Original line number Diff line number Diff line
@@ -60,8 +60,14 @@ struct AioContext {
     */
    int walking_handlers;

    /* Used to avoid unnecessary event_notifier_set calls in aio_notify.
     * Writes protected by lock or BQL, reads are lockless.
     */
    bool dispatching;

    /* lock to protect between bh's adders and deleter */
    QemuMutex bh_lock;

    /* Anchor of the list of Bottom Halves belonging to the context */
    struct QEMUBH *first_bh;

@@ -83,6 +89,9 @@ struct AioContext {
    QEMUTimerListGroup tlg;
};

/* Used internally to synchronize aio_poll against qemu_bh_schedule.  */
void aio_set_dispatching(AioContext *ctx, bool dispatching);

/**
 * aio_context_new: Allocate a new AioContext.
 *