Commit 7eaa8fb5 authored by Kevin Wolf's avatar Kevin Wolf
Browse files

job: Move transactions to Job



This moves the logic that implements job transactions from BlockJob to
Job.

Signed-off-by: default avatarKevin Wolf <kwolf@redhat.com>
Reviewed-by: default avatarMax Reitz <mreitz@redhat.com>
parent 62c9e416
Loading
Loading
Loading
Loading
+3 −3
Original line number Diff line number Diff line
@@ -2255,7 +2255,7 @@ void qmp_transaction(TransactionActionList *dev_list,
     */
    props = get_transaction_properties(props);
    if (props->completion_mode != ACTION_COMPLETION_MODE_INDIVIDUAL) {
        block_job_txn = block_job_txn_new();
        block_job_txn = job_txn_new();
    }

    /* drain all i/o before any operations */
@@ -2314,7 +2314,7 @@ exit:
    if (!has_props) {
        qapi_free_TransactionProperties(props);
    }
    block_job_txn_unref(block_job_txn);
    job_txn_unref(block_job_txn);
}

void qmp_eject(bool has_device, const char *device,
@@ -3908,7 +3908,7 @@ void qmp_block_job_finalize(const char *id, Error **errp)
    }

    trace_qmp_block_job_finalize(job);
    block_job_finalize(job, errp);
    job_finalize(&job->job, errp);
    aio_context_release(aio_context);
}

+5 −233
Original line number Diff line number Diff line
@@ -36,19 +36,6 @@
#include "qemu/coroutine.h"
#include "qemu/timer.h"

/* Transactional group of block jobs */
struct JobTxn {

    /* Is this txn being cancelled? */
    bool aborting;

    /* List of jobs */
    QLIST_HEAD(, Job) jobs;

    /* Reference count */
    int refcnt;
};

/*
 * The block job API is composed of two categories of functions.
 *
@@ -94,48 +81,6 @@ BlockJob *block_job_get(const char *id)
    }
}

JobTxn *block_job_txn_new(void)
{
    JobTxn *txn = g_new0(JobTxn, 1);
    QLIST_INIT(&txn->jobs);
    txn->refcnt = 1;
    return txn;
}

static void block_job_txn_ref(JobTxn *txn)
{
    txn->refcnt++;
}

void block_job_txn_unref(JobTxn *txn)
{
    if (txn && --txn->refcnt == 0) {
        g_free(txn);
    }
}

void block_job_txn_add_job(JobTxn *txn, BlockJob *job)
{
    if (!txn) {
        return;
    }

    assert(!job->txn);
    job->txn = txn;

    QLIST_INSERT_HEAD(&txn->jobs, &job->job, txn_list);
    block_job_txn_ref(txn);
}

void block_job_txn_del_job(BlockJob *job)
{
    if (job->txn) {
        QLIST_REMOVE(&job->job, txn_list);
        block_job_txn_unref(job->txn);
        job->txn = NULL;
    }
}

static void block_job_attached_aio_context(AioContext *new_context,
                                           void *opaque);
static void block_job_detach_aio_context(void *opaque);
@@ -145,8 +90,6 @@ void block_job_free(Job *job)
    BlockJob *bjob = container_of(job, BlockJob, job);
    BlockDriverState *bs = blk_bs(bjob->blk);

    assert(!bjob->txn);

    bs->job = NULL;
    block_job_remove_all_bdrv(bjob);
    blk_remove_aio_context_notifier(bjob->blk,
@@ -261,158 +204,6 @@ const BlockJobDriver *block_job_driver(BlockJob *job)
    return job->driver;
}

static int block_job_prepare(BlockJob *job)
{
    if (job->job.ret == 0 && job->driver->prepare) {
        job->job.ret = job->driver->prepare(job);
    }
    return job->job.ret;
}

static void job_cancel_async(Job *job, bool force)
{
    if (job->user_paused) {
        /* Do not call job_enter here, the caller will handle it.  */
        job->user_paused = false;
        if (job->driver->user_resume) {
            job->driver->user_resume(job);
        }
        assert(job->pause_count > 0);
        job->pause_count--;
    }
    job->cancelled = true;
    /* To prevent 'force == false' overriding a previous 'force == true' */
    job->force_cancel |= force;
}

