Commit 1ed9bd12 authored by Stefan Hajnoczi
Browse files

Merge remote-tracking branch 'jtc/tags/block-pull-request' into staging



# gpg: Signature made Tue 15 Nov 2016 04:10:29 AM GMT
# gpg:                using RSA key 0xBDBE7B27C0DE3057
# gpg: Good signature from "Jeffrey Cody <jcody@redhat.com>"
# gpg:                 aka "Jeffrey Cody <jeff@codyprime.org>"
# gpg:                 aka "Jeffrey Cody <codyprime@gmail.com>"
# Primary key fingerprint: 9957 4B4D 3474 90E7 9D98  D624 BDBE 7B27 C0DE 3057

* jtc/tags/block-pull-request:
  mirror: do not flush every time the disks are synced
  block/curl: Do not wait for data beyond EOF
  block/curl: Remember all sockets
  block/curl: Fix return value from curl_read_cb
  block/curl: Use BDRV_SECTOR_SIZE
  block/curl: Drop TFTP "support"
  qemu-iotests: avoid spurious failure on test 109
  iotests: add transactional failure race test
  blockjob: refactor backup_start as backup_job_create
  blockjob: add block_job_start
  blockjob: add .start field
  blockjob: add .clean property
  blockjob: fix dead pointer in txn list

Message-id: 1479183291-14086-1-git-send-email-jcody@redhat.com
Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
parents 8a7b5c18 bdffb31d
Loading
Loading
Loading
Loading
+35 −28
Original line number Diff line number Diff line
@@ -242,6 +242,14 @@ static void backup_abort(BlockJob *job)
    }
}

/*
 * Release the backup job's reference to its target.
 *
 * Installed as the .clean callback of backup_job_driver and also called
 * directly from the error path of backup_job_create().
 *
 * @job: the generic BlockJob embedded in a BackupBlockJob as 'common'.
 *
 * The target must still be set on entry (asserted).  The pointer is cleared
 * after the unref, so a second call trips the assertion instead of dropping
 * the reference twice.
 */
static void backup_clean(BlockJob *job)
{
    /* Recover the enclosing BackupBlockJob from its embedded 'common' member. */
    BackupBlockJob *s = container_of(job, BackupBlockJob, common);
    assert(s->target);
    blk_unref(s->target);
    s->target = NULL;
}

static void backup_attached_aio_context(BlockJob *job, AioContext *aio_context)
{
    BackupBlockJob *s = container_of(job, BackupBlockJob, common);
@@ -315,16 +323,6 @@ static void backup_drain(BlockJob *job)
    }
}

static const BlockJobDriver backup_job_driver = {
    .instance_size          = sizeof(BackupBlockJob),
    .job_type               = BLOCK_JOB_TYPE_BACKUP,
    .set_speed              = backup_set_speed,
    .commit                 = backup_commit,
    .abort                  = backup_abort,
    .attached_aio_context   = backup_attached_aio_context,
    .drain                  = backup_drain,
};

