Commit c6a2225a authored by Peter Maydell's avatar Peter Maydell
Browse files

Merge remote-tracking branch 'remotes/ericb/tags/pull-nbd-2019-08-15' into staging



nbd patches for 2019-08-15

- Addition of InetSocketAddress keep-alive
- Addition of BDRV_REQ_PREFETCH for more efficient copy-on-read
- Initial refactoring in preparation of NBD reconnect

# gpg: Signature made Thu 15 Aug 2019 19:28:41 BST
# gpg:                using RSA key A7A16B4A2527436A
# gpg: Good signature from "Eric Blake <eblake@redhat.com>" [full]
# gpg:                 aka "Eric Blake (Free Software Programmer) <ebb9@byu.net>" [full]
# gpg:                 aka "[jpeg image of size 6874]" [full]
# Primary key fingerprint: 71C2 CC22 B1C4 6029 27D2  F3AA A7A1 6B4A 2527 436A

* remotes/ericb/tags/pull-nbd-2019-08-15:
  block/nbd: refactor nbd connection parameters
  block/nbd: add cmdline and qapi parameter reconnect-delay
  block/nbd: move from quit to state
  block/nbd: use non-blocking io channel for nbd negotiation
  block/nbd: split connection_co start out of nbd_client_connect
  nbd: improve CMD_CACHE: use BDRV_REQ_PREFETCH
  block/stream: use BDRV_REQ_PREFETCH
  block: implement BDRV_REQ_PREFETCH
  qapi: Add InetSocketAddress member keep-alive

Signed-off-by: default avatarPeter Maydell <peter.maydell@linaro.org>
parents 95a9457f 8f071c9d
Loading
Loading
Loading
Loading
+12 −6
Original line number Diff line number Diff line
@@ -1168,7 +1168,8 @@ bdrv_driver_pwritev_compressed(BlockDriverState *bs, uint64_t offset,
}

