Commit 6df264ac authored by Juan Quintela's avatar Juan Quintela
Browse files

migration: Synchronize multifd threads with main thread



We synchronize all threads each RAM_SAVE_FLAG_EOS.  Bitmap
synchronizations don't happen inside a  ram section, so we are safe
about two channels trying to overwrite the same memory.

Signed-off-by: default avatarJuan Quintela <quintela@redhat.com>
Reviewed-by: default avatarDr. David Alan Gilbert <dgilbert@redhat.com>

--
seq needs to be atomic now, will also be accessed from main thread.
Fix the if (true || ...) leftover
We are back to non-atomics
parent 0beb5ed3
Loading
Loading
Loading
Loading
+115 −30
Original line number Diff line number Diff line
@@ -510,6 +510,8 @@ exit:
#define MULTIFD_MAGIC 0x11223344U
#define MULTIFD_VERSION 1

#define MULTIFD_FLAG_SYNC (1 << 0)

typedef struct {
    uint32_t magic;
    uint32_t version;
@@ -577,6 +579,8 @@ typedef struct {
    uint64_t num_packets;
    /* pages sent through this channel */
    uint64_t num_pages;
    /* syncs main thread and channels */
    QemuSemaphore sem_sync;
}  MultiFDSendParams;

typedef struct {
@@ -614,6 +618,8 @@ typedef struct {
    uint64_t num_packets;
    /* pages sent through this channel */
    uint64_t num_pages;
    /* syncs main thread and channels */
    QemuSemaphore sem_sync;
} MultiFDRecvParams;

static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
@@ -801,6 +807,10 @@ struct {
    int count;
    /* array of pages to sent */
    MultiFDPages_t *pages;
    /* syncs main thread and channels */
    QemuSemaphore sem_sync;
    /* global number of generated multifd packets */
    uint64_t packet_num;
} *multifd_send_state;

static void multifd_send_terminate_threads(Error *err)
@@ -848,6 +858,7 @@ int multifd_save_cleanup(Error **errp)
        p->c = NULL;
        qemu_mutex_destroy(&p->mutex);
        qemu_sem_destroy(&p->sem);
        qemu_sem_destroy(&p->sem_sync);
        g_free(p->name);
        p->name = NULL;
        multifd_pages_clear(p->pages);
@@ -856,6 +867,7 @@ int multifd_save_cleanup(Error **errp)
        g_free(p->packet);
        p->packet = NULL;
    }
    qemu_sem_destroy(&multifd_send_state->sem_sync);
    g_free(multifd_send_state->params);
    multifd_send_state->params = NULL;
    multifd_pages_clear(multifd_send_state->pages);
@@ -865,6 +877,33 @@ int multifd_save_cleanup(Error **errp)
    return ret;
}

static void multifd_send_sync_main(void)
{
    int i;

    if (!migrate_use_multifd()) {
        return;
    }
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        trace_multifd_send_sync_main_signal(p->id);

        qemu_mutex_lock(&p->mutex);
        p->flags |= MULTIFD_FLAG_SYNC;
        p->pending_job++;
        qemu_mutex_unlock(&p->mutex);
        qemu_sem_post(&p->sem);
    }
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        trace_multifd_send_sync_main_wait(p->id);
        qemu_sem_wait(&multifd_send_state->sem_sync);
    }
    trace_multifd_send_sync_main(multifd_send_state->packet_num);
}

static void *multifd_send_thread(void *opaque)
{
    MultiFDSendParams *p = opaque;
@@ -901,15 +940,17 @@ static void *multifd_send_thread(void *opaque)
            qemu_mutex_lock(&p->mutex);
            p->pending_job--;
            qemu_mutex_unlock(&p->mutex);
            continue;

            if (flags & MULTIFD_FLAG_SYNC) {
                qemu_sem_post(&multifd_send_state->sem_sync);
            }
        } else if (p->quit) {
            qemu_mutex_unlock(&p->mutex);
            break;
        }
        } else {
            qemu_mutex_unlock(&p->mutex);
        /* this is impossible */
        error_setg(&local_err, "multifd_send_thread: Unknown command");
        break;
            /* sometimes there are spurious wakeups */
        }
    }

out:
@@ -961,12 +1002,14 @@ int multifd_save_setup(void)
    multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
    atomic_set(&multifd_send_state->count, 0);
    multifd_send_state->pages = multifd_pages_init(page_count);
    qemu_sem_init(&multifd_send_state->sem_sync, 0);

    for (i = 0; i < thread_count; i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        qemu_mutex_init(&p->mutex);
        qemu_sem_init(&p->sem, 0);
        qemu_sem_init(&p->sem_sync, 0);
        p->quit = false;
        p->pending_job = 0;
        p->id = i;
@@ -984,6 +1027,10 @@ struct {
    MultiFDRecvParams *params;
    /* number of created threads */
    int count;
    /* syncs main thread and channels */
    QemuSemaphore sem_sync;
    /* global number of generated multifd packets */
    uint64_t packet_num;
} *multifd_recv_state;

static void multifd_recv_terminate_threads(Error *err)
@@ -1029,6 +1076,7 @@ int multifd_load_cleanup(Error **errp)
        p->c = NULL;
        qemu_mutex_destroy(&p->mutex);
        qemu_sem_destroy(&p->sem);
        qemu_sem_destroy(&p->sem_sync);
        g_free(p->name);
        p->name = NULL;
        multifd_pages_clear(p->pages);
@@ -1037,6 +1085,7 @@ int multifd_load_cleanup(Error **errp)
        g_free(p->packet);
        p->packet = NULL;
    }
    qemu_sem_destroy(&multifd_recv_state->sem_sync);
    g_free(multifd_recv_state->params);
    multifd_recv_state->params = NULL;
    g_free(multifd_recv_state);
