Commit 53761caf authored by Peter Maydell's avatar Peter Maydell
Browse files

Merge remote-tracking branch 'remotes/cody/tags/block-pull-request' into staging



# gpg: Signature made Wed 01 Feb 2017 05:32:23 GMT
# gpg:                using RSA key 0xBDBE7B27C0DE3057
# gpg: Good signature from "Jeffrey Cody <jcody@redhat.com>"
# gpg:                 aka "Jeffrey Cody <jeff@codyprime.org>"
# gpg:                 aka "Jeffrey Cody <codyprime@gmail.com>"
# Primary key fingerprint: 9957 4B4D 3474 90E7 9D98  D624 BDBE 7B27 C0DE 3057

* remotes/cody/tags/block-pull-request:
  sheepdog: reorganize check for overlapping requests
  sheepdog: simplify inflight_aio_head management
  sheepdog: do not use BlockAIOCB
  sheepdog: reorganize coroutine flow
  sheepdog: remove unused cancellation support

Signed-off-by: default avatarPeter Maydell <peter.maydell@linaro.org>
parents e905587b acf6e5f0
Loading
Loading
Loading
Loading
+84 −205
Original line number Diff line number Diff line
@@ -306,6 +306,7 @@ static inline size_t count_data_objs(const struct SheepdogInode *inode)
    } while (0)

typedef struct SheepdogAIOCB SheepdogAIOCB;
typedef struct BDRVSheepdogState BDRVSheepdogState;

typedef struct AIOReq {
    SheepdogAIOCB *aiocb;
@@ -334,7 +335,7 @@ enum AIOCBState {
       || y->max_affect_data_idx < x->min_affect_data_idx))

struct SheepdogAIOCB {
    BlockAIOCB common;
    BDRVSheepdogState *s;

