Commit 66770707 authored by Juan Quintela's avatar Juan Quintela
Browse files

migration: terminate_* can be called for other threads



Once there, make  count field to always be accessed with atomic
operations.  To make blocking operations, we need to know that the
thread is running, so create a bool to indicate that.

Signed-off-by: default avatarJuan Quintela <quintela@redhat.com>
Reviewed-by: default avatarDaniel P. Berrangé <berrange@redhat.com>

--

Once here, s/terminate_multifd_*-threads/multifd_*_terminate_threads/
This is consistente with every other function
parent 71bb07db
Loading
Loading
Loading
Loading
+30 −14
Original line number Diff line number Diff line
@@ -439,6 +439,7 @@ struct MultiFDSendParams {
    QemuThread thread;
    QemuSemaphore sem;
    QemuMutex mutex;
    bool running;
    bool quit;
};
typedef struct MultiFDSendParams MultiFDSendParams;
@@ -449,7 +450,7 @@ struct {
    int count;
} *multifd_send_state;

static void terminate_multifd_send_threads(Error *err)
static void multifd_send_terminate_threads(Error *err)
{
    int i;

@@ -465,7 +466,7 @@ static void terminate_multifd_send_threads(Error *err)
        }
    }

    for (i = 0; i < multifd_send_state->count; i++) {
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        qemu_mutex_lock(&p->mutex);
@@ -483,11 +484,13 @@ int multifd_save_cleanup(Error **errp)
    if (!migrate_use_multifd()) {
        return 0;
    }
    terminate_multifd_send_threads(NULL);
    for (i = 0; i < multifd_send_state->count; i++) {
    multifd_send_terminate_threads(NULL);
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (p->running) {
            qemu_thread_join(&p->thread);
        }
        qemu_mutex_destroy(&p->mutex);
        qemu_sem_destroy(&p->sem);
        g_free(p->name);
@@ -514,6 +517,10 @@ static void *multifd_send_thread(void *opaque)
        qemu_sem_wait(&p->sem);
    }

    qemu_mutex_lock(&p->mutex);
    p->running = false;
    qemu_mutex_unlock(&p->mutex);

    return NULL;
}

@@ -528,7 +535,7 @@ int multifd_save_setup(void)
    thread_count = migrate_multifd_channels();
    multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
    multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
    multifd_send_state->count = 0;
    atomic_set(&multifd_send_state->count, 0);
    for (i = 0; i < thread_count; i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

@@ -537,10 +544,11 @@ int multifd_save_setup(void)
        p->quit = false;
        p->id = i;
        p->name = g_strdup_printf("multifdsend_%d", i);
        p->running = true;
        qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
                           QEMU_THREAD_JOINABLE);

        multifd_send_state->count++;
        atomic_inc(&multifd_send_state->count);
    }
    return 0;
}
@@ -551,6 +559,7 @@ struct MultiFDRecvParams {
    QemuThread thread;
    QemuSemaphore sem;
    QemuMutex mutex;
    bool running;
    bool quit;
};
typedef struct MultiFDRecvParams MultiFDRecvParams;
@@ -561,7 +570,7 @@ struct {
    int count;
} *multifd_recv_state;

static void terminate_multifd_recv_threads(Error *err)
static void multifd_recv_terminate_threads(Error *err)
{
    int i;

@@ -575,7 +584,7 @@ static void terminate_multifd_recv_threads(Error *err)
        }
    }

    for (i = 0; i < multifd_recv_state->count; i++) {
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        qemu_mutex_lock(&p->mutex);
@@ -593,11 +602,13 @@ int multifd_load_cleanup(Error **errp)
    if (!migrate_use_multifd()) {
        return 0;
    }
    terminate_multifd_recv_threads(NULL);
    for (i = 0; i < multifd_recv_state->count; i++) {
    multifd_recv_terminate_threads(NULL);
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        if (p->running) {
            qemu_thread_join(&p->thread);
        }
        qemu_mutex_destroy(&p->mutex);
        qemu_sem_destroy(&p->sem);
        g_free(p->name);
@@ -625,6 +636,10 @@ static void *multifd_recv_thread(void *opaque)
        qemu_sem_wait(&p->sem);
    }

    qemu_mutex_lock(&p->mutex);
    p->running = false;
    qemu_mutex_unlock(&p->mutex);

    return NULL;
}

@@ -639,7 +654,7 @@ int multifd_load_setup(void)
    thread_count = migrate_multifd_channels();
    multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
    multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
    multifd_recv_state->count = 0;
    atomic_set(&multifd_recv_state->count, 0);
    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

@@ -648,9 +663,10 @@ int multifd_load_setup(void)
        p->quit = false;
        p->id = i;
        p->name = g_strdup_printf("multifdrecv_%d", i);
        p->running = true;
        qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
                           QEMU_THREAD_JOINABLE);
        multifd_recv_state->count++;
        atomic_inc(&multifd_recv_state->count);
    }
    return 0;
}