Commit 28b24087 authored by Paolo Bonzini's avatar Paolo Bonzini Committed by Stefan Hajnoczi
Browse files

linux-aio: queue requests that cannot be submitted



Keep a queue of requests that were not submitted; pass them to
the kernel when a completion is reported, unless the queue is
plugged.

The array of iocbs is rebuilt every time from scratch.  This
avoids keeping the iocbs array and list synchronized.

Signed-off-by: default avatarPaolo Bonzini <pbonzini@redhat.com>
Reviewed-by: default avatarKevin Wolf <kwolf@redhat.com>
Message-id: 1418305950-30924-2-git-send-email-pbonzini@redhat.com
Signed-off-by: default avatarStefan Hajnoczi <stefanha@redhat.com>
parent b5cf2c1b
Loading
Loading
Loading
Loading
+33 −42
Original line number Diff line number Diff line
@@ -35,14 +35,13 @@ struct qemu_laiocb {
    size_t nbytes;
    QEMUIOVector *qiov;
    bool is_read;
    QLIST_ENTRY(qemu_laiocb) node;
    QSIMPLEQ_ENTRY(qemu_laiocb) next;
};

typedef struct {
    struct iocb *iocbs[MAX_QUEUED_IO];
    int plugged;
    unsigned int size;
    unsigned int idx;
    QSIMPLEQ_HEAD(, qemu_laiocb) pending;
} LaioQueue;

struct qemu_laio_state {
@@ -59,6 +58,8 @@ struct qemu_laio_state {
    int event_max;
};

static int ioq_submit(struct qemu_laio_state *s);

static inline ssize_t io_event_ret(struct io_event *ev)
{
    return (ssize_t)(((uint64_t)ev->res2 << 32) | ev->res);
@@ -135,6 +136,10 @@ static void qemu_laio_completion_bh(void *opaque)

        qemu_laio_process_completion(s, laiocb);
    }

    if (!s->io_q.plugged && !QSIMPLEQ_EMPTY(&s->io_q.pending)) {
        ioq_submit(s);
    }
}

static void qemu_laio_completion_cb(EventNotifier *e)
@@ -172,50 +177,38 @@ static const AIOCBInfo laio_aiocb_info = {

static void ioq_init(LaioQueue *io_q)
{
    io_q->size = MAX_QUEUED_IO;
    io_q->idx = 0;
    QSIMPLEQ_INIT(&io_q->pending);
    io_q->plugged = 0;
    io_q->idx = 0;
}

static int ioq_submit(struct qemu_laio_state *s)
{
    int ret, i = 0;
    int len = s->io_q.idx;

    do {
        ret = io_submit(s->ctx, len, s->io_q.iocbs);
    } while (i++ < 3 && ret == -EAGAIN);

    /* empty io queue */
    s->io_q.idx = 0;
    int ret, i;
    int len = 0;
    struct qemu_laiocb *aiocb;
    struct iocb *iocbs[MAX_QUEUED_IO];

    if (ret < 0) {
        i = 0;
    } else {
        i = ret;
    QSIMPLEQ_FOREACH(aiocb, &s->io_q.pending, next) {
        iocbs[len++] = &aiocb->iocb;
        if (len == MAX_QUEUED_IO) {
            break;
        }
    }

    for (; i < len; i++) {
        struct qemu_laiocb *laiocb =
            container_of(s->io_q.iocbs[i], struct qemu_laiocb, iocb);

        laiocb->ret = (ret < 0) ? ret : -EIO;
        qemu_laio_process_completion(s, laiocb);
    ret = io_submit(s->ctx, len, iocbs);
    if (ret == -EAGAIN) {
        ret = 0;
    }
    return ret;
    if (ret < 0) {
        abort();
    }

static void ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb)
{
    unsigned int idx = s->io_q.idx;

    s->io_q.iocbs[idx++] = iocb;
    s->io_q.idx = idx;

    /* submit immediately if queue is full */
    if (idx == s->io_q.size) {
        ioq_submit(s);
    for (i = 0; i < ret; i++) {
        s->io_q.idx--;
        QSIMPLEQ_REMOVE_HEAD(&s->io_q.pending, next);
    }
    return ret;
}

void laio_io_plug(BlockDriverState *bs, void *aio_ctx)
@@ -236,7 +229,7 @@ int laio_io_unplug(BlockDriverState *bs, void *aio_ctx, bool unplug)
        return 0;
    }

    if (s->io_q.idx > 0) {
    if (!QSIMPLEQ_EMPTY(&s->io_q.pending)) {
        ret = ioq_submit(s);
    }

@@ -276,12 +269,10 @@ BlockAIOCB *laio_submit(BlockDriverState *bs, void *aio_ctx, int fd,
    }
    io_set_eventfd(&laiocb->iocb, event_notifier_get_fd(&s->e));

    if (!s->io_q.plugged) {
        if (io_submit(s->ctx, 1, &iocbs) < 0) {
            goto out_free_aiocb;
        }
    } else {
        ioq_enqueue(s, iocbs);
    QSIMPLEQ_INSERT_TAIL(&s->io_q.pending, laiocb, next);
    s->io_q.idx++;
    if (s->io_q.idx == (s->io_q.plugged ? MAX_QUEUED_IO : 1)) {
        ioq_submit(s);
    }
    return &laiocb->common;