Commit c4d9d196 authored by Stefan Hajnoczi's avatar Stefan Hajnoczi
Browse files

threadpool: drop global thread pool



Now that each AioContext has a ThreadPool and the main loop AioContext
can be fetched with bdrv_get_aio_context(), we can eliminate the concept
of a global thread pool from thread-pool.c.

The submit functions must take a ThreadPool* argument.

block/raw-posix.c and block/raw-win32.c use
aio_get_thread_pool(bdrv_get_aio_context(bs)) to fetch the main loop's
ThreadPool.

tests/test-thread-pool.c must be updated to reflect the new
thread_pool_submit() function prototypes.

Signed-off-by: default avatarStefan Hajnoczi <stefanha@redhat.com>
Reviewed-by: default avatarPaolo Bonzini <pbonzini@redhat.com>
parent 85d126f3
Loading
Loading
Loading
Loading
+6 −2
Original line number Diff line number Diff line
@@ -750,6 +750,7 @@ static BlockDriverAIOCB *paio_submit(BlockDriverState *bs, int fd,
        BlockDriverCompletionFunc *cb, void *opaque, int type)
{
    RawPosixAIOData *acb = g_slice_new(RawPosixAIOData);
    ThreadPool *pool;

    acb->bs = bs;
    acb->aio_type = type;
@@ -763,7 +764,8 @@ static BlockDriverAIOCB *paio_submit(BlockDriverState *bs, int fd,
    acb->aio_offset = sector_num * 512;

    trace_paio_submit(acb, opaque, sector_num, nb_sectors, type);
    return thread_pool_submit_aio(aio_worker, acb, cb, opaque);
    pool = aio_get_thread_pool(bdrv_get_aio_context(bs));
    return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque);
}

static BlockDriverAIOCB *raw_aio_submit(BlockDriverState *bs,
@@ -1413,6 +1415,7 @@ static BlockDriverAIOCB *hdev_aio_ioctl(BlockDriverState *bs,
{
    BDRVRawState *s = bs->opaque;
    RawPosixAIOData *acb;
    ThreadPool *pool;

    if (fd_open(bs) < 0)
        return NULL;
@@ -1424,7 +1427,8 @@ static BlockDriverAIOCB *hdev_aio_ioctl(BlockDriverState *bs,
    acb->aio_offset = 0;
    acb->aio_ioctl_buf = buf;
    acb->aio_ioctl_cmd = req;
    return thread_pool_submit_aio(aio_worker, acb, cb, opaque);
    pool = aio_get_thread_pool(bdrv_get_aio_context(bs));
    return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque);
}

#elif defined(__FreeBSD__) || defined(__FreeBSD_kernel__)
+3 −1
Original line number Diff line number Diff line
@@ -144,6 +144,7 @@ static BlockDriverAIOCB *paio_submit(BlockDriverState *bs, HANDLE hfile,
        BlockDriverCompletionFunc *cb, void *opaque, int type)
{
    RawWin32AIOData *acb = g_slice_new(RawWin32AIOData);
    ThreadPool *pool;

    acb->bs = bs;
    acb->hfile = hfile;
@@ -157,7 +158,8 @@ static BlockDriverAIOCB *paio_submit(BlockDriverState *bs, HANDLE hfile,
    acb->aio_offset = sector_num * 512;

    trace_paio_submit(acb, opaque, sector_num, nb_sectors, type);
    return thread_pool_submit_aio(aio_worker, acb, cb, opaque);
    pool = aio_get_thread_pool(bdrv_get_aio_context(bs));
    return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque);
}

int qemu_ftruncate64(int fd, int64_t length)
+6 −4
Original line number Diff line number Diff line
@@ -31,9 +31,11 @@ typedef struct ThreadPool ThreadPool;
ThreadPool *thread_pool_new(struct AioContext *ctx);
void thread_pool_free(ThreadPool *pool);

BlockDriverAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg,
BlockDriverAIOCB *thread_pool_submit_aio(ThreadPool *pool,
        ThreadPoolFunc *func, void *arg,
        BlockDriverCompletionFunc *cb, void *opaque);
int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg);
void thread_pool_submit(ThreadPoolFunc *func, void *arg);
int coroutine_fn thread_pool_submit_co(ThreadPool *pool,
        ThreadPoolFunc *func, void *arg);
void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg);

#endif
+21 −23
Original line number Diff line number Diff line
@@ -4,6 +4,8 @@
#include "block/thread-pool.h"
#include "block/block.h"

static AioContext *ctx;
static ThreadPool *pool;
static int active;

typedef struct {
@@ -38,19 +40,10 @@ static void done_cb(void *opaque, int ret)
    active--;
}

/* A non-blocking poll of the main AIO context (we cannot use aio_poll
 * because we do not know the AioContext).
 */
static void qemu_aio_wait_nonblocking(void)
{
    qemu_notify_event();
    qemu_aio_wait();
}