static int block_job_txn_apply(JobTxn *txn, int fn(BlockJob *), bool lock)
{
    AioContext *ctx;
    Job *job, *next;
    BlockJob *bjob;
    int rc = 0;

    QLIST_FOREACH_SAFE(job, &txn->jobs, txn_list, next) {
        assert(is_block_job(job));
        bjob = container_of(job, BlockJob, job);

        if (lock) {
            ctx = job->aio_context;
            aio_context_acquire(ctx);
        }
        rc = fn(bjob);
        if (lock) {
            aio_context_release(ctx);
        }
        if (rc) {
            break;
        }
    }
    return rc;
}

static void block_job_completed_txn_abort(BlockJob *job)
{
    AioContext *ctx;
    JobTxn *txn = job->txn;
    Job *other_job;

    if (txn->aborting) {
        /*
         * We are cancelled by another job, which will handle everything.
         */
        return;
    }
    txn->aborting = true;
    block_job_txn_ref(txn);

    /* We are the first failed job. Cancel other jobs. */
    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
        ctx = other_job->aio_context;
        aio_context_acquire(ctx);
    }

    /* Other jobs are effectively cancelled by us, set the status for
     * them; this job, however, may or may not be cancelled, depending
     * on the caller, so leave it. */
    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
        if (other_job != &job->job) {
            job_cancel_async(other_job, false);
        }
    }
    while (!QLIST_EMPTY(&txn->jobs)) {
        other_job = QLIST_FIRST(&txn->jobs);
        ctx = other_job->aio_context;
        if (!job_is_completed(other_job)) {
            assert(job_is_cancelled(other_job));
            job_finish_sync(other_job, NULL, NULL);
        }
        job_finalize_single(other_job);
        aio_context_release(ctx);
    }

    block_job_txn_unref(txn);
}

static int block_job_needs_finalize(BlockJob *job)
{
    return !job->job.auto_finalize;
}

static int block_job_finalize_single(BlockJob *job)
{
    return job_finalize_single(&job->job);
}

static void block_job_do_finalize(BlockJob *job)
{
    int rc;
    assert(job && job->txn);

    /* prepare the transaction to complete */
    rc = block_job_txn_apply(job->txn, block_job_prepare, true);
    if (rc) {
        block_job_completed_txn_abort(job);
    } else {
        block_job_txn_apply(job->txn, block_job_finalize_single, true);
    }
}

static int block_job_transition_to_pending(BlockJob *job)
{
    job_state_transition(&job->job, JOB_STATUS_PENDING);
    if (!job->job.auto_finalize) {
        job_event_pending(&job->job);
    }
    return 0;
}

static void block_job_completed_txn_success(BlockJob *job)
{
    JobTxn *txn = job->txn;
    Job *other_job;

    job_state_transition(&job->job, JOB_STATUS_WAITING);

    /*
     * Successful completion, see if there are other running jobs in this
     * txn.
     */
    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
        if (!job_is_completed(other_job)) {
            return;
        }
        assert(other_job->ret == 0);
    }

    block_job_txn_apply(txn, block_job_transition_to_pending, false);

    /* If no jobs need manual finalization, automatically do so */
    if (block_job_txn_apply(txn, block_job_needs_finalize, false) == 0) {
        block_job_do_finalize(job);
    }
}

/* Assumes the job_mutex is held */
static bool job_timer_pending(Job *job)
{
@@ -451,15 +242,6 @@ int64_t block_job_ratelimit_get_delay(BlockJob *job, uint64_t n)
    return ratelimit_calculate_delay(&job->limit, n);
}

void block_job_finalize(BlockJob *job, Error **errp)
{
    assert(job && job->job.id);
    if (job_apply_verb(&job->job, JOB_VERB_FINALIZE, errp)) {
        return;
    }
    block_job_do_finalize(job);
}

