Commit bae8196d authored by Paolo Bonzini, committed by Fam Zheng
Browse files

blockjob: introduce .drain callback for jobs



This is required to decouple block jobs from running in an
AioContext.  With multiqueue block devices, a BlockDriverState
does not really belong to a single AioContext.

The solution is to first wait until all I/O operations are
complete; then loop in the main thread for the block job to
complete entirely.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
Reviewed-by: Fam Zheng <famz@redhat.com>
Message-Id: <1477565348-5458-3-git-send-email-pbonzini@redhat.com>
Signed-off-by: Fam Zheng <famz@redhat.com>
parent 50ab0e09
Loading
Loading
Loading
Loading
+17 −0
Original line number Diff line number Diff line
@@ -300,6 +300,21 @@ void backup_cow_request_end(CowRequest *req)
    cow_request_end(req);
}

/* .drain callback for the backup job: flush all pending I/O on the
 * target BlockBackend so the job can make progress synchronously.
 */
static void backup_drain(BlockJob *job)
{
    BackupBlockJob *s = container_of(job, BackupBlockJob, common);
    BlockBackend *target = s->target;

    if (!target) {
        return;
    }

    /* Hold an extra reference: blk_drain may trigger backup_complete,
     * which unrefs s->target and sets it to NULL under us.
     */
    blk_ref(target);
    blk_drain(target);
    blk_unref(target);
}

static const BlockJobDriver backup_job_driver = {
    .instance_size          = sizeof(BackupBlockJob),
    .job_type               = BLOCK_JOB_TYPE_BACKUP,
@@ -307,6 +322,7 @@ static const BlockJobDriver backup_job_driver = {
    .commit                 = backup_commit,
    .abort                  = backup_abort,
    .attached_aio_context   = backup_attached_aio_context,
    .drain                  = backup_drain,
};

static BlockErrorAction backup_error_action(BackupBlockJob *job,
@@ -331,6 +347,7 @@ static void backup_complete(BlockJob *job, void *opaque)
    BackupCompleteData *data = opaque;

    blk_unref(s->target);
    s->target = NULL;

    block_job_completed(job, data->ret);
    g_free(data);
+27 −8
Original line number Diff line number Diff line
@@ -469,7 +469,11 @@ static void mirror_free_init(MirrorBlockJob *s)
    }
}

static void mirror_drain(MirrorBlockJob *s)
/* This is also used for the .pause callback. There is no matching
 * mirror_resume() because mirror_run() will begin iterating again
 * when the job is resumed.
 */
static void mirror_wait_for_all_io(MirrorBlockJob *s)
{
    while (s->in_flight > 0) {
        mirror_wait_for_io(s);
@@ -528,6 +532,7 @@ static void mirror_exit(BlockJob *job, void *opaque)
    g_free(s->replaces);
    bdrv_op_unblock_all(target_bs, s->common.blocker);
    blk_unref(s->target);
    s->target = NULL;
    block_job_completed(&s->common, data->ret);
    g_free(data);
    bdrv_drained_end(src);
@@ -582,7 +587,7 @@ static int coroutine_fn mirror_dirty_init(MirrorBlockJob *s)
            sector_num += nb_sectors;
        }

        mirror_drain(s);
        mirror_wait_for_all_io(s);
    }

    /* First part, loop on the sectors and initialize the dirty bitmap.  */
@@ -787,7 +792,7 @@ immediate_exit:
         * the target is a copy of the source.
         */
        assert(ret < 0 || (!s->synced && block_job_is_cancelled(&s->common)));
        mirror_drain(s);
        mirror_wait_for_all_io(s);
    }

    assert(s->in_flight == 0);
@@ -872,14 +877,11 @@ static void mirror_complete(BlockJob *job, Error **errp)
    block_job_enter(&s->common);
}

/* There is no matching mirror_resume() because mirror_run() will begin
 * iterating again when the job is resumed.
 */
static void coroutine_fn mirror_pause(BlockJob *job)
static void mirror_pause(BlockJob *job)
{
    MirrorBlockJob *s = container_of(job, MirrorBlockJob, common);

    mirror_drain(s);
    mirror_wait_for_all_io(s);
}

static void mirror_attached_aio_context(BlockJob *job, AioContext *new_context)
@@ -889,6 +891,21 @@ static void mirror_attached_aio_context(BlockJob *job, AioContext *new_context)
    blk_set_aio_context(s->target, new_context);
}

/* .drain callback for the mirror job: flush all pending I/O on the
 * target BlockBackend so the job can make progress synchronously.
 */