/* Wait until all aio and bh activity has finished */
static void qemu_aio_wait_all(void)
{
    while (qemu_aio_wait()) {
    while (aio_poll(ctx, true)) {
        /* Do nothing */
    }
}
@@ -58,7 +51,7 @@ static void qemu_aio_wait_all(void)
static void test_submit(void)
{
    WorkerTestData data = { .n = 0 };
    thread_pool_submit(worker_cb, &data);
    thread_pool_submit(pool, worker_cb, &data);
    qemu_aio_wait_all();
    g_assert_cmpint(data.n, ==, 1);
}
@@ -66,7 +59,8 @@ static void test_submit(void)
static void test_submit_aio(void)
{
    WorkerTestData data = { .n = 0, .ret = -EINPROGRESS };
    data.aiocb = thread_pool_submit_aio(worker_cb, &data, done_cb, &data);
    data.aiocb = thread_pool_submit_aio(pool, worker_cb, &data,
                                        done_cb, &data);

    /* The callbacks are not called until after the first wait.  */
    active = 1;
@@ -84,7 +78,7 @@ static void co_test_cb(void *opaque)
    active = 1;
    data->n = 0;
    data->ret = -EINPROGRESS;
    thread_pool_submit_co(worker_cb, data);
    thread_pool_submit_co(pool, worker_cb, data);

    /* The test continues in test_submit_co, after qemu_coroutine_enter... */

@@ -126,12 +120,12 @@ static void test_submit_many(void)
    for (i = 0; i < 100; i++) {
        data[i].n = 0;
        data[i].ret = -EINPROGRESS;
        thread_pool_submit_aio(worker_cb, &data[i], done_cb, &data[i]);
        thread_pool_submit_aio(pool, worker_cb, &data[i], done_cb, &data[i]);
    }

    active = 100;
    while (active > 0) {
        qemu_aio_wait();
        aio_poll(ctx, true);
    }
    for (i = 0; i < 100; i++) {
        g_assert_cmpint(data[i].n, ==, 1);
@@ -154,7 +148,7 @@ static void test_cancel(void)
    for (i = 0; i < 100; i++) {
        data[i].n = 0;
        data[i].ret = -EINPROGRESS;
        data[i].aiocb = thread_pool_submit_aio(long_cb, &data[i],
        data[i].aiocb = thread_pool_submit_aio(pool, long_cb, &data[i],
                                               done_cb, &data[i]);
    }

@@ -162,7 +156,8 @@ static void test_cancel(void)
     * run, but do not waste too much time...
     */
    active = 100;
    qemu_aio_wait_nonblocking();
    aio_notify(ctx);
    aio_poll(ctx, false);

    /* Wait some time for the threads to start, with some sanity
     * testing on the behavior of the scheduler...
@@ -208,11 +203,10 @@ static void test_cancel(void)

int main(int argc, char **argv)
{
    /* These should be removed once each AioContext has its thread pool.
     * The test should create its own AioContext.
     */
    qemu_init_main_loop();
    bdrv_init();
    int ret;

    ctx = aio_context_new();
    pool = aio_get_thread_pool(ctx);

    g_test_init(&argc, &argv, NULL);
    g_test_add_func("/thread-pool/submit", test_submit);
@@ -220,5 +214,9 @@ int main(int argc, char **argv)
    g_test_add_func("/thread-pool/submit-co", test_submit_co);
    g_test_add_func("/thread-pool/submit-many", test_submit_many);
    g_test_add_func("/thread-pool/cancel", test_cancel);
    return g_test_run();

    ret = g_test_run();

    aio_context_unref(ctx);
    return ret;
}
+7 −16
Original line number Diff line number Diff line
@@ -78,9 +78,6 @@ struct ThreadPool {
    bool stopping;
};

/* Currently there is only one thread pool instance. */
static ThreadPool global_pool;

static void *worker_thread(void *opaque)
{
    ThreadPool *pool = opaque;
@@ -239,10 +236,10 @@ static const AIOCBInfo thread_pool_aiocb_info = {
    .cancel             = thread_pool_cancel,
};

BlockDriverAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg,
BlockDriverAIOCB *thread_pool_submit_aio(ThreadPool *pool,
        ThreadPoolFunc *func, void *arg,
        BlockDriverCompletionFunc *cb, void *opaque)
{
    ThreadPool *pool = &global_pool;
    ThreadPoolElement *req;

    req = qemu_aio_get(&thread_pool_aiocb_info, NULL, cb, opaque);
@@ -278,18 +275,19 @@ static void thread_pool_co_cb(void *opaque, int ret)
    qemu_coroutine_enter(co->co, NULL);
}

int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg)
int coroutine_fn thread_pool_submit_co(ThreadPool *pool, ThreadPoolFunc *func,
                                       void *arg)
{
    ThreadPoolCo tpc = { .co = qemu_coroutine_self(), .ret = -EINPROGRESS };
    assert(qemu_in_coroutine());
    thread_pool_submit_aio(func, arg, thread_pool_co_cb, &tpc);
    thread_pool_submit_aio(pool, func, arg, thread_pool_co_cb, &tpc);
    qemu_coroutine_yield();
    return tpc.ret;
}

void thread_pool_submit(ThreadPoolFunc *func, void *arg)
void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg)
{
    thread_pool_submit_aio(func, arg, NULL, NULL);
    thread_pool_submit_aio(pool, func, arg, NULL, NULL);
}

static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
@@ -354,10 +352,3 @@ void thread_pool_free(ThreadPool *pool)
    event_notifier_cleanup(&pool->notifier);
    g_free(pool);
}

static void thread_pool_init(void)
{
    thread_pool_init_one(&global_pool, NULL);
}

block_init(thread_pool_init)