Commit 40f4a218 authored by Stefan Hajnoczi's avatar Stefan Hajnoczi Committed by Eric Blake
Browse files

nbd-client: avoid spurious qio_channel_yield() re-entry



The following scenario leads to an assertion failure in
qio_channel_yield():

1. Request coroutine calls qio_channel_yield() successfully when sending
   would block on the socket.  It is now yielded.
2. nbd_read_reply_entry() calls nbd_recv_coroutines_enter_all() because
   nbd_receive_reply() failed.
3. Request coroutine is entered and returns from qio_channel_yield().
   Note that the socket fd handler has not fired yet so
   ioc->write_coroutine is still set.
4. Request coroutine attempts to send the request body with nbd_rwv()
   but the socket would still block.  qio_channel_yield() is called
   again and assert(!ioc->write_coroutine) is hit.

The problem is that nbd_read_reply_entry() does not distinguish between
request coroutines that are waiting to receive a reply and those that
are not.

This patch adds a per-request bool receiving flag so
nbd_read_reply_entry() can avoid spurious aio_wake() calls.

Reported-by: default avatarDr. David Alan Gilbert <dgilbert@redhat.com>
Signed-off-by: default avatarStefan Hajnoczi <stefanha@redhat.com>
Message-Id: <20170822125113.5025-1-stefanha@redhat.com>
Reviewed-by: default avatarVladimir Sementsov-Ogievskiy <vsementsov@virtuozzo.com>
Tested-by: default avatarEric Blake <eblake@redhat.com>
Reviewed-by: default avatarPaolo Bonzini <pbonzini@redhat.com>
Signed-off-by: default avatarEric Blake <eblake@redhat.com>
parent 12314f2d
Loading
Loading
Loading
Loading
+22 −13
Original line number Diff line number Diff line
@@ -39,8 +39,10 @@ static void nbd_recv_coroutines_enter_all(NBDClientSession *s)
    int i;

    for (i = 0; i < MAX_NBD_REQUESTS; i++) {
        if (s->recv_coroutine[i]) {
            aio_co_wake(s->recv_coroutine[i]);
        NBDClientRequest *req = &s->requests[i];

        if (req->coroutine && req->receiving) {
            aio_co_wake(req->coroutine);
        }
    }
}
@@ -88,28 +90,28 @@ static coroutine_fn void nbd_read_reply_entry(void *opaque)
         * one coroutine is called until the reply finishes.
         */
        i = HANDLE_TO_INDEX(s, s->reply.handle);
        if (i >= MAX_NBD_REQUESTS || !s->recv_coroutine[i]) {
        if (i >= MAX_NBD_REQUESTS ||
            !s->requests[i].coroutine ||
            !s->requests[i].receiving) {
            break;
        }

        /* We're woken up by the recv_coroutine itself.  Note that there
        /* We're woken up again by the request itself.  Note that there
         * is no race between yielding and reentering read_reply_co.  This
         * is because:
         *
         * - if recv_coroutine[i] runs on the same AioContext, it is only
         * - if the request runs on the same AioContext, it is only
         *   entered after we yield
         *
         * - if recv_coroutine[i] runs on a different AioContext, reentering
         * - if the request runs on a different AioContext, reentering
         *   read_reply_co happens through a bottom half, which can only
         *   run after we yield.
         */
        aio_co_wake(s->recv_coroutine[i]);
        aio_co_wake(s->requests[i].coroutine);
        qemu_coroutine_yield();
    }

    if (ret < 0) {
    s->quit = true;
    }
    nbd_recv_coroutines_enter_all(s);
    s->read_reply_co = NULL;
}
@@ -128,14 +130,17 @@ static int nbd_co_send_request(BlockDriverState *bs,
    s->in_flight++;

    for (i = 0; i < MAX_NBD_REQUESTS; i++) {
        if (s->recv_coroutine[i] == NULL) {
            s->recv_coroutine[i] = qemu_coroutine_self();
        if (s->requests[i].coroutine == NULL) {
            break;
        }
    }

    g_assert(qemu_in_coroutine());
    assert(i < MAX_NBD_REQUESTS);

    s->requests[i].coroutine = qemu_coroutine_self();
    s->requests[i].receiving = false;

    request->handle = INDEX_TO_HANDLE(s, i);

    if (s->quit) {
@@ -173,10 +178,13 @@ static void nbd_co_receive_reply(NBDClientSession *s,
                                 NBDReply *reply,
                                 QEMUIOVector *qiov)
{
    int i = HANDLE_TO_INDEX(s, request->handle);
    int ret;

    /* Wait until we're woken up by nbd_read_reply_entry.  */
    s->requests[i].receiving = true;
    qemu_coroutine_yield();
    s->requests[i].receiving = false;
    *reply = s->reply;
    if (reply->handle != request->handle || !s->ioc || s->quit) {
        reply->error = EIO;
@@ -186,6 +194,7 @@ static void nbd_co_receive_reply(NBDClientSession *s,
                          NULL);
            if (ret != request->len) {
                reply->error = EIO;
                s->quit = true;
            }
        }

@@ -200,7 +209,7 @@ static void nbd_coroutine_end(BlockDriverState *bs,
    NBDClientSession *s = nbd_get_client_session(bs);
    int i = HANDLE_TO_INDEX(s, request->handle);

    s->recv_coroutine[i] = NULL;
    s->requests[i].coroutine = NULL;

    /* Kick the read_reply_co to get the next reply.  */
    if (s->read_reply_co) {
+6 −1
Original line number Diff line number Diff line
@@ -17,6 +17,11 @@

#define MAX_NBD_REQUESTS    16

typedef struct {
    Coroutine *coroutine;
    bool receiving;         /* waiting for read_reply_co? */
} NBDClientRequest;

typedef struct NBDClientSession {
    QIOChannelSocket *sioc; /* The master data channel */
    QIOChannel *ioc; /* The current I/O channel which may differ (eg TLS) */
@@ -27,7 +32,7 @@ typedef struct NBDClientSession {
    Coroutine *read_reply_co;
    int in_flight;

    Coroutine *recv_coroutine[MAX_NBD_REQUESTS];
    NBDClientRequest requests[MAX_NBD_REQUESTS];
    NBDReply reply;
    bool quit;
} NBDClientSession;