    QEMUIOVector *qiov;

@@ -345,9 +346,6 @@ struct SheepdogAIOCB {
    enum AIOCBState aiocb_type;

    Coroutine *coroutine;
    void (*aio_done_func)(SheepdogAIOCB *);

    bool cancelable;
    int nr_pending;

    uint32_t min_affect_data_idx;
@@ -365,7 +363,7 @@ struct SheepdogAIOCB {
    QLIST_ENTRY(SheepdogAIOCB) aiocb_siblings;
};

typedef struct BDRVSheepdogState {
struct BDRVSheepdogState {
    BlockDriverState *bs;
    AioContext *aio_context;

@@ -392,7 +390,7 @@ typedef struct BDRVSheepdogState {

    CoQueue overlapping_queue;
    QLIST_HEAD(inflight_aiocb_head, SheepdogAIOCB) inflight_aiocb_head;
} BDRVSheepdogState;
};

typedef struct BDRVSheepdogReopenState {
    int fd;
@@ -450,14 +448,13 @@ static const char * sd_strerror(int err)
 *
 * 1. In sd_co_rw_vector, we send the I/O requests to the server and
 *    link the requests to the inflight_list in the
 *    BDRVSheepdogState.  The function exits without waiting for
 *    BDRVSheepdogState.  The function yields while waiting for
 *    receiving the response.
 *
 * 2. We receive the response in aio_read_response, the fd handler to
 *    the sheepdog connection.  If metadata update is needed, we send
 *    the write request to the vdi object in sd_write_done, the write
 *    completion function.  We switch back to sd_co_readv/writev after
 *    all the requests belonging to the AIOCB are finished.
 *    the sheepdog connection.  We switch back to sd_co_readv/sd_writev
 *    after all the requests belonging to the AIOCB are finished.  If
 *    needed, sd_co_writev will send another request for the vdi object.
 */

static inline AIOReq *alloc_aio_req(BDRVSheepdogState *s, SheepdogAIOCB *acb,
@@ -482,94 +479,34 @@ static inline AIOReq *alloc_aio_req(BDRVSheepdogState *s, SheepdogAIOCB *acb,
    return aio_req;
}

/*
 * Unlink @aio_req from whichever request list it is on, free it, and
 * decrement the pending-request count of the owning AIOCB.
 *
 * NOTE(review): @s is unused in this version of the function; it is
 * presumably kept for signature symmetry with alloc_aio_req — confirm
 * against the full file.
 */
static inline void free_aio_req(BDRVSheepdogState *s, AIOReq *aio_req)
{
    SheepdogAIOCB *acb = aio_req->aiocb;

    /* Once any request of this AIOCB has been freed, the AIOCB may no
     * longer be canceled (see sd_acb_cancelable). */
    acb->cancelable = false;
    QLIST_REMOVE(aio_req, aio_siblings);
    g_free(aio_req);

    /* One fewer request outstanding for this AIOCB. */
    acb->nr_pending--;
}

/*
 * Complete an AIOCB: re-enter the coroutine that is waiting on it and
 * drop the reference taken when the AIOCB was allocated.
 */
static void coroutine_fn sd_finish_aiocb(SheepdogAIOCB *acb)
{
    qemu_coroutine_enter(acb->coroutine);
    qemu_aio_unref(acb);
}

/*
 * Check whether the specified acb can be canceled
 *
 * We can cancel aio when any request belonging to the acb is:
 *  - Not processed by the sheepdog server.
 *  - Not linked to the inflight queue.
 */
static bool sd_acb_cancelable(const SheepdogAIOCB *acb)
{
    BDRVSheepdogState *s = acb->common.bs->opaque;
    AIOReq *aioreq;

    /* Cleared by free_aio_req once any request of this AIOCB has
     * completed; cancellation is no longer safe after that point. */
    if (!acb->cancelable) {
        return false;
    }

    /* If any request of this AIOCB is still in flight to the server,
     * the AIOCB cannot be canceled. */
    QLIST_FOREACH(aioreq, &s->inflight_aio_head, aio_siblings) {
        if (aioreq->aiocb == acb) {
            return false;
        }
    }

    return true;
}

static void sd_aio_cancel(BlockAIOCB *blockacb)
static void wait_for_overlapping_aiocb(BDRVSheepdogState *s, SheepdogAIOCB *acb)
{
    SheepdogAIOCB *acb = (SheepdogAIOCB *)blockacb;
    BDRVSheepdogState *s = acb->common.bs->opaque;
    AIOReq *aioreq, *next;

    if (sd_acb_cancelable(acb)) {
        /* Remove outstanding requests from failed queue.  */
        QLIST_FOREACH_SAFE(aioreq, &s->failed_aio_head, aio_siblings,
                           next) {
            if (aioreq->aiocb == acb) {
                free_aio_req(s, aioreq);
            }
        }
    SheepdogAIOCB *cb;

        assert(acb->nr_pending == 0);
        if (acb->common.cb) {
            acb->common.cb(acb->common.opaque, -ECANCELED);
retry:
    QLIST_FOREACH(cb, &s->inflight_aiocb_head, aiocb_siblings) {
        if (AIOCBOverlapping(acb, cb)) {
            qemu_co_queue_wait(&s->overlapping_queue);
            goto retry;
        }
        sd_finish_aiocb(acb);
    }
}

static const AIOCBInfo sd_aiocb_info = {
    .aiocb_size     = sizeof(SheepdogAIOCB),
    .cancel_async   = sd_aio_cancel,
};

static SheepdogAIOCB *sd_aio_setup(BlockDriverState *bs, QEMUIOVector *qiov,
                                   int64_t sector_num, int nb_sectors)
static void sd_aio_setup(SheepdogAIOCB *acb, BDRVSheepdogState *s,
                         QEMUIOVector *qiov, int64_t sector_num, int nb_sectors,
                         int type)
{
    SheepdogAIOCB *acb;
    uint32_t object_size;
    BDRVSheepdogState *s = bs->opaque;

    object_size = (UINT32_C(1) << s->inode.block_size_shift);

    acb = qemu_aio_get(&sd_aiocb_info, bs, NULL, NULL);
    acb->s = s;

    acb->qiov = qiov;

    acb->sector_num = sector_num;
    acb->nb_sectors = nb_sectors;

    acb->aio_done_func = NULL;
    acb->cancelable = true;
    acb->coroutine = qemu_coroutine_self();
    acb->ret = 0;
    acb->nr_pending = 0;
@@ -580,8 +517,14 @@ static SheepdogAIOCB *sd_aio_setup(BlockDriverState *bs, QEMUIOVector *qiov,

    acb->min_dirty_data_idx = UINT32_MAX;
    acb->max_dirty_data_idx = 0;
    acb->aiocb_type = type;

    return acb;
    if (type == AIOCB_FLUSH_CACHE) {
        return;
    }

    wait_for_overlapping_aiocb(s, acb);
    QLIST_INSERT_HEAD(&s->inflight_aiocb_head, acb, aiocb_siblings);
}

/* Return -EIO in case of error, file descriptor on success */
@@ -797,7 +740,6 @@ static coroutine_fn void reconnect_to_sdog(void *opaque)
    while (!QLIST_EMPTY(&s->failed_aio_head)) {
        aio_req = QLIST_FIRST(&s->failed_aio_head);
        QLIST_REMOVE(aio_req, aio_siblings);
        QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
        resend_aioreq(s, aio_req);
    }
}
@@ -840,9 +782,6 @@ static void coroutine_fn aio_read_response(void *opaque)

    switch (acb->aiocb_type) {
    case AIOCB_WRITE_UDATA:
        /* this coroutine context is no longer suitable for co_recv
         * because we may send data to update vdi objects */
        s->co_recv = NULL;
        if (!is_data_obj(aio_req->oid)) {
            break;
        }
@@ -890,6 +829,12 @@ static void coroutine_fn aio_read_response(void *opaque)
        }
    }

    /* No more data for this aio_req (reload_inode below uses its own file
     * descriptor handler which doesn't use co_recv).
    */
    s->co_recv = NULL;

    QLIST_REMOVE(aio_req, aio_siblings);
    switch (rsp.result) {
    case SD_RES_SUCCESS:
        break;
@@ -907,26 +852,26 @@ static void coroutine_fn aio_read_response(void *opaque)
            aio_req->oid = vid_to_vdi_oid(s->inode.vdi_id);
        }
        resend_aioreq(s, aio_req);
        goto out;
        return;
    default:
        acb->ret = -EIO;
        error_report("%s", sd_strerror(rsp.result));
        break;
    }

    free_aio_req(s, aio_req);
    if (!acb->nr_pending) {
    g_free(aio_req);

    if (!--acb->nr_pending) {
        /*
         * We've finished all requests which belong to the AIOCB, so
         * we can switch back to sd_co_readv/writev now.
         */
        acb->aio_done_func(acb);
        qemu_coroutine_enter(acb->coroutine);
    }
out:
    s->co_recv = NULL;

    return;

err:
    s->co_recv = NULL;
    reconnect_to_sdog(opaque);
}

@@ -1176,6 +1121,8 @@ static void coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
    uint64_t old_oid = aio_req->base_oid;
    bool create = aio_req->create;

    QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);

    if (!nr_copies) {
        error_report("bug");
    }
@@ -2025,11 +1972,10 @@ static int sd_truncate(BlockDriverState *bs, int64_t offset)
/*
 * This function is called after writing data objects.  If we need to
 * update metadata, this sends a write request to the vdi object.
 * Otherwise, this switches back to sd_co_readv/writev.
 */
static void coroutine_fn sd_write_done(SheepdogAIOCB *acb)
{
    BDRVSheepdogState *s = acb->common.bs->opaque;
    BDRVSheepdogState *s = acb->s;
    struct iovec iov;
    AIOReq *aio_req;
    uint32_t offset, data_len, mn, mx;
@@ -2038,6 +1984,7 @@ static void coroutine_fn sd_write_done(SheepdogAIOCB *acb)
    mx = acb->max_dirty_data_idx;
    if (mn <= mx) {
        /* we need to update the vdi object. */
        ++acb->nr_pending;
        offset = sizeof(s->inode) - sizeof(s->inode.data_vdi_id) +
            mn * sizeof(s->inode.data_vdi_id[0]);
        data_len = (mx - mn + 1) * sizeof(s->inode.data_vdi_id[0]);
@@ -2049,15 +1996,11 @@ static void coroutine_fn sd_write_done(SheepdogAIOCB *acb)
        iov.iov_len = sizeof(s->inode);
        aio_req = alloc_aio_req(s, acb, vid_to_vdi_oid(s->inode.vdi_id),
                                data_len, offset, 0, false, 0, offset);
        QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
        add_aio_request(s, aio_req, &iov, 1, AIOCB_WRITE_UDATA);

        acb->aio_done_func = sd_finish_aiocb;
        acb->aiocb_type = AIOCB_WRITE_UDATA;
        return;
        if (--acb->nr_pending) {
            qemu_coroutine_yield();
        }
    }

    sd_finish_aiocb(acb);
}

/* Delete current working VDI on the snapshot chain */
@@ -2169,16 +2112,15 @@ out:
 * Returns 1 when we need to wait a response, 0 when there is no sent
 * request and -errno in error cases.
 */
static int coroutine_fn sd_co_rw_vector(void *p)
static void coroutine_fn sd_co_rw_vector(SheepdogAIOCB *acb)
{
    SheepdogAIOCB *acb = p;
    int ret = 0;
    unsigned long len, done = 0, total = acb->nb_sectors * BDRV_SECTOR_SIZE;
    unsigned long idx;
    uint32_t object_size;
    uint64_t oid;
    uint64_t offset;
    BDRVSheepdogState *s = acb->common.bs->opaque;
    BDRVSheepdogState *s = acb->s;
    SheepdogInode *inode = &s->inode;
    AIOReq *aio_req;

@@ -2190,7 +2132,7 @@ static int coroutine_fn sd_co_rw_vector(void *p)
        ret = sd_create_branch(s);
        if (ret) {
            acb->ret = -EIO;
            goto out;
            return;
        }
    }

@@ -2255,8 +2197,6 @@ static int coroutine_fn sd_co_rw_vector(void *p)
                                old_oid,
                                acb->aiocb_type == AIOCB_DISCARD_OBJ ?
                                0 : done);
        QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);

        add_aio_request(s, aio_req, acb->qiov->iov, acb->qiov->niov,
                        acb->aiocb_type);
    done:
@@ -2264,31 +2204,25 @@ static int coroutine_fn sd_co_rw_vector(void *p)
        idx++;
        done += len;
    }
out:
    if (!--acb->nr_pending) {
        return acb->ret;
    if (--acb->nr_pending) {
        qemu_coroutine_yield();
    }
    return 1;
}

static bool check_overlapping_aiocb(BDRVSheepdogState *s, SheepdogAIOCB *aiocb)
static void sd_aio_complete(SheepdogAIOCB *acb)
{
    SheepdogAIOCB *cb;

    QLIST_FOREACH(cb, &s->inflight_aiocb_head, aiocb_siblings) {
        if (AIOCBOverlapping(aiocb, cb)) {
            return true;
        }
    if (acb->aiocb_type == AIOCB_FLUSH_CACHE) {
        return;
    }

    QLIST_INSERT_HEAD(&s->inflight_aiocb_head, aiocb, aiocb_siblings);
    return false;
    QLIST_REMOVE(acb, aiocb_siblings);
    qemu_co_queue_restart_all(&acb->s->overlapping_queue);
}

static coroutine_fn int sd_co_writev(BlockDriverState *bs, int64_t sector_num,
                        int nb_sectors, QEMUIOVector *qiov)
{
    SheepdogAIOCB *acb;
    SheepdogAIOCB acb;
    int ret;
    int64_t offset = (sector_num + nb_sectors) * BDRV_SECTOR_SIZE;
    BDRVSheepdogState *s = bs->opaque;
@@ -2300,85 +2234,50 @@ static coroutine_fn int sd_co_writev(BlockDriverState *bs, int64_t sector_num,
        }
    }

    acb = sd_aio_setup(bs, qiov, sector_num, nb_sectors);
    acb->aio_done_func = sd_write_done;
    acb->aiocb_type = AIOCB_WRITE_UDATA;

retry:
    if (check_overlapping_aiocb(s, acb)) {
        qemu_co_queue_wait(&s->overlapping_queue);
        goto retry;
    }

    ret = sd_co_rw_vector(acb);
    if (ret <= 0) {
        QLIST_REMOVE(acb, aiocb_siblings);
        qemu_co_queue_restart_all(&s->overlapping_queue);
        qemu_aio_unref(acb);
        return ret;
    }

    qemu_coroutine_yield();

    QLIST_REMOVE(acb, aiocb_siblings);
    qemu_co_queue_restart_all(&s->overlapping_queue);
    sd_aio_setup(&acb, s, qiov, sector_num, nb_sectors, AIOCB_WRITE_UDATA);
    sd_co_rw_vector(&acb);
    sd_write_done(&acb);
    sd_aio_complete(&acb);

    return acb->ret;
    return acb.ret;
}

static coroutine_fn int sd_co_readv(BlockDriverState *bs, int64_t sector_num,
                       int nb_sectors, QEMUIOVector *qiov)
{
    SheepdogAIOCB *acb;
    int ret;
    SheepdogAIOCB acb;
    BDRVSheepdogState *s = bs->opaque;

    acb = sd_aio_setup(bs, qiov, sector_num, nb_sectors);
    acb->aiocb_type = AIOCB_READ_UDATA;
    acb->aio_done_func = sd_finish_aiocb;
    sd_aio_setup(&acb, s, qiov, sector_num, nb_sectors, AIOCB_READ_UDATA);
    sd_co_rw_vector(&acb);
    sd_aio_complete(&acb);

retry:
    if (check_overlapping_aiocb(s, acb)) {
        qemu_co_queue_wait(&s->overlapping_queue);
        goto retry;
    }

    ret = sd_co_rw_vector(acb);
    if (ret <= 0) {
        QLIST_REMOVE(acb, aiocb_siblings);
        qemu_co_queue_restart_all(&s->overlapping_queue);
        qemu_aio_unref(acb);
        return ret;
    }

    qemu_coroutine_yield();

    QLIST_REMOVE(acb, aiocb_siblings);
    qemu_co_queue_restart_all(&s->overlapping_queue);
    return acb->ret;
    return acb.ret;
}

static int coroutine_fn sd_co_flush_to_disk(BlockDriverState *bs)
{
    BDRVSheepdogState *s = bs->opaque;
    SheepdogAIOCB *acb;
    SheepdogAIOCB acb;
    AIOReq *aio_req;

    if (s->cache_flags != SD_FLAG_CMD_CACHE) {
        return 0;
    }

    acb = sd_aio_setup(bs, NULL, 0, 0);
    acb->aiocb_type = AIOCB_FLUSH_CACHE;
    acb->aio_done_func = sd_finish_aiocb;
    sd_aio_setup(&acb, s, NULL, 0, 0, AIOCB_FLUSH_CACHE);

    aio_req = alloc_aio_req(s, acb, vid_to_vdi_oid(s->inode.vdi_id),
    acb.nr_pending++;
    aio_req = alloc_aio_req(s, &acb, vid_to_vdi_oid(s->inode.vdi_id),
                            0, 0, 0, false, 0, 0);
    QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
    add_aio_request(s, aio_req, NULL, 0, acb->aiocb_type);
    add_aio_request(s, aio_req, NULL, 0, acb.aiocb_type);

    if (--acb.nr_pending) {
        qemu_coroutine_yield();
    return acb->ret;
    }

    sd_aio_complete(&acb);
    return acb.ret;
}

static int sd_snapshot_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info)
@@ -2812,9 +2711,8 @@ static int sd_load_vmstate(BlockDriverState *bs, QEMUIOVector *qiov,
static coroutine_fn int sd_co_pdiscard(BlockDriverState *bs, int64_t offset,
                                      int count)
{
    SheepdogAIOCB *acb;
    SheepdogAIOCB acb;
    BDRVSheepdogState *s = bs->opaque;
    int ret;
    QEMUIOVector discard_iov;
    struct iovec iov;
    uint32_t zero = 0;
@@ -2832,31 +2730,12 @@ static coroutine_fn int sd_co_pdiscard(BlockDriverState *bs, int64_t offset,
    if (!QEMU_IS_ALIGNED(offset | count, BDRV_SECTOR_SIZE)) {
        return -ENOTSUP;
    }
    acb = sd_aio_setup(bs, &discard_iov, offset >> BDRV_SECTOR_BITS,
                       count >> BDRV_SECTOR_BITS);
    acb->aiocb_type = AIOCB_DISCARD_OBJ;
    acb->aio_done_func = sd_finish_aiocb;

retry:
    if (check_overlapping_aiocb(s, acb)) {
        qemu_co_queue_wait(&s->overlapping_queue);
        goto retry;
    }

    ret = sd_co_rw_vector(acb);
    if (ret <= 0) {
        QLIST_REMOVE(acb, aiocb_siblings);
        qemu_co_queue_restart_all(&s->overlapping_queue);
        qemu_aio_unref(acb);
        return ret;
    }

    qemu_coroutine_yield();

    QLIST_REMOVE(acb, aiocb_siblings);
    qemu_co_queue_restart_all(&s->overlapping_queue);
    sd_aio_setup(&acb, s, &discard_iov, offset >> BDRV_SECTOR_BITS,
                 count >> BDRV_SECTOR_BITS, AIOCB_DISCARD_OBJ);
    sd_co_rw_vector(&acb);
    sd_aio_complete(&acb);

    return acb->ret;
    return acb.ret;
}

static coroutine_fn int64_t