@@ -1045,6 +1094,42 @@ int multifd_load_cleanup(Error **errp)
    return ret;
}

static void multifd_recv_sync_main(void)
{
    int i;

    if (!migrate_use_multifd()) {
        return;
    }
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        trace_multifd_recv_sync_main_signal(p->id);
        qemu_mutex_lock(&p->mutex);
        p->pending_job = true;
        qemu_mutex_unlock(&p->mutex);
    }
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        trace_multifd_recv_sync_main_wait(p->id);
        qemu_sem_wait(&multifd_recv_state->sem_sync);
        qemu_mutex_lock(&p->mutex);
        if (multifd_recv_state->packet_num < p->packet_num) {
            multifd_recv_state->packet_num = p->packet_num;
        }
        qemu_mutex_unlock(&p->mutex);
    }
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        trace_multifd_recv_sync_main_signal(p->id);

        qemu_sem_post(&p->sem_sync);
    }
    trace_multifd_recv_sync_main(multifd_recv_state->packet_num);
}

static void *multifd_recv_thread(void *opaque)
{
    MultiFDRecvParams *p = opaque;
@@ -1054,12 +1139,8 @@ static void *multifd_recv_thread(void *opaque)
    trace_multifd_recv_thread_start(p->id);

    while (true) {
        qemu_sem_wait(&p->sem);
        qemu_mutex_lock(&p->mutex);
        if (p->pending_job) {
        uint32_t used;
        uint32_t flags;
            qemu_mutex_unlock(&p->mutex);

        /* ToDo: recv packet here */

@@ -1077,14 +1158,11 @@ static void *multifd_recv_thread(void *opaque)
        p->num_packets++;
        p->num_pages += used;
        qemu_mutex_unlock(&p->mutex);
        } else if (p->quit) {
            qemu_mutex_unlock(&p->mutex);
            break;

        if (flags & MULTIFD_FLAG_SYNC) {
            qemu_sem_post(&multifd_recv_state->sem_sync);
            qemu_sem_wait(&p->sem_sync);
        }
        qemu_mutex_unlock(&p->mutex);
        /* this is impossible */
        error_setg(&local_err, "multifd_recv_thread: Unknown command");
        break;
    }

    if (local_err) {
@@ -1112,12 +1190,14 @@ int multifd_load_setup(void)
    multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
    multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
    atomic_set(&multifd_recv_state->count, 0);
    qemu_sem_init(&multifd_recv_state->sem_sync, 0);

    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        qemu_mutex_init(&p->mutex);
        qemu_sem_init(&p->sem, 0);
        qemu_sem_init(&p->sem_sync, 0);
        p->quit = false;
        p->pending_job = false;
        p->id = i;
@@ -2875,6 +2955,7 @@ static int ram_save_setup(QEMUFile *f, void *opaque)
    ram_control_before_iterate(f, RAM_CONTROL_SETUP);
    ram_control_after_iterate(f, RAM_CONTROL_SETUP);

    multifd_send_sync_main();
    qemu_put_be64(f, RAM_SAVE_FLAG_EOS);

    return 0;
@@ -2955,6 +3036,7 @@ static int ram_save_iterate(QEMUFile *f, void *opaque)
     */
    ram_control_after_iterate(f, RAM_CONTROL_ROUND);

    multifd_send_sync_main();
out:
    qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
    ram_counters.transferred += 8;
@@ -3008,6 +3090,7 @@ static int ram_save_complete(QEMUFile *f, void *opaque)

    rcu_read_unlock();

    multifd_send_sync_main();
    qemu_put_be64(f, RAM_SAVE_FLAG_EOS);

    return 0;
@@ -3497,6 +3580,7 @@ static int ram_load_postcopy(QEMUFile *f)
            break;
        case RAM_SAVE_FLAG_EOS:
            /* normal exit */
            multifd_recv_sync_main();
            break;
        default:
            error_report("Unknown combination of migration flags: %#x"
@@ -3685,6 +3769,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
            break;
        case RAM_SAVE_FLAG_EOS:
            /* normal exit */
            multifd_recv_sync_main();
            break;
        default:
            if (flags & RAM_SAVE_FLAG_HOOK) {
+6 −0
Original line number Diff line number Diff line
@@ -77,9 +77,15 @@ migration_bitmap_sync_start(void) ""
migration_bitmap_sync_end(uint64_t dirty_pages) "dirty_pages %" PRIu64
migration_throttle(void) ""
multifd_recv(uint8_t id, uint64_t packet_num, uint32_t used, uint32_t flags) "channel %d packet number %" PRIu64 " pages %d flags 0x%x"
multifd_recv_sync_main(long packet_num) "packet num %ld"
multifd_recv_sync_main_signal(uint8_t id) "channel %d"
multifd_recv_sync_main_wait(uint8_t id) "channel %d"
multifd_recv_thread_end(uint8_t id, uint64_t packets, uint64_t pages) "channel %d packets %" PRIu64 " pages %" PRIu64
multifd_recv_thread_start(uint8_t id) "%d"
multifd_send(uint8_t id, uint64_t packet_num, uint32_t used, uint32_t flags) "channel %d packet_num %" PRIu64 " pages %d flags 0x%x"
multifd_send_sync_main(long packet_num) "packet num %ld"
multifd_send_sync_main_signal(uint8_t id) "channel %d"
multifd_send_sync_main_wait(uint8_t id) "channel %d"
multifd_send_thread_end(uint8_t id, uint64_t packets, uint64_t pages) "channel %d packets %" PRIu64 " pages %"  PRIu64
multifd_send_thread_start(uint8_t id) "%d"
ram_discard_range(const char *rbname, uint64_t start, size_t len) "%s: start: %" PRIx64 " %zx"