Commit 6bdcc018 authored by Paolo Bonzini's avatar Paolo Bonzini
Browse files

nbd: make it thread-safe, fix qcow2 over nbd



NBD is not thread safe, because it accesses s->in_flight without
a CoMutex.  Fixing this will be required for multiqueue.
CoQueue doesn't have spurious wakeups but, when another coroutine can
run between qemu_co_queue_next's wakeup and qemu_co_queue_wait's
re-locking of the mutex, the wait condition can become false and
a loop is necessary.

In fact, it turns out that the loop is necessary even without this
multi-threaded scenario.  A particular sequence of coroutine wakeups
is happening ~80% of the time when starting a guest with qcow2 image
served over NBD (i.e. qemu-nbd --format=raw, and QEMU's -drive option
has -format=qcow2).  This patch fixes that issue too.

Signed-off-by: default avatarPaolo Bonzini <pbonzini@redhat.com>
parent b8158192
Loading
Loading
Loading
Loading
+9 −21
Original line number Diff line number Diff line
@@ -119,6 +119,10 @@ static int nbd_co_send_request(BlockDriverState *bs,
    int rc, ret, i;

    qemu_co_mutex_lock(&s->send_mutex);
    while (s->in_flight == MAX_NBD_REQUESTS) {
        qemu_co_queue_wait(&s->free_sema, &s->send_mutex);
    }
    s->in_flight++;

    for (i = 0; i < MAX_NBD_REQUESTS; i++) {
        if (s->recv_coroutine[i] == NULL) {
@@ -181,20 +185,6 @@ static void nbd_co_receive_reply(NBDClientSession *s,
    }
}

static void nbd_coroutine_start(NBDClientSession *s,
                                NBDRequest *request)
{
    /* Poor man semaphore.  The free_sema is locked when no other request
     * can be accepted, and unlocked after receiving one reply.  */
    if (s->in_flight == MAX_NBD_REQUESTS) {
        qemu_co_queue_wait(&s->free_sema, NULL);
        assert(s->in_flight < MAX_NBD_REQUESTS);
    }
    s->in_flight++;

    /* s->recv_coroutine[i] is set as soon as we get the send_lock.  */
}

static void nbd_coroutine_end(BlockDriverState *bs,
                              NBDRequest *request)
{
@@ -202,13 +192,16 @@ static void nbd_coroutine_end(BlockDriverState *bs,
    int i = HANDLE_TO_INDEX(s, request->handle);

    s->recv_coroutine[i] = NULL;
    s->in_flight--;
    qemu_co_queue_next(&s->free_sema);

    /* Kick the read_reply_co to get the next reply.  */
    if (s->read_reply_co) {
        aio_co_wake(s->read_reply_co);
    }

    qemu_co_mutex_lock(&s->send_mutex);
    s->in_flight--;
    qemu_co_queue_next(&s->free_sema);
    qemu_co_mutex_unlock(&s->send_mutex);
}

int nbd_client_co_preadv(BlockDriverState *bs, uint64_t offset,
@@ -226,7 +219,6 @@ int nbd_client_co_preadv(BlockDriverState *bs, uint64_t offset,
    assert(bytes <= NBD_MAX_BUFFER_SIZE);
    assert(!flags);

    nbd_coroutine_start(client, &request);
    ret = nbd_co_send_request(bs, &request, NULL);
    if (ret < 0) {
        reply.error = -ret;
@@ -256,7 +248,6 @@ int nbd_client_co_pwritev(BlockDriverState *bs, uint64_t offset,

    assert(bytes <= NBD_MAX_BUFFER_SIZE);

    nbd_coroutine_start(client, &request);
    ret = nbd_co_send_request(bs, &request, qiov);
    if (ret < 0) {
        reply.error = -ret;
@@ -291,7 +282,6 @@ int nbd_client_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset,
        request.flags |= NBD_CMD_FLAG_NO_HOLE;
    }

    nbd_coroutine_start(client, &request);
    ret = nbd_co_send_request(bs, &request, NULL);
    if (ret < 0) {
        reply.error = -ret;
@@ -316,7 +306,6 @@ int nbd_client_co_flush(BlockDriverState *bs)
    request.from = 0;
    request.len = 0;

    nbd_coroutine_start(client, &request);
    ret = nbd_co_send_request(bs, &request, NULL);
    if (ret < 0) {
        reply.error = -ret;
@@ -342,7 +331,6 @@ int nbd_client_co_pdiscard(BlockDriverState *bs, int64_t offset, int count)
        return 0;
    }

    nbd_coroutine_start(client, &request);
    ret = nbd_co_send_request(bs, &request, NULL);
    if (ret < 0) {
        reply.error = -ret;