Commit b341574e authored by Zheng Chuan's avatar Zheng Chuan Committed by zhuyanting
Browse files

migration/multifd: fix potential wrong acception order of IOChannel



Multifd assumes the migration thread IOChannel is always established before
the multifd IOChannels, but this assumption will be broken in many situations
like network packet loss.

For example:
Step1: Source (migration thread IOChannel)  --SYN-->  Destination
Step2: Source (migration thread IOChannel)  <--SYNACK  Destination
Step3: Source (migration thread IOChannel, lost) --ACK-->X  Destination
Step4: Source (multifd IOChannel) --SYN-->    Destination
Step5: Source (multifd IOChannel) <--SYNACK   Destination
Step6: Source (multifd IOChannel, ESTABLISHED) --ACK-->  Destination
Step7: Destination accepts multifd IOChannel
Step8: Source (migration thread IOChannel, ESTABLISHED) -ACK,DATA->  Destination
Step9: Destination accepts migration thread IOChannel

The above situation can be reproduced by creating a weak network environment,
such as "tc qdisc add dev eth0 root netem loss 50%". The wrong acception order
will cause magic check failure and thus lead to migration failure.

This patch fixes this issue by sending a migration IOChannel initial packet with
a unique id when using multifd migration. Since the multifd IOChannels will also
send initial packets, the destination can judge whether the processing IOChannel
belongs to multifd by checking the id in the initial packet. This mechanism can
ensure that different IOChannels will go to correct branches in our test.

Change-Id: I63d1c32c7b66063bd6a3c5e7d63500555bd148b9
Signed-off-by: default avatarJiahui Cen <cenjiahui@huawei.com>
Signed-off-by: default avatarYing Fang <fangying1@huawei.com>
parent 718c4dd4
Loading
Loading
Loading
Loading
+9 −0
Original line number Diff line number Diff line
@@ -82,6 +82,15 @@ void migration_channel_connect(MigrationState *s,
                return;
            }
        } else {
            if (migrate_use_multifd()) {
                /* multifd migration cannot distinguish migration IOChannel
                 * from multifd IOChannels, so we need to send an initial packet
                 * to show it is migration IOChannel
                 */
                migration_send_initial_packet(ioc,
                                              migrate_multifd_channels(),
                                              &error);
            }
            QEMUFile *f = qemu_fopen_channel_output(ioc);

            qemu_mutex_lock(&s->qemu_file_lock);
+24 −25
Original line number Diff line number Diff line
@@ -517,12 +517,6 @@ static void migration_incoming_setup(QEMUFile *f)
{
    MigrationIncomingState *mis = migration_incoming_get_current();

    if (multifd_load_setup() != 0) {
        /* We haven't been able to create multifd threads
           nothing better to do */
        exit(EXIT_FAILURE);
    }

    if (!mis->from_src_file) {
        mis->from_src_file = f;
    }
@@ -580,10 +574,15 @@ void migration_fd_process_incoming(QEMUFile *f)
void migration_ioc_process_incoming(QIOChannel *ioc, Error **errp)
{
    MigrationIncomingState *mis = migration_incoming_get_current();
    bool start_migration;
    Error *local_err = NULL;
    int id = 0;

    if (migrate_use_multifd()) {
        id = migration_recv_initial_packet(ioc, &local_err);
    }
    if (!migrate_use_multifd() || id == migrate_multifd_channels()) {
        if (!mis->from_src_file) {
        /* The first connection (multifd may have multiple) */
            /* The migration connection (multifd may have multiple) */
            QEMUFile *f = qemu_fopen_channel_input(ioc);

            /* If it's a recovery, we're done */
@@ -592,24 +591,24 @@ void migration_ioc_process_incoming(QIOChannel *ioc, Error **errp)
            }

            migration_incoming_setup(f);

        /*
         * Common migration only needs one channel, so we can start
         * right now.  Multifd needs more than one channel, we wait.
         */
        start_migration = !migrate_use_multifd();
    } else {
        Error *local_err = NULL;
        }
    } else if (id >= 0) {
        /* Multiple connections */
        assert(migrate_use_multifd());
        start_migration = multifd_recv_new_channel(ioc, &local_err);
        multifd_recv_new_channel(ioc, id, &local_err);
        if (local_err) {
            error_propagate(errp, local_err);
            return;
        }
    } else {
        /* Bad connections */
        multifd_recv_terminate_threads(local_err);
        error_propagate(errp, local_err);
        return;
    }

    if (start_migration) {
    /* Once we have all the channels we need, we can start migration */
    if (migration_has_all_channels()) {
        migration_incoming_process();
    }
}
+3 −0
Original line number Diff line number Diff line
@@ -339,4 +339,7 @@ int foreach_not_ignored_block(RAMBlockIterFunc func, void *opaque);
void migration_make_urgent_request(void);
void migration_consume_urgent_request(void);

int migration_send_initial_packet(QIOChannel *c, uint8_t id, Error **errp);
int migration_recv_initial_packet(QIOChannel *c, Error **errp);

#endif
+13 −26
Original line number Diff line number Diff line
@@ -593,7 +593,7 @@ typedef struct {
    uint8_t id;
    uint8_t unused1[7];     /* Reserved for future use */
    uint64_t unused2[4];    /* Reserved for future use */
} __attribute__((packed)) MultiFDInit_t;
} __attribute__((packed)) MigrationInit_t;

typedef struct {
    uint32_t magic;
@@ -702,26 +702,26 @@ typedef struct {
    QemuSemaphore sem_sync;
} MultiFDRecvParams;

static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
int migration_send_initial_packet(QIOChannel *c, uint8_t id, Error **errp)
{
    MultiFDInit_t msg;
    MigrationInit_t msg;
    int ret;

    msg.magic = cpu_to_be32(MULTIFD_MAGIC);
    msg.version = cpu_to_be32(MULTIFD_VERSION);
    msg.id = p->id;
    msg.id = id;
    memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid));

    ret = qio_channel_write_all(p->c, (char *)&msg, sizeof(msg), errp);
    ret = qio_channel_write_all(c, (char *)&msg, sizeof(msg), errp);
    if (ret != 0) {
        return -1;
    }
    return 0;
}

