Commit 4ce5dd3e authored by Vladimir Sementsov-Ogievskiy, committed by Max Reitz

block/block-copy: use aio-task-pool API



Run block_copy iterations in parallel in aio tasks.

Changes:
  - BlockCopyTask becomes an aio task structure. Add a zeroes field to
    pass it to block_copy_do_copy.
  - Add a call state: the state of one block_copy() call, shared between
    its parallel tasks. For now it only records the first error and
    whether that error was a read error.
  - Convert block_copy_dirty_clusters to an aio-task loop (a rough
    sketch of the pattern follows this list).
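For reference, a rough sketch of the aio-task-pool usage pattern adopted
here (ChunkTask, chunk_task_entry() and copy_chunks() are made-up
illustration names; the AioTask embedding, container_of() recovery and
aio_task_pool_*() calls mirror what the patch does below):

#include "qemu/osdep.h"
#include "qemu/coroutine.h"
#include "block/aio_task.h"

typedef struct ChunkTask {
    AioTask task;       /* embedded; the pool only sees this member */
    int64_t chunk;      /* per-task parameters live next to it */
} ChunkTask;

static coroutine_fn int chunk_task_entry(AioTask *task)
{
    ChunkTask *t = container_of(task, ChunkTask, task);

    /* ... copy one chunk described by t->chunk, return 0 or -errno ... */
    return 0;
}

static coroutine_fn int copy_chunks(int64_t nb_chunks)
{
    AioTaskPool *pool = aio_task_pool_new(64); /* cf. BLOCK_COPY_MAX_WORKERS */
    int64_t i;
    int ret;

    for (i = 0; i < nb_chunks && aio_task_pool_status(pool) == 0; i++) {
        ChunkTask *t = g_new(ChunkTask, 1);

        *t = (ChunkTask) { .task.func = chunk_task_entry, .chunk = i };

        /*
         * Wait for a free worker slot, then hand the task to the pool,
         * which runs it and owns it from here on (cf. the "Takes
         * ownership of @task" contract of block_copy_task_run below).
         */
        aio_task_pool_wait_slot(pool);
        aio_task_pool_start_task(pool, &t->task);
    }

    aio_task_pool_wait_all(pool);       /* join the remaining workers */
    ret = aio_task_pool_status(pool);   /* first failure, or 0 */
    aio_task_pool_free(pool);

    return ret;
}

block_copy_task_run() in the patch additionally handles running without
a pool (pool == NULL) and cleaning up a task when the pool has already
failed.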

Signed-off-by: Vladimir Sementsov-Ogievskiy <vsementsov@virtuozzo.com>
Message-Id: <20200429130847.28124-6-vsementsov@virtuozzo.com>
Signed-off-by: Max Reitz <mreitz@redhat.com>
parent 42ac2144
+106 −13
@@ -19,15 +19,29 @@
#include "block/block-copy.h"
#include "sysemu/block-backend.h"
#include "qemu/units.h"
#include "qemu/coroutine.h"
#include "block/aio_task.h"

#define BLOCK_COPY_MAX_COPY_RANGE (16 * MiB)
#define BLOCK_COPY_MAX_BUFFER (1 * MiB)
#define BLOCK_COPY_MAX_MEM (128 * MiB)
#define BLOCK_COPY_MAX_WORKERS 64

static coroutine_fn int block_copy_task_entry(AioTask *task);

typedef struct BlockCopyCallState {
    bool failed;
    bool error_is_read;
} BlockCopyCallState;

typedef struct BlockCopyTask {
    AioTask task;

    BlockCopyState *s;
    BlockCopyCallState *call_state;
    int64_t offset;
    int64_t bytes;
    bool zeroes;
    QLIST_ENTRY(BlockCopyTask) list;
    CoQueue wait_queue; /* coroutines blocked on this task */
} BlockCopyTask;
@@ -116,6 +130,7 @@ static bool coroutine_fn block_copy_wait_one(BlockCopyState *s, int64_t offset,
 * the beginning of it.
 */
static BlockCopyTask *block_copy_task_create(BlockCopyState *s,
                                             BlockCopyCallState *call_state,
                                             int64_t offset, int64_t bytes)
{
    BlockCopyTask *task;
@@ -135,7 +150,9 @@ static BlockCopyTask *block_copy_task_create(BlockCopyState *s,

    task = g_new(BlockCopyTask, 1);
    *task = (BlockCopyTask) {
        .task.func = block_copy_task_entry,
        .s = s,
        .call_state = call_state,
        .offset = offset,
        .bytes = bytes,
    };
@@ -263,6 +280,38 @@ void block_copy_set_progress_meter(BlockCopyState *s, ProgressMeter *pm)
    s->progress = pm;
}

/*
 * Takes ownership of @task
 *
 * If pool is NULL directly run the task, otherwise schedule it into the pool.
 *
 * Returns: task.func return code if pool is NULL
 *          otherwise -ECANCELED if pool status is bad
 *          otherwise 0 (successfully scheduled)
 */
static coroutine_fn int block_copy_task_run(AioTaskPool *pool,
                                            BlockCopyTask *task)
{
    if (!pool) {
        int ret = task->task.func(&task->task);

        g_free(task);
        return ret;
    }

    aio_task_pool_wait_slot(pool);
    if (aio_task_pool_status(pool) < 0) {
        co_put_to_shres(task->s->mem, task->bytes);
        block_copy_task_end(task, -ECANCELED);
        g_free(task);
        return -ECANCELED;
    }

    aio_task_pool_start_task(pool, &task->task);

    return 0;
}

/*
 * block_copy_do_copy
 *
@@ -366,6 +415,27 @@ out:
    return ret;
}

static coroutine_fn int block_copy_task_entry(AioTask *task)
{
    BlockCopyTask *t = container_of(task, BlockCopyTask, task);
    bool error_is_read;
    int ret;

    ret = block_copy_do_copy(t->s, t->offset, t->bytes, t->zeroes,
                             &error_is_read);
    if (ret < 0 && !t->call_state->failed) {
        t->call_state->failed = true;
        t->call_state->error_is_read = error_is_read;
    } else {
        progress_work_done(t->s->progress, t->bytes);
        t->s->progress_bytes_callback(t->bytes, t->s->progress_opaque);
    }
    co_put_to_shres(t->s->mem, t->bytes);
    block_copy_task_end(t, ret);

    return ret;
}

static int block_copy_block_status(BlockCopyState *s, int64_t offset,
                                   int64_t bytes, int64_t *pnum)
{
@@ -484,6 +554,8 @@ static int coroutine_fn block_copy_dirty_clusters(BlockCopyState *s,
    int ret = 0;
    bool found_dirty = false;
    int64_t end = offset + bytes;
    AioTaskPool *aio = NULL;
    BlockCopyCallState call_state = {false, false};

    /*
     * block_copy() user is responsible for keeping source and target in same
@@ -495,11 +567,11 @@ static int coroutine_fn block_copy_dirty_clusters(BlockCopyState *s,
    assert(QEMU_IS_ALIGNED(offset, s->cluster_size));
    assert(QEMU_IS_ALIGNED(bytes, s->cluster_size));

-    while (bytes) {
-        g_autofree BlockCopyTask *task = NULL;
+    while (bytes && aio_task_pool_status(aio) == 0) {
+        BlockCopyTask *task;
        int64_t status_bytes;

-        task = block_copy_task_create(s, offset, bytes);
+        task = block_copy_task_create(s, &call_state, offset, bytes);
        if (!task) {
            /* No more dirty bits in the bitmap */
            trace_block_copy_skip_range(s, offset, bytes);
@@ -519,6 +591,7 @@ static int coroutine_fn block_copy_dirty_clusters(BlockCopyState *s,
        }
        if (s->skip_unallocated && !(ret & BDRV_BLOCK_ALLOCATED)) {
            block_copy_task_end(task, 0);
            g_free(task);
            progress_set_remaining(s->progress,
                                   bdrv_get_dirty_count(s->copy_bitmap) +
                                   s->in_flight_bytes);
@@ -527,25 +600,45 @@ static int coroutine_fn block_copy_dirty_clusters(BlockCopyState *s,
            bytes = end - offset;
            continue;
        }
        task->zeroes = ret & BDRV_BLOCK_ZERO;

        trace_block_copy_process(s, task->offset);

        co_get_from_shres(s->mem, task->bytes);
-        ret = block_copy_do_copy(s, task->offset, task->bytes,
-                                 ret & BDRV_BLOCK_ZERO, error_is_read);
-        co_put_to_shres(s->mem, task->bytes);
-        block_copy_task_end(task, ret);
-        if (ret < 0) {
-            return ret;
-        }

-        progress_work_done(s->progress, task->bytes);
-        s->progress_bytes_callback(task->bytes, s->progress_opaque);
        offset = task_end(task);
        bytes = end - offset;

        if (!aio && bytes) {
            aio = aio_task_pool_new(BLOCK_COPY_MAX_WORKERS);
        }

        ret = block_copy_task_run(aio, task);
        if (ret < 0) {
            goto out;
        }
    }

out:
    if (aio) {
        aio_task_pool_wait_all(aio);

        /*
         * We are not really interested in -ECANCELED returned from
         * block_copy_task_run. If it fails, it means some task already failed
         * for real reason, let's return first failure.
         * Still, assert that we don't rewrite failure by success.
         */
        assert(ret == 0 || aio_task_pool_status(aio) < 0);
        ret = aio_task_pool_status(aio);

        aio_task_pool_free(aio);
    }
    if (error_is_read && ret < 0) {
        *error_is_read = call_state.error_is_read;
    }

-    return found_dirty;
+    return ret < 0 ? ret : found_dirty;
}

/*