void block_job_dismiss(BlockJob **jobptr, Error **errp)
{
    BlockJob *job = *jobptr;
@@ -483,7 +265,7 @@ void block_job_cancel(BlockJob *job, bool force)
    if (!job_started(&job->job)) {
        block_job_completed(job, -ECANCELED);
    } else if (job->job.deferred_to_main_loop) {
        block_job_completed_txn_abort(job);
        job_completed_txn_abort(&job->job);
    } else {
        block_job_enter(job);
    }
@@ -656,7 +438,7 @@ void *block_job_create(const char *job_id, const BlockJobDriver *driver,
        return NULL;
    }

    job = job_create(job_id, &driver->job_driver, blk_get_aio_context(blk),
    job = job_create(job_id, &driver->job_driver, txn, blk_get_aio_context(blk),
                     flags, cb, opaque, errp);
    if (job == NULL) {
        blk_unref(blk);
@@ -703,30 +485,20 @@ void *block_job_create(const char *job_id, const BlockJobDriver *driver,
        }
    }

    /* Single jobs are modeled as single-job transactions for sake of
     * consolidating the job management logic */
    if (!txn) {
        txn = block_job_txn_new();
        block_job_txn_add_job(txn, job);
        block_job_txn_unref(txn);
    } else {
        block_job_txn_add_job(txn, job);
    }

    return job;
}

void block_job_completed(BlockJob *job, int ret)
{
    assert(job && job->txn && !job_is_completed(&job->job));
    assert(job && job->job.txn && !job_is_completed(&job->job));
    assert(blk_bs(job->blk)->job == job);
    job->job.ret = ret;
    job_update_rc(&job->job);
    trace_block_job_completed(job, ret, job->job.ret);
    if (job->job.ret) {
        block_job_completed_txn_abort(job);
        job_completed_txn_abort(&job->job);
    } else {
        block_job_completed_txn_success(job);
        job_completed_txn_success(&job->job);
    }
}

+0 −54
Original line number Diff line number Diff line
@@ -33,7 +33,6 @@
#define BLOCK_JOB_SLICE_TIME 100000000ULL /* ns */

typedef struct BlockJobDriver BlockJobDriver;
typedef struct JobTxn JobTxn;

/**
 * BlockJob:
@@ -84,8 +83,6 @@ typedef struct BlockJob {

    /** BlockDriverStates that are involved in this block job */
    GSList *nodes;

    JobTxn *txn;
} BlockJob;

/**
@@ -152,22 +149,6 @@ void block_job_set_speed(BlockJob *job, int64_t speed, Error **errp);
 */
void block_job_cancel(BlockJob *job, bool force);

/**
 * block_job_finalize:
 * @job: The job to fully commit and finish.
 * @errp: Error object.
 *
 * For jobs that have finished their work and are pending
 * awaiting explicit acknowledgement to commit their work,
 * This will commit that work.
 *
 * FIXME: Make the below statement universally true:
 * For jobs that support the manual workflow mode, all graph
 * changes that occur as a result will occur after this command
 * and before a successful reply.
 */
void block_job_finalize(BlockJob *job, Error **errp);

/**
 * block_job_dismiss:
 * @job: The job to be dismissed.
@@ -259,41 +240,6 @@ int block_job_complete_sync(BlockJob *job, Error **errp);
 */
void block_job_iostatus_reset(BlockJob *job);

/**
 * block_job_txn_new:
 *
 * Allocate and return a new block job transaction.  Jobs can be added to the
 * transaction using block_job_txn_add_job().
 *
 * The transaction is automatically freed when the last job completes or is
 * cancelled.
 *
 * All jobs in the transaction either complete successfully or fail/cancel as a
 * group.  Jobs wait for each other before completing.  Cancelling one job
 * cancels all jobs in the transaction.
 */
JobTxn *block_job_txn_new(void);

/**
 * block_job_txn_unref:
 *
 * Release a reference that was previously acquired with block_job_txn_add_job
 * or block_job_txn_new. If it's the last reference to the object, it will be
 * freed.
 */
void block_job_txn_unref(JobTxn *txn);

/**
 * block_job_txn_add_job:
 * @txn: The transaction (may be NULL)
 * @job: Job to add to the transaction
 *
 * Add @job to the transaction.  The @job must not already be in a transaction.
 * The caller must call either block_job_txn_unref() or block_job_completed()
 * to release the reference that is automatically grabbed here.
 */
void block_job_txn_add_job(JobTxn *txn, BlockJob *job);

