Commit 66d2a242 authored by zhanghailiang's avatar zhanghailiang Committed by Jason Wang
Browse files

colo-compare: use g_timeout_source_new() to process the stale packets



Instead of using qemu timer to process the stale packets,
We re-use the colo compare thread to process these packets
by creating a new timeout coroutine.

Besides, since we process all the same vNIC's net connection/packets
in one thread, it is safe to remove the timer_check_lock.

Signed-off-by: default avatarzhanghailiang <zhang.zhanghailiang@huawei.com>
Signed-off-by: default avatarJason Wang <jasowang@redhat.com>
parent 002d394f
Loading
Loading
Loading
Loading
+22 −40
Original line number Diff line number Diff line
@@ -83,9 +83,6 @@ typedef struct CompareState {
    GHashTable *connection_track_table;
    /* compare thread, a thread for each NIC */
    QemuThread thread;
    /* Timer used on the primary to find packets that are never matched */
    QEMUTimer *timer;
    QemuMutex timer_check_lock;
} CompareState;

typedef struct CompareClass {
@@ -374,9 +371,7 @@ static void colo_compare_connection(void *opaque, void *user_data)

    while (!g_queue_is_empty(&conn->primary_list) &&
           !g_queue_is_empty(&conn->secondary_list)) {
        qemu_mutex_lock(&s->timer_check_lock);
        pkt = g_queue_pop_tail(&conn->primary_list);
        qemu_mutex_unlock(&s->timer_check_lock);
        switch (conn->ip_proto) {
        case IPPROTO_TCP:
            result = g_queue_find_custom(&conn->secondary_list,
@@ -411,9 +406,7 @@ static void colo_compare_connection(void *opaque, void *user_data)
             * until next comparison.
             */
            trace_colo_compare_main("packet different");
            qemu_mutex_lock(&s->timer_check_lock);
            g_queue_push_tail(&conn->primary_list, pkt);
            qemu_mutex_unlock(&s->timer_check_lock);
            /* TODO: colo_notify_checkpoint();*/
            break;
        }
@@ -486,11 +479,26 @@ static void compare_sec_chr_in(void *opaque, const uint8_t *buf, int size)
    }
}

/*
 * Check old packet regularly so it can watch for any packets
 * that the secondary hasn't produced equivalents of.
 */
static gboolean check_old_packet_regular(void *opaque)
{
    CompareState *s = opaque;

    /* if have old packet we will notify checkpoint */
    colo_old_packet_check(s);

    return TRUE;
}

static void *colo_compare_thread(void *opaque)
{
    GMainContext *worker_context;
    GMainLoop *compare_loop;
    CompareState *s = opaque;
    GSource *timeout_source;

    worker_context = g_main_context_new();

@@ -501,8 +509,15 @@ static void *colo_compare_thread(void *opaque)

    compare_loop = g_main_loop_new(worker_context, FALSE);

    /* To kick any packets that the secondary doesn't match */
    timeout_source = g_timeout_source_new(REGULAR_PACKET_CHECK_MS);
    g_source_set_callback(timeout_source,
                          (GSourceFunc)check_old_packet_regular, s, NULL);
    g_source_attach(timeout_source, worker_context);

    g_main_loop_run(compare_loop);

    g_source_unref(timeout_source);
    g_main_loop_unref(compare_loop);
    g_main_context_unref(worker_context);
    return NULL;
@@ -603,26 +618,6 @@ static int find_and_check_chardev(Chardev **chr,
    return 0;
}

/*
 * Check old packet regularly so it can watch for any packets
 * that the secondary hasn't produced equivalents of.
 */
static void check_old_packet_regular(void *opaque)
{
    CompareState *s = opaque;

    timer_mod(s->timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) +
              REGULAR_PACKET_CHECK_MS);
    /* if have old packet we will notify checkpoint */
    /*
     * TODO: Make timer handler run in compare thread
     * like qemu_chr_add_handlers_full.
     */
    qemu_mutex_lock(&s->timer_check_lock);
    colo_old_packet_check(s);
    qemu_mutex_unlock(&s->timer_check_lock);
}

/*
 * Called from the main thread on the primary
 * to setup colo-compare.
@@ -665,7 +660,6 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp)
    net_socket_rs_init(&s->sec_rs, compare_sec_rs_finalize);

    g_queue_init(&s->conn_list);
    qemu_mutex_init(&s->timer_check_lock);

    s->connection_track_table = g_hash_table_new_full(connection_key_hash,
                                                      connection_key_equal,
@@ -678,12 +672,6 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp)
                       QEMU_THREAD_JOINABLE);
    compare_id++;

    /* A regular timer to kick any packets that the secondary doesn't match */
    s->timer = timer_new_ms(QEMU_CLOCK_VIRTUAL, /* Only when guest runs */
                            check_old_packet_regular, s);
    timer_mod(s->timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) +
                        REGULAR_PACKET_CHECK_MS);

    return;
}

@@ -723,12 +711,6 @@ static void colo_compare_finalize(Object *obj)
        qemu_thread_join(&s->thread);
    }

    if (s->timer) {
        timer_del(s->timer);
    }

    qemu_mutex_destroy(&s->timer_check_lock);

    g_free(s->pri_indev);
    g_free(s->sec_indev);
    g_free(s->outdev);