static int coroutine_fn bdrv_co_do_copy_on_readv(BdrvChild *child,
        int64_t offset, unsigned int bytes, QEMUIOVector *qiov)
        int64_t offset, unsigned int bytes, QEMUIOVector *qiov,
        int flags)
{
    BlockDriverState *bs = child->bs;

@@ -1279,9 +1280,11 @@ static int coroutine_fn bdrv_co_do_copy_on_readv(BdrvChild *child,
                goto err;
            }

            if (!(flags & BDRV_REQ_PREFETCH)) {
                qemu_iovec_from_buf(qiov, progress, bounce_buffer + skip_bytes,
                                    pnum - skip_bytes);
        } else {
            }
        } else if (!(flags & BDRV_REQ_PREFETCH)) {
            /* Read directly into the destination */
            qemu_iovec_init(&local_qiov, qiov->niov);
            qemu_iovec_concat(&local_qiov, qiov, progress, pnum - skip_bytes);
@@ -1332,7 +1335,8 @@ static int coroutine_fn bdrv_aligned_preadv(BdrvChild *child,
     * potential fallback support, if we ever implement any read flags
     * to pass through to drivers.  For now, there aren't any
     * passthrough flags.  */
    assert(!(flags & ~(BDRV_REQ_NO_SERIALISING | BDRV_REQ_COPY_ON_READ)));
    assert(!(flags & ~(BDRV_REQ_NO_SERIALISING | BDRV_REQ_COPY_ON_READ |
                       BDRV_REQ_PREFETCH)));

    /* Handle Copy on Read and associated serialisation */
    if (flags & BDRV_REQ_COPY_ON_READ) {
@@ -1360,7 +1364,9 @@ static int coroutine_fn bdrv_aligned_preadv(BdrvChild *child,
        }

        if (!ret || pnum != bytes) {
            ret = bdrv_co_do_copy_on_readv(child, offset, bytes, qiov);
            ret = bdrv_co_do_copy_on_readv(child, offset, bytes, qiov, flags);
            goto out;
        } else if (flags & BDRV_REQ_PREFETCH) {
            goto out;
        }
    }
+113 −82
Original line number Diff line number Diff line
@@ -54,6 +54,11 @@ typedef struct {
    bool receiving;         /* waiting for connection_co? */
} NBDClientRequest;

typedef enum NBDClientState {
    NBD_CLIENT_CONNECTED,
    NBD_CLIENT_QUIT
} NBDClientState;

typedef struct BDRVNBDState {
    QIOChannelSocket *sioc; /* The master data channel */
    QIOChannel *ioc; /* The current I/O channel which may differ (eg TLS) */
@@ -63,17 +68,27 @@ typedef struct BDRVNBDState {
    CoQueue free_sema;
    Coroutine *connection_co;
    int in_flight;
    NBDClientState state;

    NBDClientRequest requests[MAX_NBD_REQUESTS];
    NBDReply reply;
    BlockDriverState *bs;
    bool quit;

    /* For nbd_refresh_filename() */
    /* Connection parameters */
    uint32_t reconnect_delay;
    SocketAddress *saddr;
    char *export, *tlscredsid;
    QCryptoTLSCreds *tlscreds;
    const char *hostname;
    char *x_dirty_bitmap;
} BDRVNBDState;

/* @ret will be used for reconnect in future */
static void nbd_channel_error(BDRVNBDState *s, int ret)
{
    s->state = NBD_CLIENT_QUIT;
}

static void nbd_recv_coroutines_wake_all(BDRVNBDState *s)
{
    int i;
@@ -152,7 +167,7 @@ static coroutine_fn void nbd_connection_entry(void *opaque)
    int ret = 0;
    Error *local_err = NULL;

    while (!s->quit) {
    while (s->state != NBD_CLIENT_QUIT) {
        /*
         * The NBD client can only really be considered idle when it has
         * yielded from qio_channel_readv_all_eof(), waiting for data. This is
@@ -170,6 +185,7 @@ static coroutine_fn void nbd_connection_entry(void *opaque)
            error_free(local_err);
        }
        if (ret <= 0) {
            nbd_channel_error(s, ret ? ret : -EIO);
            break;
        }

@@ -184,6 +200,7 @@ static coroutine_fn void nbd_connection_entry(void *opaque)
            !s->requests[i].receiving ||
            (nbd_reply_is_structured(&s->reply) && !s->info.structured_reply))
        {
            nbd_channel_error(s, -EINVAL);
            break;
        }

@@ -203,7 +220,6 @@ static coroutine_fn void nbd_connection_entry(void *opaque)
        qemu_coroutine_yield();
    }

    s->quit = true;
    nbd_recv_coroutines_wake_all(s);
    bdrv_dec_in_flight(s->bs);

@@ -216,12 +232,18 @@ static int nbd_co_send_request(BlockDriverState *bs,
                               QEMUIOVector *qiov)
{
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
    int rc, i;
    int rc, i = -1;

    qemu_co_mutex_lock(&s->send_mutex);
    while (s->in_flight == MAX_NBD_REQUESTS) {
        qemu_co_queue_wait(&s->free_sema, &s->send_mutex);
    }

    if (s->state != NBD_CLIENT_CONNECTED) {
        rc = -EIO;
        goto err;
    }

    s->in_flight++;

    for (i = 0; i < MAX_NBD_REQUESTS; i++) {
@@ -239,16 +261,12 @@ static int nbd_co_send_request(BlockDriverState *bs,

    request->handle = INDEX_TO_HANDLE(s, i);

    if (s->quit) {
        rc = -EIO;
        goto err;
    }
    assert(s->ioc);

    if (qiov) {
        qio_channel_set_cork(s->ioc, true);
        rc = nbd_send_request(s->ioc, request);
        if (rc >= 0 && !s->quit) {
        if (rc >= 0 && s->state == NBD_CLIENT_CONNECTED) {
            if (qio_channel_writev_all(s->ioc, qiov->iov, qiov->niov,
                                       NULL) < 0) {
                rc = -EIO;
@@ -263,9 +281,11 @@ static int nbd_co_send_request(BlockDriverState *bs,

err:
    if (rc < 0) {
        s->quit = true;
        nbd_channel_error(s, rc);
        if (i != -1) {
            s->requests[i].coroutine = NULL;
            s->in_flight--;
        }
        qemu_co_queue_next(&s->free_sema);
    }
    qemu_co_mutex_unlock(&s->send_mutex);
@@ -557,7 +577,7 @@ static coroutine_fn int nbd_co_do_receive_one_chunk(
    s->requests[i].receiving = true;
    qemu_coroutine_yield();
    s->requests[i].receiving = false;
    if (s->quit) {
    if (s->state != NBD_CLIENT_CONNECTED) {
        error_setg(errp, "Connection closed");
        return -EIO;
    }
@@ -642,7 +662,7 @@ static coroutine_fn int nbd_co_receive_one_chunk(

    if (ret < 0) {
        memset(reply, 0, sizeof(*reply));
        s->quit = true;
        nbd_channel_error(s, ret);
    } else {
        /* For assert at loop start in nbd_connection_entry */
        *reply = s->reply;
@@ -710,7 +730,7 @@ static bool nbd_reply_chunk_iter_receive(BDRVNBDState *s,
    NBDReply local_reply;
    NBDStructuredReplyChunk *chunk;
    Error *local_err = NULL;
    if (s->quit) {
    if (s->state != NBD_CLIENT_CONNECTED) {
        error_setg(&local_err, "Connection closed");
        nbd_iter_channel_error(iter, -EIO, &local_err);
        goto break_loop;
@@ -735,7 +755,7 @@ static bool nbd_reply_chunk_iter_receive(BDRVNBDState *s,
    }

    /* Do not execute the body of NBD_FOREACH_REPLY_CHUNK for simple reply. */
    if (nbd_reply_is_simple(reply) || s->quit) {
    if (nbd_reply_is_simple(reply) || s->state != NBD_CLIENT_CONNECTED) {
        goto break_loop;
    }

@@ -809,14 +829,14 @@ static int nbd_co_receive_cmdread_reply(BDRVNBDState *s, uint64_t handle,
            ret = nbd_parse_offset_hole_payload(s, &reply.structured, payload,
                                                offset, qiov, &local_err);
            if (ret < 0) {
                s->quit = true;
                nbd_channel_error(s, ret);
                nbd_iter_channel_error(&iter, ret, &local_err);
            }
            break;
        default:
            if (!nbd_reply_type_is_error(chunk->type)) {
                /* not allowed reply type */
                s->quit = true;
                nbd_channel_error(s, -EINVAL);
                error_setg(&local_err,
                           "Unexpected reply type: %d (%s) for CMD_READ",
                           chunk->type, nbd_reply_type_lookup(chunk->type));
@@ -854,7 +874,7 @@ static int nbd_co_receive_blockstatus_reply(BDRVNBDState *s,
        switch (chunk->type) {
        case NBD_REPLY_TYPE_BLOCK_STATUS:
            if (received) {
                s->quit = true;
                nbd_channel_error(s, -EINVAL);
                error_setg(&local_err, "Several BLOCK_STATUS chunks in reply");
                nbd_iter_channel_error(&iter, -EINVAL, &local_err);
            }
@@ -864,13 +884,13 @@ static int nbd_co_receive_blockstatus_reply(BDRVNBDState *s,
                                                payload, length, extent,
                                                &local_err);
            if (ret < 0) {
                s->quit = true;
                nbd_channel_error(s, ret);
                nbd_iter_channel_error(&iter, ret, &local_err);
            }
            break;
        default:
            if (!nbd_reply_type_is_error(chunk->type)) {
                s->quit = true;
                nbd_channel_error(s, -EINVAL);
                error_setg(&local_err,
                           "Unexpected reply type: %d (%s) "
                           "for CMD_BLOCK_STATUS",
@@ -1167,47 +1187,43 @@ static QIOChannelSocket *nbd_establish_connection(SocketAddress *saddr,
    return sioc;
}

static int nbd_client_connect(BlockDriverState *bs,
                              SocketAddress *saddr,
                              const char *export,
                              QCryptoTLSCreds *tlscreds,
                              const char *hostname,
                              const char *x_dirty_bitmap,
                              Error **errp)
static int nbd_client_connect(BlockDriverState *bs, Error **errp)
{
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
    AioContext *aio_context = bdrv_get_aio_context(bs);
    int ret;

    /*
     * establish TCP connection, return error if it fails
     * TODO: Configurable retry-until-timeout behaviour.
     */
    QIOChannelSocket *sioc = nbd_establish_connection(saddr, errp);
    QIOChannelSocket *sioc = nbd_establish_connection(s->saddr, errp);

    if (!sioc) {
        return -ECONNREFUSED;
    }

    /* NBD handshake */
    trace_nbd_client_connect(export);
    qio_channel_set_blocking(QIO_CHANNEL(sioc), true, NULL);
    trace_nbd_client_connect(s->export);
    qio_channel_set_blocking(QIO_CHANNEL(sioc), false, NULL);
    qio_channel_attach_aio_context(QIO_CHANNEL(sioc), aio_context);

    s->info.request_sizes = true;
    s->info.structured_reply = true;
    s->info.base_allocation = true;
    s->info.x_dirty_bitmap = g_strdup(x_dirty_bitmap);
    s->info.name = g_strdup(export ?: "");
    ret = nbd_receive_negotiate(QIO_CHANNEL(sioc), tlscreds, hostname,
                                &s->ioc, &s->info, errp);
    s->info.x_dirty_bitmap = g_strdup(s->x_dirty_bitmap);
    s->info.name = g_strdup(s->export ?: "");
    ret = nbd_receive_negotiate(aio_context, QIO_CHANNEL(sioc), s->tlscreds,
                                s->hostname, &s->ioc, &s->info, errp);
    g_free(s->info.x_dirty_bitmap);
    g_free(s->info.name);
    if (ret < 0) {
        object_unref(OBJECT(sioc));
        return ret;
    }
    if (x_dirty_bitmap && !s->info.base_allocation) {
    if (s->x_dirty_bitmap && !s->info.base_allocation) {
        error_setg(errp, "requested x-dirty-bitmap %s not found",
                   x_dirty_bitmap);
                   s->x_dirty_bitmap);
        ret = -EINVAL;
        goto fail;
    }
@@ -1232,24 +1248,14 @@ static int nbd_client_connect(BlockDriverState *bs,
        object_ref(OBJECT(s->ioc));
    }

    /*
     * Now that we're connected, set the socket to be non-blocking and
     * kick the reply mechanism.
     */
    qio_channel_set_blocking(QIO_CHANNEL(sioc), false, NULL);
    s->connection_co = qemu_coroutine_create(nbd_connection_entry, s);
    bdrv_inc_in_flight(bs);
    nbd_client_attach_aio_context(bs, bdrv_get_aio_context(bs));

    trace_nbd_client_connect_success(export);
    trace_nbd_client_connect_success(s->export);

    return 0;

 fail:
    /*
     * We have connected, but must fail for other reasons. The
     * connection is still blocking; send NBD_CMD_DISC as a courtesy
     * to the server.
     * We have connected, but must fail for other reasons.
     * Send NBD_CMD_DISC as a courtesy to the server.
     */
    {
        NBDRequest request = { .type = NBD_CMD_DISC };
@@ -1262,23 +1268,9 @@ static int nbd_client_connect(BlockDriverState *bs,
    }
}

static int nbd_client_init(BlockDriverState *bs,
                           SocketAddress *saddr,
                           const char *export,
                           QCryptoTLSCreds *tlscreds,
                           const char *hostname,
                           const char *x_dirty_bitmap,
                           Error **errp)
{
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;

    s->bs = bs;
    qemu_co_mutex_init(&s->send_mutex);
    qemu_co_queue_init(&s->free_sema);

    return nbd_client_connect(bs, saddr, export, tlscreds, hostname,
                              x_dirty_bitmap, errp);
}
/*
 * Parse nbd_open options
 */

static int nbd_parse_uri(const char *filename, QDict *options)
{
@@ -1583,18 +1575,27 @@ static QemuOptsList nbd_runtime_opts = {
            .help = "experimental: expose named dirty bitmap in place of "
                    "block status",
        },
        {
            .name = "reconnect-delay",
            .type = QEMU_OPT_NUMBER,
            .help = "On an unexpected disconnect, the nbd client tries to "
                    "connect again until succeeding or encountering a serious "
                    "error.  During the first @reconnect-delay seconds, all "
                    "requests are paused and will be rerun on a successful "
                    "reconnect. After that time, any delayed requests and all "
                    "future requests before a successful reconnect will "
                    "immediately fail. Default 0",
        },
        { /* end of list */ }
    },
};

static int nbd_open(BlockDriverState *bs, QDict *options, int flags,
static int nbd_process_options(BlockDriverState *bs, QDict *options,
                               Error **errp)
{
    BDRVNBDState *s = bs->opaque;
    QemuOpts *opts = NULL;
    QemuOpts *opts;
    Error *local_err = NULL;
    QCryptoTLSCreds *tlscreds = NULL;
    const char *hostname = NULL;
    int ret = -EINVAL;

    opts = qemu_opts_create(&nbd_runtime_opts, NULL, 0, &error_abort);
@@ -1619,8 +1620,8 @@ static int nbd_open(BlockDriverState *bs, QDict *options, int flags,

    s->tlscredsid = g_strdup(qemu_opt_get(opts, "tls-creds"));
    if (s->tlscredsid) {
        tlscreds = nbd_get_tls_creds(s->tlscredsid, errp);
        if (!tlscreds) {
        s->tlscreds = nbd_get_tls_creds(s->tlscredsid, errp);
        if (!s->tlscreds) {
            goto error;
        }

@@ -1629,18 +1630,17 @@ static int nbd_open(BlockDriverState *bs, QDict *options, int flags,
            error_setg(errp, "TLS only supported over IP sockets");
            goto error;
        }
        hostname = s->saddr->u.inet.host;
        s->hostname = s->saddr->u.inet.host;
    }

    /* NBD handshake */
    ret = nbd_client_init(bs, s->saddr, s->export, tlscreds, hostname,
                          qemu_opt_get(opts, "x-dirty-bitmap"), errp);
    s->x_dirty_bitmap = g_strdup(qemu_opt_get(opts, "x-dirty-bitmap"));
    s->reconnect_delay = qemu_opt_get_number(opts, "reconnect-delay", 0);

    ret = 0;

 error:
    if (tlscreds) {
        object_unref(OBJECT(tlscreds));
    }
    if (ret < 0) {
        object_unref(OBJECT(s->tlscreds));
        qapi_free_SocketAddress(s->saddr);
        g_free(s->export);
        g_free(s->tlscredsid);
@@ -1649,6 +1649,35 @@ static int nbd_open(BlockDriverState *bs, QDict *options, int flags,
    return ret;
}

static int nbd_open(BlockDriverState *bs, QDict *options, int flags,
                    Error **errp)
{
    int ret;
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;

    ret = nbd_process_options(bs, options, errp);
    if (ret < 0) {
        return ret;
    }

    s->bs = bs;
    qemu_co_mutex_init(&s->send_mutex);
    qemu_co_queue_init(&s->free_sema);

    ret = nbd_client_connect(bs, errp);
    if (ret < 0) {
        return ret;
    }
    /* successfully connected */
    s->state = NBD_CLIENT_CONNECTED;

    s->connection_co = qemu_coroutine_create(nbd_connection_entry, s);
    bdrv_inc_in_flight(bs);
    aio_co_schedule(bdrv_get_aio_context(bs), s->connection_co);

    return 0;
}

static int nbd_co_flush(BlockDriverState *bs)
{
    return nbd_client_co_flush(bs);
@@ -1694,9 +1723,11 @@ static void nbd_close(BlockDriverState *bs)

    nbd_client_close(bs);

    object_unref(OBJECT(s->tlscreds));
    qapi_free_SocketAddress(s->saddr);
    g_free(s->export);
    g_free(s->tlscredsid);
    g_free(s->x_dirty_bitmap);
}

static int64_t nbd_getlength(BlockDriverState *bs)
+9 −15
Original line number Diff line number Diff line
@@ -22,11 +22,11 @@

enum {
    /*
     * Size of data buffer for populating the image file.  This should be large
     * enough to process multiple clusters in a single call, so that populating
     * contiguous regions of the image is efficient.
     * Maximum chunk size to feed to copy-on-read.  This should be
     * large enough to process multiple clusters in a single call, so
     * that populating contiguous regions of the image is efficient.
     */
    STREAM_BUFFER_SIZE = 512 * 1024, /* in bytes */
    STREAM_CHUNK = 512 * 1024, /* in bytes */
};

typedef struct StreamBlockJob {
@@ -39,13 +39,12 @@ typedef struct StreamBlockJob {
} StreamBlockJob;

static int coroutine_fn stream_populate(BlockBackend *blk,
                                        int64_t offset, uint64_t bytes,
                                        void *buf)
                                        int64_t offset, uint64_t bytes)
{
    assert(bytes < SIZE_MAX);

    /* Copy-on-read the unallocated clusters */
    return blk_co_pread(blk, offset, bytes, buf, BDRV_REQ_COPY_ON_READ);
    return blk_co_preadv(blk, offset, bytes, NULL,
                         BDRV_REQ_COPY_ON_READ | BDRV_REQ_PREFETCH);
}

static void stream_abort(Job *job)
@@ -117,7 +116,6 @@ static int coroutine_fn stream_run(Job *job, Error **errp)
    int error = 0;
    int ret = 0;
    int64_t n = 0; /* bytes */
    void *buf;

    if (bs == s->bottom) {
        /* Nothing to stream */
@@ -130,8 +128,6 @@ static int coroutine_fn stream_run(Job *job, Error **errp)
    }
    job_progress_set_remaining(&s->common.job, len);

    buf = qemu_blockalign(bs, STREAM_BUFFER_SIZE);

    /* Turn on copy-on-read for the whole block device so that guest read
     * requests help us make progress.  Only do this when copying the entire
     * backing chain since the copy-on-read operation does not take base into
@@ -154,7 +150,7 @@ static int coroutine_fn stream_run(Job *job, Error **errp)

        copy = false;

        ret = bdrv_is_allocated(bs, offset, STREAM_BUFFER_SIZE, &n);
        ret = bdrv_is_allocated(bs, offset, STREAM_CHUNK, &n);
        if (ret == 1) {
            /* Allocated in the top, no need to copy.  */
        } else if (ret >= 0) {
@@ -171,7 +167,7 @@ static int coroutine_fn stream_run(Job *job, Error **errp)
        }
        trace_stream_one_iteration(s, offset, n, ret);
        if (copy) {
            ret = stream_populate(blk, offset, n, buf);
            ret = stream_populate(blk, offset, n);
        }
        if (ret < 0) {
            BlockErrorAction action =
@@ -202,8 +198,6 @@ static int coroutine_fn stream_run(Job *job, Error **errp)
        bdrv_disable_copy_on_read(bs);
    }

    qemu_vfree(buf);

    /* Do not remove the backing file if an error was there but ignored. */
    return error;
}
+7 −1
Original line number Diff line number Diff line
@@ -87,8 +87,14 @@ typedef enum {
     * fallback. */
    BDRV_REQ_NO_FALLBACK        = 0x100,

    /*
     * BDRV_REQ_PREFETCH may be used only together with BDRV_REQ_COPY_ON_READ
     * on read request and means that caller doesn't really need data to be
     * written to qiov parameter which may be NULL.
     */
    BDRV_REQ_PREFETCH  = 0x200,
    /* Mask of valid flags */
    BDRV_REQ_MASK               = 0x1ff,
    BDRV_REQ_MASK               = 0x3ff,
} BdrvRequestFlags;

typedef struct BlockSizes {
+2 −1
Original line number Diff line number Diff line
@@ -304,7 +304,8 @@ struct NBDExportInfo {
};
typedef struct NBDExportInfo NBDExportInfo;

int nbd_receive_negotiate(QIOChannel *ioc, QCryptoTLSCreds *tlscreds,
int nbd_receive_negotiate(AioContext *aio_context, QIOChannel *ioc,
                          QCryptoTLSCreds *tlscreds,
                          const char *hostname, QIOChannel **outioc,
                          NBDExportInfo *info, Error **errp);
void nbd_free_export_list(NBDExportInfo *info, int count);
Loading