/**
 * block_job_is_internal:
 * @job: The job to determine if it is user-visible or not.
+0 −10
Original line number Diff line number Diff line
@@ -38,16 +38,6 @@ struct BlockJobDriver {
    /** Generic JobDriver callbacks and settings */
    JobDriver job_driver;

    /**
     * If the callback is not NULL, prepare will be invoked when all the jobs
     * belonging to the same transaction complete; or upon this job's completion
     * if it is not in a transaction.
     *
     * This callback will not be invoked if the job has already failed.
     * If it fails, abort and then clean will be called.
     */
    int (*prepare)(BlockJob *job);

    /*
     * If the callback is not NULL, it will be invoked before the job is
     * resumed in a new AioContext.  This is the place to move any resources
+62 −9
Original line number Diff line number Diff line
@@ -32,6 +32,8 @@
#include "block/aio.h"

typedef struct JobDriver JobDriver;
typedef struct JobTxn JobTxn;


/**
 * Long-running operation.
@@ -133,6 +135,9 @@ typedef struct Job {
    /** Element of the list of jobs */
    QLIST_ENTRY(Job) job_list;

    /** Transaction this job is part of */
    JobTxn *txn;

    /** Element of the list of jobs in a job transaction */
    QLIST_ENTRY(Job) txn_list;
} Job;
@@ -183,6 +188,16 @@ struct JobDriver {
     */
    void (*drain)(Job *job);

    /**
     * If the callback is not NULL, prepare will be invoked when all the jobs
     * belonging to the same transaction complete; or upon this job's completion
     * if it is not in a transaction.
     *
     * This callback will not be invoked if the job has already failed.
     * If it fails, abort and then clean will be called.
     */
    int (*prepare)(Job *job);

    /**
     * If the callback is not NULL, it will be invoked when all the jobs
     * belonging to the same transaction complete; or upon this job's
@@ -227,20 +242,52 @@ typedef enum JobCreateFlags {
    JOB_MANUAL_DISMISS = 0x04,
} JobCreateFlags;

/**
 * Allocate and return a new job transaction. Jobs can be added to the
 * transaction using job_txn_add_job().
 *
 * The transaction is automatically freed when the last job completes or is
 * cancelled.
 *
 * All jobs in the transaction either complete successfully or fail/cancel as a
 * group.  Jobs wait for each other before completing.  Cancelling one job
 * cancels all jobs in the transaction.
 */
JobTxn *job_txn_new(void);

/**
 * Release a reference that was previously acquired with job_txn_add_job or
 * job_txn_new. If it's the last reference to the object, it will be freed.
 */
void job_txn_unref(JobTxn *txn);

/**
 * @txn: The transaction (may be NULL)
 * @job: Job to add to the transaction
 *
 * Add @job to the transaction.  The @job must not already be in a transaction.
 * The caller must call either job_txn_unref() or block_job_completed() to
 * release the reference that is automatically grabbed here.
 *
 * If @txn is NULL, the function does nothing.
 */
void job_txn_add_job(JobTxn *txn, Job *job);

/**
 * Create a new long-running job and return it.
 *
 * @job_id: The id of the newly-created job, or %NULL for internal jobs
 * @driver: The class object for the newly-created job.
 * @txn: The transaction this job belongs to, if any. %NULL otherwise.
 * @ctx: The AioContext to run the job coroutine in.
 * @flags: Creation flags for the job. See @JobCreateFlags.
 * @cb: Completion function for the job.
 * @opaque: Opaque pointer value passed to @cb.
 * @errp: Error object.
 */
void *job_create(const char *job_id, const JobDriver *driver, AioContext *ctx,
                 int flags, BlockCompletionFunc *cb, void *opaque, Error **errp);
void *job_create(const char *job_id, const JobDriver *driver, JobTxn *txn,
                 AioContext *ctx, int flags, BlockCompletionFunc *cb,
                 void *opaque, Error **errp);

/**
 * Add a reference to Job refcnt, it will be decreased with job_unref, and then
@@ -260,9 +307,6 @@ void job_event_cancelled(Job *job);
/** To be called when a successfully completed job is finalised. */
void job_event_completed(Job *job);

/** To be called when the job transitions to PENDING */
void job_event_pending(Job *job);

/**
 * Conditionally enter the job coroutine if the job is ready to run, not
 * already busy and fn() returns true. fn() is called while under the job_lock
@@ -375,6 +419,16 @@ void job_early_fail(Job *job);
/** Asynchronously complete the specified @job. */
void job_complete(Job *job, Error **errp);;

/**
 * For a @job that has finished its work and is pending awaiting explicit
 * acknowledgement to commit its work, this will commit that work.
 *
 * FIXME: Make the below statement universally true:
 * For jobs that support the manual workflow mode, all graph changes that occur
 * as a result will occur after this command and before a successful reply.
 */
void job_finalize(Job *job, Error **errp);

typedef void JobDeferToMainLoopFn(Job *job, void *opaque);

/**
@@ -407,10 +461,9 @@ void coroutine_fn job_do_yield(Job *job, uint64_t ns);
bool job_should_pause(Job *job);
bool job_started(Job *job);
void job_do_dismiss(Job *job);
int job_finalize_single(Job *job);
void job_update_rc(Job *job);

typedef struct BlockJob BlockJob;
void block_job_txn_del_job(BlockJob *job);
void job_cancel_async(Job *job, bool force);
void job_completed_txn_abort(Job *job);
void job_completed_txn_success(Job *job);

#endif
Loading