static void mirror_drain(BlockJob *job)
{
    MirrorBlockJob *s = container_of(job, MirrorBlockJob, common);
    BlockBackend *target = s->target;

    if (!target) {
        return;
    }

    /* Hold an extra reference: blk_drain may trigger mirror_complete,
     * which unrefs s->target and sets it to NULL under us.
     */
    blk_ref(target);
    blk_drain(target);
    blk_unref(target);
}

static const BlockJobDriver mirror_job_driver = {
    .instance_size          = sizeof(MirrorBlockJob),
    .job_type               = BLOCK_JOB_TYPE_MIRROR,
@@ -896,6 +913,7 @@ static const BlockJobDriver mirror_job_driver = {
    .complete               = mirror_complete,
    .pause                  = mirror_pause,
    .attached_aio_context   = mirror_attached_aio_context,
    .drain                  = mirror_drain,
};

static const BlockJobDriver commit_active_job_driver = {
@@ -905,6 +923,7 @@ static const BlockJobDriver commit_active_job_driver = {
    .complete               = mirror_complete,
    .pause                  = mirror_pause,
    .attached_aio_context   = mirror_attached_aio_context,
    .drain                  = mirror_drain,
};

static void mirror_start_job(const char *job_id, BlockDriverState *bs,
+20 −17
Original line number Diff line number Diff line
@@ -74,17 +74,6 @@ BlockJob *block_job_get(const char *id)
    return NULL;
}

/* Normally the job runs in its BlockBackend's AioContext.  The exception is
 * block_job_defer_to_main_loop() where it runs in the QEMU main loop.  Code
 * that supports both cases uses this helper function.
 */
static AioContext *block_job_get_aio_context(BlockJob *job)
{
    return job->deferred_to_main_loop ?
           qemu_get_aio_context() :
           blk_get_aio_context(job->blk);
}

static void block_job_attached_aio_context(AioContext *new_context,
                                           void *opaque)
{
@@ -97,6 +86,17 @@ static void block_job_attached_aio_context(AioContext *new_context,
    block_job_resume(job);
}

/* Push the job toward a pause point or completion: wake the job
 * coroutine, drain its BlockBackend, and give the driver a chance to
 * drain any additional backends it owns (e.g. a copy target).
 */
static void block_job_drain(BlockJob *job)
{
    /* If job is !job->busy this kicks it into the next pause point. */
    block_job_enter(job);

    /* Drain the job's own BlockBackend first... */
    blk_drain(job->blk);
    /* ...then any driver-specific backends via the optional .drain
     * callback (NULL for drivers with nothing extra to drain).
     */
    if (job->driver->drain) {
        job->driver->drain(job);
    }
}

static void block_job_detach_aio_context(void *opaque)
{
    BlockJob *job = opaque;
@@ -106,12 +106,8 @@ static void block_job_detach_aio_context(void *opaque)

    block_job_pause(job);

    if (!job->paused) {
        /* If job is !job->busy this kicks it into the next pause point. */
        block_job_enter(job);
    }
    while (!job->paused && !job->completed) {
        aio_poll(block_job_get_aio_context(job), true);
        block_job_drain(job);
    }

    block_job_unref(job);
@@ -413,14 +409,21 @@ static int block_job_finish_sync(BlockJob *job,
    assert(blk_bs(job->blk)->job == job);

    block_job_ref(job);

    finish(job, &local_err);
    if (local_err) {
        error_propagate(errp, local_err);
        block_job_unref(job);
        return -EBUSY;
    }
    /* block_job_drain calls block_job_enter, and it should be enough to
     * induce progress until the job completes or moves to the main thread.
    */
    while (!job->deferred_to_main_loop && !job->completed) {
        block_job_drain(job);
    }
    while (!job->completed) {
        aio_poll(block_job_get_aio_context(job), true);
        aio_poll(qemu_get_aio_context(), true);
    }
    ret = (job->cancelled && job->ret == 0) ? -ECANCELED : job->ret;
    block_job_unref(job);
+7 −0
Original line number Diff line number Diff line
@@ -92,6 +92,13 @@ typedef struct BlockJobDriver {
     * besides job->blk to the new AioContext.
     */
    void (*attached_aio_context)(BlockJob *job, AioContext *new_context);

    /*
     * If the callback is not NULL, it will be invoked when the job has to be
     * synchronously cancelled or completed; it should drain BlockDriverStates
     * as required to ensure progress.
     */
    void (*drain)(BlockJob *job);
} BlockJobDriver;

/**