/*
 * QEMU block layer thread pool
 *
 * Copyright IBM, Corp. 2008
 * Copyright Red Hat, Inc. 2012
 *
 * Authors:
 *  Anthony Liguori   <aliguori@us.ibm.com>
 *  Paolo Bonzini     <pbonzini@redhat.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2.  See
 * the COPYING file in the top-level directory.
 *
 * Contributions after 2012-01-13 are licensed under the terms of the
 * GNU GPL, version 2 or (at your option) any later version.
 */
#include "qemu-common.h"
18 19 20
#include "qemu/queue.h"
#include "qemu/thread.h"
#include "qemu/osdep.h"
21
#include "block/coroutine.h"
22
#include "trace.h"
23
#include "block/block_int.h"
24
#include "qemu/event_notifier.h"
25
#include "block/thread-pool.h"
26
#include "qemu/main-loop.h"
27

28
static void do_spawn_thread(ThreadPool *pool);
29 30 31 32 33 34 35 36 37 38 39 40

typedef struct ThreadPoolElement ThreadPoolElement;

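/* Life cycle of a request: THREAD_QUEUED until a worker picks it up,
 * THREAD_ACTIVE while the function runs, then THREAD_DONE.  A request
 * can go straight from THREAD_QUEUED to THREAD_CANCELED if
 * thread_pool_cancel() wins the race for the semaphore.
 */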
enum ThreadState {
    THREAD_QUEUED,
    THREAD_ACTIVE,
    THREAD_DONE,
    THREAD_CANCELED,
};

struct ThreadPoolElement {
    BlockDriverAIOCB common;
    ThreadPool *pool;
    ThreadPoolFunc *func;
    void *arg;

    /* Moving state out of THREAD_QUEUED is protected by lock.  After
     * that, only the worker thread can write to it.  Reads and writes
     * of state and ret are ordered with memory barriers.
     */
    enum ThreadState state;
    int ret;

    /* Access to this list is protected by lock.  */
    QTAILQ_ENTRY(ThreadPoolElement) reqs;

    /* Access to this list is protected by the global mutex.  */
    QLIST_ENTRY(ThreadPoolElement) all;
};

struct ThreadPool {
    EventNotifier notifier;
    AioContext *ctx;
    QemuMutex lock;
    QemuCond check_cancel;
    QemuCond worker_stopped;
    QemuSemaphore sem;
    int max_threads;
    QEMUBH *new_thread_bh;

    /* The following variables are only accessed from one AioContext. */
    QLIST_HEAD(, ThreadPoolElement) head;

    /* The following variables are protected by lock.  */
    QTAILQ_HEAD(, ThreadPoolElement) request_list;
    int cur_threads;
    int idle_threads;
    int new_threads;     /* backlog of threads we need to create */
    int pending_threads; /* threads created but not running yet */
    int pending_cancellations; /* whether we need a cond_broadcast */
    bool stopping;
};

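/* Worker thread body.  Each qemu_sem_post() on pool->sem corresponds to
 * one queued request; a worker that waits ten seconds without finding
 * work (or sees pool->stopping) exits, letting the pool shrink when
 * idle.
 */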
static void *worker_thread(void *opaque)
{
    ThreadPool *pool = opaque;

    qemu_mutex_lock(&pool->lock);
    pool->pending_threads--;
    do_spawn_thread(pool);

    while (!pool->stopping) {
        ThreadPoolElement *req;
        int ret;

        do {
            pool->idle_threads++;
            qemu_mutex_unlock(&pool->lock);
            ret = qemu_sem_timedwait(&pool->sem, 10000);
            qemu_mutex_lock(&pool->lock);
            pool->idle_threads--;
        } while (ret == -1 && !QTAILQ_EMPTY(&pool->request_list));
        if (ret == -1 || pool->stopping) {
            break;
        }

        req = QTAILQ_FIRST(&pool->request_list);
        QTAILQ_REMOVE(&pool->request_list, req, reqs);
        req->state = THREAD_ACTIVE;
        qemu_mutex_unlock(&pool->lock);

        ret = req->func(req->arg);

        req->ret = ret;
        /* Write ret before state.  */
        smp_wmb();
        req->state = THREAD_DONE;

        qemu_mutex_lock(&pool->lock);
        if (pool->pending_cancellations) {
            qemu_cond_broadcast(&pool->check_cancel);
        }

        event_notifier_set(&pool->notifier);
    }

    pool->cur_threads--;
    qemu_cond_signal(&pool->worker_stopped);
    qemu_mutex_unlock(&pool->lock);
    return NULL;
}

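/* Create one pending worker thread.  Runs with pool->lock held, either
 * from the spawn bottom half or from a worker that has just started, so
 * thread creation daisy-chains from worker to worker instead of looping
 * with the lock held (see the comment in spawn_thread() below).
 */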
static void do_spawn_thread(ThreadPool *pool)
{
    QemuThread t;

    /* Runs with lock taken.  */
    if (!pool->new_threads) {
        return;
    }

    pool->new_threads--;
    pool->pending_threads++;

    qemu_thread_create(&t, worker_thread, pool, QEMU_THREAD_DETACHED);
}

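/* Bottom half scheduled by spawn_thread().  It runs in the main loop
 * rather than in the submitting thread, so new workers inherit the main
 * loop's attributes (e.g. CPU affinity) instead of a vcpu thread's.
 */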
static void spawn_thread_bh_fn(void *opaque)
{
    ThreadPool *pool = opaque;

    qemu_mutex_lock(&pool->lock);
    do_spawn_thread(pool);
    qemu_mutex_unlock(&pool->lock);
}

static void spawn_thread(ThreadPool *pool)
{
    pool->cur_threads++;
    pool->new_threads++;
    /* If there are threads being created, they will spawn new workers, so
     * we don't spend time creating many threads in a loop holding a mutex or
     * starving the current vcpu.
     *
     * If there are no idle threads, ask the main thread to create one, so we
     * inherit the correct affinity instead of the vcpu affinity.
     */
    if (!pool->pending_threads) {
        qemu_bh_schedule(pool->new_thread_bh);
    }
}

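/* Completion handler, invoked in the pool's AioContext when a worker
 * sets the event notifier.  It reaps every THREAD_DONE or
 * THREAD_CANCELED element on pool->head; after running a completion
 * callback it restarts the scan, since the callback may have submitted
 * or canceled other requests and modified the list.
 */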
static void event_notifier_ready(EventNotifier *notifier)
{
    ThreadPool *pool = container_of(notifier, ThreadPool, notifier);
    ThreadPoolElement *elem, *next;

    event_notifier_test_and_clear(notifier);
restart:
    QLIST_FOREACH_SAFE(elem, &pool->head, all, next) {
        if (elem->state != THREAD_CANCELED && elem->state != THREAD_DONE) {
            continue;
        }
        if (elem->state == THREAD_DONE) {
            trace_thread_pool_complete(pool, elem, elem->common.opaque,
                                       elem->ret);
        }
        if (elem->state == THREAD_DONE && elem->common.cb) {
            QLIST_REMOVE(elem, all);
            /* Read state before ret.  */
            smp_rmb();
            elem->common.cb(elem->common.opaque, elem->ret);
            qemu_aio_release(elem);
            goto restart;
        } else {
            /* remove the request */
            QLIST_REMOVE(elem, all);
            qemu_aio_release(elem);
        }
    }
}

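/* The .cancel callback of thread_pool_aiocb_info.  If no worker has
 * claimed the request yet, consume its semaphore signal with a
 * non-blocking wait and mark it THREAD_CANCELED; otherwise block on
 * check_cancel until the worker has finished it.
 */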
static void thread_pool_cancel(BlockDriverAIOCB *acb)
{
    ThreadPoolElement *elem = (ThreadPoolElement *)acb;
    ThreadPool *pool = elem->pool;

    trace_thread_pool_cancel(elem, elem->common.opaque);

    qemu_mutex_lock(&pool->lock);
    if (elem->state == THREAD_QUEUED &&
        /* No thread has yet started working on elem; we can try to "steal"
         * the item from the worker if we can get a signal from the
         * semaphore.  Because this is non-blocking, we can do it with
         * the lock taken and ensure that elem will remain THREAD_QUEUED.
         */
        qemu_sem_timedwait(&pool->sem, 0) == 0) {
        QTAILQ_REMOVE(&pool->request_list, elem, reqs);
        elem->state = THREAD_CANCELED;
        event_notifier_set(&pool->notifier);
    } else {
        pool->pending_cancellations++;
        while (elem->state != THREAD_CANCELED && elem->state != THREAD_DONE) {
            qemu_cond_wait(&pool->check_cancel, &pool->lock);
        }
        pool->pending_cancellations--;
    }
    qemu_mutex_unlock(&pool->lock);
}

static const AIOCBInfo thread_pool_aiocb_info = {
    .aiocb_size         = sizeof(ThreadPoolElement),
    .cancel             = thread_pool_cancel,
};

BlockDriverAIOCB *thread_pool_submit_aio(ThreadPool *pool,
        ThreadPoolFunc *func, void *arg,
        BlockDriverCompletionFunc *cb, void *opaque)
{
    ThreadPoolElement *req;

    req = qemu_aio_get(&thread_pool_aiocb_info, NULL, cb, opaque);
    req->func = func;
    req->arg = arg;
    req->state = THREAD_QUEUED;
    req->pool = pool;

    QLIST_INSERT_HEAD(&pool->head, req, all);

    trace_thread_pool_submit(pool, req, arg);

    qemu_mutex_lock(&pool->lock);
    if (pool->idle_threads == 0 && pool->cur_threads < pool->max_threads) {
        spawn_thread(pool);
    }
    QTAILQ_INSERT_TAIL(&pool->request_list, req, reqs);
    qemu_mutex_unlock(&pool->lock);
    qemu_sem_post(&pool->sem);
    return &req->common;
}
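/* Minimal usage sketch (hypothetical names: fsync_worker, fd, my_cb and
 * my_opaque are illustrative, not part of this file):
 *
 *     static int fsync_worker(void *arg)
 *     {
 *         return fsync(*(int *)arg) < 0 ? -errno : 0;
 *     }
 *
 *     acb = thread_pool_submit_aio(pool, fsync_worker, &fd,
 *                                  my_cb, my_opaque);
 *
 * my_cb(my_opaque, ret) then runs in pool->ctx once the worker returns
 * and the event notifier fires.
 */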

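/* Glue for the coroutine wrapper below: the completion callback stashes
 * the worker's return value and re-enters the coroutine that submitted
 * the request.
 */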
typedef struct ThreadPoolCo {
    Coroutine *co;
    int ret;
} ThreadPoolCo;

static void thread_pool_co_cb(void *opaque, int ret)
{
    ThreadPoolCo *co = opaque;

    co->ret = ret;
    qemu_coroutine_enter(co->co, NULL);
}

int coroutine_fn thread_pool_submit_co(ThreadPool *pool, ThreadPoolFunc *func,
                                       void *arg)
{
    ThreadPoolCo tpc = { .co = qemu_coroutine_self(), .ret = -EINPROGRESS };
    assert(qemu_in_coroutine());
    thread_pool_submit_aio(pool, func, arg, thread_pool_co_cb, &tpc);
    qemu_coroutine_yield();
    return tpc.ret;
}
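/* Sketch of a coroutine caller (my_worker and s are hypothetical, for
 * illustration only):
 *
 *     int coroutine_fn my_co_fn(ThreadPool *pool, void *s)
 *     {
 *         return thread_pool_submit_co(pool, my_worker, s);
 *     }
 *
 * The coroutine yields inside thread_pool_submit_co() and is re-entered
 * by thread_pool_co_cb() with the worker's return value.
 */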

void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg)
{
    thread_pool_submit_aio(pool, func, arg, NULL, NULL);
}

static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
{
    if (!ctx) {
        ctx = qemu_get_aio_context();
    }

    memset(pool, 0, sizeof(*pool));
    event_notifier_init(&pool->notifier, false);
    pool->ctx = ctx;
    qemu_mutex_init(&pool->lock);
    qemu_cond_init(&pool->check_cancel);
    qemu_cond_init(&pool->worker_stopped);
    qemu_sem_init(&pool->sem, 0);
    pool->max_threads = 64;
    pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool);

    QLIST_INIT(&pool->head);
    QTAILQ_INIT(&pool->request_list);

    aio_set_event_notifier(ctx, &pool->notifier, event_notifier_ready);
}

ThreadPool *thread_pool_new(AioContext *ctx)
{
    ThreadPool *pool = g_new(ThreadPool, 1);
    thread_pool_init_one(pool, ctx);
    return pool;
}
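/* Typical lifetime, sketched (error handling omitted; ctx is whatever
 * AioContext the caller owns):
 *
 *     ThreadPool *pool = thread_pool_new(ctx);
 *     ... submit work with thread_pool_submit_aio() or
 *         thread_pool_submit_co() ...
 *     thread_pool_free(pool);
 */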

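/* Tear down the pool.  Every worker is woken by a semaphore post and,
 * seeing pool->stopping, signals worker_stopped on its way out; we wait
 * until cur_threads drops to zero.  All requests must have completed
 * first (pool->head must be empty).
 */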
void thread_pool_free(ThreadPool *pool)
{
    if (!pool) {
        return;
    }

    assert(QLIST_EMPTY(&pool->head));

    qemu_mutex_lock(&pool->lock);

    /* Stop new threads from spawning */
    qemu_bh_delete(pool->new_thread_bh);
    pool->cur_threads -= pool->new_threads;
    pool->new_threads = 0;

    /* Wait for worker threads to terminate */
    pool->stopping = true;
    while (pool->cur_threads > 0) {
        qemu_sem_post(&pool->sem);
        qemu_cond_wait(&pool->worker_stopped, &pool->lock);
    }

    qemu_mutex_unlock(&pool->lock);

    aio_set_event_notifier(pool->ctx, &pool->notifier, NULL);
    qemu_sem_destroy(&pool->sem);
    qemu_cond_destroy(&pool->check_cancel);
    qemu_cond_destroy(&pool->worker_stopped);
    qemu_mutex_destroy(&pool->lock);
    event_notifier_cleanup(&pool->notifier);
    g_free(pool);
}