static BlockErrorAction backup_error_action(BackupBlockJob *job,
                                            bool read, int error)
{
@@ -343,12 +341,8 @@ typedef struct {

static void backup_complete(BlockJob *job, void *opaque)
{
    BackupBlockJob *s = container_of(job, BackupBlockJob, common);
    BackupCompleteData *data = opaque;

    blk_unref(s->target);
    s->target = NULL;

    block_job_completed(job, data->ret);
    g_free(data);
}
@@ -537,7 +531,19 @@ static void coroutine_fn backup_run(void *opaque)
    block_job_defer_to_main_loop(&job->common, backup_complete, data);
}

void backup_start(const char *job_id, BlockDriverState *bs,
/*
 * Driver table for backup block jobs: wires the backup-specific callbacks
 * into the generic BlockJobDriver interface.
 *
 * .start names the job's coroutine body (backup_run) and .clean names
 * backup_clean, which drops the reference to the target.
 *
 * NOTE(review): the table sits after backup_run(), presumably so that
 * .start can reference it without a forward declaration - confirm.
 */
static const BlockJobDriver backup_job_driver = {
    .instance_size          = sizeof(BackupBlockJob),
    .job_type               = BLOCK_JOB_TYPE_BACKUP,
    .start                  = backup_run,
    .set_speed              = backup_set_speed,
    .commit                 = backup_commit,
    .abort                  = backup_abort,
    .clean                  = backup_clean,
    .attached_aio_context   = backup_attached_aio_context,
    .drain                  = backup_drain,
};

BlockJob *backup_job_create(const char *job_id, BlockDriverState *bs,
                  BlockDriverState *target, int64_t speed,
                  MirrorSyncMode sync_mode, BdrvDirtyBitmap *sync_bitmap,
                  bool compress,
@@ -557,52 +563,52 @@ void backup_start(const char *job_id, BlockDriverState *bs,

    if (bs == target) {
        error_setg(errp, "Source and target cannot be the same");
        return;
        return NULL;
    }

    if (!bdrv_is_inserted(bs)) {
        error_setg(errp, "Device is not inserted: %s",
                   bdrv_get_device_name(bs));
        return;
        return NULL;
    }

    if (!bdrv_is_inserted(target)) {
        error_setg(errp, "Device is not inserted: %s",
                   bdrv_get_device_name(target));
        return;
        return NULL;
    }

    if (compress && target->drv->bdrv_co_pwritev_compressed == NULL) {
        error_setg(errp, "Compression is not supported for this drive %s",
                   bdrv_get_device_name(target));
        return;
        return NULL;
    }

    if (bdrv_op_is_blocked(bs, BLOCK_OP_TYPE_BACKUP_SOURCE, errp)) {
        return;
        return NULL;
    }

    if (bdrv_op_is_blocked(target, BLOCK_OP_TYPE_BACKUP_TARGET, errp)) {
        return;
        return NULL;
    }

    if (sync_mode == MIRROR_SYNC_MODE_INCREMENTAL) {
        if (!sync_bitmap) {
            error_setg(errp, "must provide a valid bitmap name for "
                             "\"incremental\" sync mode");
            return;
            return NULL;
        }

        /* Create a new bitmap, and freeze/disable this one. */
        if (bdrv_dirty_bitmap_create_successor(bs, sync_bitmap, errp) < 0) {
            return;
            return NULL;
        }
    } else if (sync_bitmap) {
        error_setg(errp,
                   "a sync_bitmap was provided to backup_run, "
                   "but received an incompatible sync_mode (%s)",
                   MirrorSyncMode_lookup[sync_mode]);
        return;
        return NULL;
    }

    len = bdrv_getlength(bs);
@@ -648,17 +654,18 @@ void backup_start(const char *job_id, BlockDriverState *bs,

    block_job_add_bdrv(&job->common, target);
    job->common.len = len;
    job->common.co = qemu_coroutine_create(backup_run, job);
    block_job_txn_add_job(txn, &job->common);
    qemu_coroutine_enter(job->common.co);
    return;

    return &job->common;

 error:
    if (sync_bitmap) {
        bdrv_reclaim_dirty_bitmap(bs, sync_bitmap, NULL);
    }
    if (job) {
        blk_unref(job->target);
        backup_clean(&job->common);
        block_job_unref(&job->common);
    }

    return NULL;
}
+3 −3
Original line number Diff line number Diff line
@@ -205,6 +205,7 @@ static const BlockJobDriver commit_job_driver = {
    .instance_size = sizeof(CommitBlockJob),
    .job_type      = BLOCK_JOB_TYPE_COMMIT,
    .set_speed     = commit_set_speed,
    .start         = commit_run,
};

void commit_start(const char *job_id, BlockDriverState *bs,
@@ -288,10 +289,9 @@ void commit_start(const char *job_id, BlockDriverState *bs,
    s->backing_file_str = g_strdup(backing_file_str);

    s->on_error = on_error;
    s->common.co = qemu_coroutine_create(commit_run, s);

    trace_commit_start(bs, base, top, s, s->common.co);
    qemu_coroutine_enter(s->common.co);
    trace_commit_start(bs, base, top, s);
    block_job_start(&s->common);
}


+76 −43
Original line number Diff line number Diff line
@@ -68,12 +68,10 @@ static CURLMcode __curl_multi_socket_action(CURLM *multi_handle,
#endif

#define PROTOCOLS (CURLPROTO_HTTP | CURLPROTO_HTTPS | \
                   CURLPROTO_FTP | CURLPROTO_FTPS | \
                   CURLPROTO_TFTP)
                   CURLPROTO_FTP | CURLPROTO_FTPS)

#define CURL_NUM_STATES 8
#define CURL_NUM_ACB    8
#define SECTOR_SIZE     512
#define READ_AHEAD_DEFAULT (256 * 1024)
#define CURL_TIMEOUT_DEFAULT 5
#define CURL_TIMEOUT_MAX 10000
@@ -105,12 +103,17 @@ typedef struct CURLAIOCB {
    size_t end;
} CURLAIOCB;

/*
 * One socket descriptor remembered for a CURLState.
 *
 * curl_sock_cb() adds an entry for each fd it is told about that is not yet
 * in the list, and removes and frees the entry on CURL_POLL_REMOVE.
 * curl_clean_state() frees any entries that remain.
 */
typedef struct CURLSocket {
    /* Socket file descriptor as passed to curl_sock_cb(). */
    int fd;
    /* Linkage in the owning CURLState's 'sockets' list. */
    QLIST_ENTRY(CURLSocket) next;
} CURLSocket;

typedef struct CURLState
{
    struct BDRVCURLState *s;
    CURLAIOCB *acb[CURL_NUM_ACB];
    CURL *curl;
    curl_socket_t sock_fd;
    QLIST_HEAD(, CURLSocket) sockets;
    char *orig_buf;
    size_t buf_start;
    size_t buf_off;
@@ -164,10 +167,27 @@ static int curl_sock_cb(CURL *curl, curl_socket_t fd, int action,
{
    BDRVCURLState *s;
    CURLState *state = NULL;
    CURLSocket *socket;

    curl_easy_getinfo(curl, CURLINFO_PRIVATE, (char **)&state);
    state->sock_fd = fd;
    s = state->s;

    QLIST_FOREACH(socket, &state->sockets, next) {
        if (socket->fd == fd) {
            if (action == CURL_POLL_REMOVE) {
                QLIST_REMOVE(socket, next);
                g_free(socket);
            }
            break;
        }
    }
    if (!socket) {
        socket = g_new0(CURLSocket, 1);
        socket->fd = fd;
        QLIST_INSERT_HEAD(&state->sockets, socket, next);
    }
    socket = NULL;

    DPRINTF("CURL (AIO): Sock action %d on fd %d\n", action, (int)fd);
    switch (action) {
        case CURL_POLL_IN:
@@ -213,12 +233,13 @@ static size_t curl_read_cb(void *ptr, size_t size, size_t nmemb, void *opaque)

    DPRINTF("CURL: Just reading %zd bytes\n", realsize);

    if (!s || !s->orig_buf)
        return 0;
    if (!s || !s->orig_buf) {
        goto read_end;
    }

    if (s->buf_off >= s->buf_len) {
        /* buffer full, read nothing */
        return 0;
        goto read_end;
    }
    realsize = MIN(realsize, s->buf_len - s->buf_off);
    memcpy(s->orig_buf + s->buf_off, ptr, realsize);
@@ -231,15 +252,26 @@ static size_t curl_read_cb(void *ptr, size_t size, size_t nmemb, void *opaque)
            continue;

        if ((s->buf_off >= acb->end)) {
            size_t request_length = acb->nb_sectors * BDRV_SECTOR_SIZE;

            qemu_iovec_from_buf(acb->qiov, 0, s->orig_buf + acb->start,
                                acb->end - acb->start);

            if (acb->end - acb->start < request_length) {
                size_t offset = acb->end - acb->start;
                qemu_iovec_memset(acb->qiov, offset, 0,
                                  request_length - offset);
            }

            acb->common.cb(acb->common.opaque, 0);
            qemu_aio_unref(acb);
            s->acb[i] = NULL;
        }
    }

    return realsize;
read_end:
    /* curl will error out if we do not return this value */
    return size * nmemb;
}

static int curl_find_buf(BDRVCURLState *s, size_t start, size_t len,
@@ -247,6 +279,8 @@ static int curl_find_buf(BDRVCURLState *s, size_t start, size_t len,
{
    int i;
    size_t end = start + len;
    size_t clamped_end = MIN(end, s->len);
    size_t clamped_len = clamped_end - start;

    for (i=0; i<CURL_NUM_STATES; i++) {
        CURLState *state = &s->states[i];
@@ -261,12 +295,15 @@ static int curl_find_buf(BDRVCURLState *s, size_t start, size_t len,
        // Does the existing buffer cover our section?
        if ((start >= state->buf_start) &&
            (start <= buf_end) &&
            (end >= state->buf_start) &&
            (end <= buf_end))
            (clamped_end >= state->buf_start) &&
            (clamped_end <= buf_end))
        {
            char *buf = state->orig_buf + (start - state->buf_start);

            qemu_iovec_from_buf(acb->qiov, 0, buf, len);
            qemu_iovec_from_buf(acb->qiov, 0, buf, clamped_len);
            if (clamped_len < len) {
                qemu_iovec_memset(acb->qiov, clamped_len, 0, len - clamped_len);
            }
            acb->common.cb(acb->common.opaque, 0);

            return FIND_RET_OK;
@@ -276,13 +313,13 @@ static int curl_find_buf(BDRVCURLState *s, size_t start, size_t len,
        if (state->in_use &&
            (start >= state->buf_start) &&
            (start <= buf_fend) &&
            (end >= state->buf_start) &&
            (end <= buf_fend))
            (clamped_end >= state->buf_start) &&
            (clamped_end <= buf_fend))
        {
            int j;

            acb->start = start - state->buf_start;
            acb->end = acb->start + len;
            acb->end = acb->start + clamped_len;

            for (j=0; j<CURL_NUM_ACB; j++) {
                if (!state->acb[j]) {
@@ -352,6 +389,7 @@ static void curl_multi_check_completion(BDRVCURLState *s)
static void curl_multi_do(void *arg)
{
    CURLState *s = (CURLState *)arg;
    CURLSocket *socket, *next_socket;
    int running;
    int r;

@@ -359,10 +397,13 @@ static void curl_multi_do(void *arg)
        return;
    }

    /* Need to use _SAFE because curl_multi_socket_action() may trigger
     * curl_sock_cb() which might modify this list */
    QLIST_FOREACH_SAFE(socket, &s->sockets, next, next_socket) {
        do {
        r = curl_multi_socket_action(s->s->multi, s->sock_fd, 0, &running);
            r = curl_multi_socket_action(s->s->multi, socket->fd, 0, &running);
        } while (r == CURLM_CALL_MULTI_PERFORM);

    }
}

static void curl_multi_read(void *arg)
@@ -466,6 +507,7 @@ static CURLState *curl_init_state(BlockDriverState *bs, BDRVCURLState *s)
#endif
    }

    QLIST_INIT(&state->sockets);
    state->s = s;

    return state;
@@ -475,6 +517,14 @@ static void curl_clean_state(CURLState *s)
{
    if (s->s->multi)
        curl_multi_remove_handle(s->s->multi, s->curl);

    while (!QLIST_EMPTY(&s->sockets)) {
        CURLSocket *socket = QLIST_FIRST(&s->sockets);

        QLIST_REMOVE(socket, next);
        g_free(socket);
    }

    s->in_use = 0;
}

@@ -738,12 +788,12 @@ static void curl_readv_bh_cb(void *p)
    CURLAIOCB *acb = p;
    BDRVCURLState *s = acb->common.bs->opaque;

    size_t start = acb->sector_num * SECTOR_SIZE;
    size_t start = acb->sector_num * BDRV_SECTOR_SIZE;
    size_t end;

    // In case we have the requested data already (e.g. read-ahead),
    // we can just call the callback and be done.
    switch (curl_find_buf(s, start, acb->nb_sectors * SECTOR_SIZE, acb)) {
    switch (curl_find_buf(s, start, acb->nb_sectors * BDRV_SECTOR_SIZE, acb)) {
        case FIND_RET_OK:
            qemu_aio_unref(acb);
            // fall through
@@ -762,13 +812,13 @@ static void curl_readv_bh_cb(void *p)
    }

    acb->start = 0;
    acb->end = (acb->nb_sectors * SECTOR_SIZE);
    acb->end = MIN(acb->nb_sectors * BDRV_SECTOR_SIZE, s->len - start);

    state->buf_off = 0;
    g_free(state->orig_buf);
    state->buf_start = start;
    state->buf_len = acb->end + s->readahead_size;
    end = MIN(start + state->buf_len, s->len) - 1;
    state->buf_len = MIN(acb->end + s->readahead_size, s->len - start);
    end = start + state->buf_len - 1;
    state->orig_buf = g_try_malloc(state->buf_len);
    if (state->buf_len && state->orig_buf == NULL) {
        curl_clean_state(state);
@@ -779,8 +829,8 @@ static void curl_readv_bh_cb(void *p)
    state->acb[0] = acb;

    snprintf(state->range, 127, "%zd-%zd", start, end);
    DPRINTF("CURL (AIO): Reading %d at %zd (%s)\n",
            (acb->nb_sectors * SECTOR_SIZE), start, state->range);
    DPRINTF("CURL (AIO): Reading %llu at %zd (%s)\n",
            (acb->nb_sectors * BDRV_SECTOR_SIZE), start, state->range);
    curl_easy_setopt(state->curl, CURLOPT_RANGE, state->range);

    curl_multi_add_handle(s->multi, state->curl);
@@ -886,29 +936,12 @@ static BlockDriver bdrv_ftps = {
    .bdrv_attach_aio_context    = curl_attach_aio_context,
};

static BlockDriver bdrv_tftp = {
    .format_name                = "tftp",
    .protocol_name              = "tftp",

    .instance_size              = sizeof(BDRVCURLState),
    .bdrv_parse_filename        = curl_parse_filename,
    .bdrv_file_open             = curl_open,
    .bdrv_close                 = curl_close,
    .bdrv_getlength             = curl_getlength,

    .bdrv_aio_readv             = curl_aio_readv,

    .bdrv_detach_aio_context    = curl_detach_aio_context,
    .bdrv_attach_aio_context    = curl_attach_aio_context,
};

static void curl_block_init(void)
{
    bdrv_register(&bdrv_http);
    bdrv_register(&bdrv_https);
    bdrv_register(&bdrv_ftp);
    bdrv_register(&bdrv_ftps);
    bdrv_register(&bdrv_tftp);
}

block_init(curl_block_init);
+29 −18
Original line number Diff line number Diff line
@@ -615,6 +615,20 @@ static int coroutine_fn mirror_dirty_init(MirrorBlockJob *s)
    return 0;
}

/* Called when going out of the streaming phase to flush the bulk of the
 * data to the medium, or just before completing.
 *
 * Flushes s->target with blk_flush().  If the flush fails, the negated
 * error code is handed to mirror_error_action(); when that returns
 * BLOCK_ERROR_ACTION_REPORT the (negative) flush result is recorded in
 * s->ret.  For any other action s->ret is left untouched.
 *
 * Returns the blk_flush() result unchanged; a negative value means the
 * flush failed, and callers are expected to check s->ret afterwards.
 */
static int mirror_flush(MirrorBlockJob *s)
{
    int ret = blk_flush(s->target);
    if (ret < 0) {
        /* NOTE(review): the 'false' argument presumably means "not a read
         * error", mirroring backup_error_action()'s 'read' flag - confirm. */
        if (mirror_error_action(s, false, -ret) == BLOCK_ERROR_ACTION_REPORT) {
            s->ret = ret;
        }
    }
    return ret;
}

static void coroutine_fn mirror_run(void *opaque)
{
    MirrorBlockJob *s = opaque;
@@ -727,19 +741,16 @@ static void coroutine_fn mirror_run(void *opaque)
        should_complete = false;
        if (s->in_flight == 0 && cnt == 0) {
            trace_mirror_before_flush(s);
            ret = blk_flush(s->target);
            if (ret < 0) {
                if (mirror_error_action(s, false, -ret) ==
                    BLOCK_ERROR_ACTION_REPORT) {
                    goto immediate_exit;
            if (!s->synced) {
                if (mirror_flush(s) < 0) {
                    /* Go check s->ret.  */
                    continue;
                }
            } else {
                /* We're out of the streaming phase.  From now on, if the job
                 * is cancelled we will actually complete all pending I/O and
                 * report completion.  This way, block-job-cancel will leave
                 * the target in a consistent state.
                 */
                if (!s->synced) {
                block_job_event_ready(&s->common);
                s->synced = true;
            }
@@ -748,7 +759,6 @@ static void coroutine_fn mirror_run(void *opaque)
                block_job_is_cancelled(&s->common);
            cnt = bdrv_get_dirty_count(s->dirty_bitmap);
        }
        }

        if (cnt == 0 && should_complete) {
            /* The dirty bitmap is not updated while operations are pending.
@@ -765,7 +775,7 @@ static void coroutine_fn mirror_run(void *opaque)

            bdrv_drained_begin(bs);
            cnt = bdrv_get_dirty_count(s->dirty_bitmap);
            if (cnt > 0) {
            if (cnt > 0 || mirror_flush(s) < 0) {
                bdrv_drained_end(bs);
                continue;
            }
@@ -920,6 +930,7 @@ static const BlockJobDriver mirror_job_driver = {
    .instance_size          = sizeof(MirrorBlockJob),
    .job_type               = BLOCK_JOB_TYPE_MIRROR,
    .set_speed              = mirror_set_speed,
    .start                  = mirror_run,
    .complete               = mirror_complete,
    .pause                  = mirror_pause,
    .attached_aio_context   = mirror_attached_aio_context,
@@ -930,6 +941,7 @@ static const BlockJobDriver commit_active_job_driver = {
    .instance_size          = sizeof(MirrorBlockJob),
    .job_type               = BLOCK_JOB_TYPE_COMMIT,
    .set_speed              = mirror_set_speed,
    .start                  = mirror_run,
    .complete               = mirror_complete,
    .pause                  = mirror_pause,
    .attached_aio_context   = mirror_attached_aio_context,
@@ -1007,9 +1019,8 @@ static void mirror_start_job(const char *job_id, BlockDriverState *bs,
        }
    }

    s->common.co = qemu_coroutine_create(mirror_run, s);
    trace_mirror_start(bs, s, s->common.co, opaque);
    qemu_coroutine_enter(s->common.co);
    trace_mirror_start(bs, s, opaque);
    block_job_start(&s->common);
}

void mirror_start(const char *job_id, BlockDriverState *bs,
+7 −5
Original line number Diff line number Diff line
@@ -421,6 +421,7 @@ static void replication_start(ReplicationState *rs, ReplicationMode mode,
    int64_t active_length, hidden_length, disk_length;
    AioContext *aio_context;
    Error *local_err = NULL;
    BlockJob *job;

    aio_context = bdrv_get_aio_context(bs);
    aio_context_acquire(aio_context);
@@ -508,17 +509,18 @@ static void replication_start(ReplicationState *rs, ReplicationMode mode,
        bdrv_op_block_all(top_bs, s->blocker);
        bdrv_op_unblock(top_bs, BLOCK_OP_TYPE_DATAPLANE, s->blocker);

        backup_start(NULL, s->secondary_disk->bs, s->hidden_disk->bs, 0,
                     MIRROR_SYNC_MODE_NONE, NULL, false,
                     BLOCKDEV_ON_ERROR_REPORT, BLOCKDEV_ON_ERROR_REPORT,
                     BLOCK_JOB_INTERNAL, backup_job_completed, bs,
                     NULL, &local_err);
        job = backup_job_create(NULL, s->secondary_disk->bs, s->hidden_disk->bs,
                                0, MIRROR_SYNC_MODE_NONE, NULL, false,
                                BLOCKDEV_ON_ERROR_REPORT,
                                BLOCKDEV_ON_ERROR_REPORT, BLOCK_JOB_INTERNAL,
                                backup_job_completed, bs, NULL, &local_err);
        if (local_err) {
            error_propagate(errp, local_err);
            backup_job_cleanup(bs);
            aio_context_release(aio_context);
            return;
        }
        block_job_start(job);
        break;
    default:
        aio_context_release(aio_context);
Loading