static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
int migration_recv_initial_packet(QIOChannel *c, Error **errp)
{
    MultiFDInit_t msg;
    MigrationInit_t msg;
    int ret;

    ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp);
@@ -756,8 +756,8 @@ static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
    }

    if (msg.id > migrate_multifd_channels()) {
        error_setg(errp, "multifd: received channel version %d "
                   "expected %d", msg.version, MULTIFD_VERSION);
        error_setg(errp, "multifd: received channel id %d "
                   "expected [0-%d]", msg.id, migrate_multifd_channels());
        return -1;
    }

@@ -1111,7 +1111,7 @@ static void *multifd_send_thread(void *opaque)
    trace_multifd_send_thread_start(p->id);
    rcu_register_thread();

    if (multifd_send_initial_packet(p, &local_err) < 0) {
    if (migration_send_initial_packet(p->c, p->id, &local_err) < 0) {
        ret = -1;
        goto out;
    }
@@ -1255,7 +1255,7 @@ struct {
    uint64_t packet_num;
} *multifd_recv_state;

static void multifd_recv_terminate_threads(Error *err)
void multifd_recv_terminate_threads(Error *err)
{
    int i;

@@ -1470,21 +1470,10 @@ bool multifd_recv_all_channels_created(void)
 * - Return false and do not set @errp when correctly receiving the current one;
 * - Return false and set @errp when failing to receive the current channel.
 */
bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
void multifd_recv_new_channel(QIOChannel *ioc, int id, Error **errp)
{
    MultiFDRecvParams *p;
    Error *local_err = NULL;
    int id;

    id = multifd_recv_initial_packet(ioc, &local_err);
    if (id < 0) {
        multifd_recv_terminate_threads(local_err);
        error_propagate_prepend(errp, local_err,
                                "failed to receive packet"
                                " via multifd channel %d: ",
                                atomic_read(&multifd_recv_state->count));
        return false;
    }

    p = &multifd_recv_state->params[id];
    if (p->c != NULL) {
@@ -1492,7 +1481,7 @@ bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
                   id);
        multifd_recv_terminate_threads(local_err);
        error_propagate(errp, local_err);
        return false;
        return;
    }
    p->c = ioc;
    object_ref(OBJECT(ioc));
@@ -1503,8 +1492,6 @@ bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
    qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
                       QEMU_THREAD_JOINABLE);
    atomic_inc(&multifd_recv_state->count);
    return atomic_read(&multifd_recv_state->count) ==
           migrate_multifd_channels();
}

/**
+2 −1
Original line number Diff line number Diff line
@@ -46,7 +46,8 @@ void multifd_save_cleanup(void);
int multifd_load_setup(void);
int multifd_load_cleanup(Error **errp);
bool multifd_recv_all_channels_created(void);
bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp);
void multifd_recv_new_channel(QIOChannel *ioc, int id, Error **errp);
void multifd_recv_terminate_threads(Error *err);

uint64_t ram_pagesize_summary(void);
int ram_save_queue_pages(const char *rbname, ram_addr_t start, ram_addr_t len);
Loading