2 * Copyright 2008-2010, Sine Nomine Associates and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
10 #include <afsconfig.h>
11 #include <afs/param.h>
15 #include <afs/afs_assert.h>
17 #include <afs/afsutil.h>
19 #include <afs/afsint.h>
21 #define __AFS_THREAD_POOL_IMPL 1
22 #include "work_queue.h"
23 #include "thread_pool.h"
24 #include "thread_pool_impl.h"
27 * public interfaces for thread_pool.
31 * allocate a thread pool object.
33 * @param[out] pool_out address in which to store pool object pointer
35 * @return operation status
37 * @retval ENOMEM out of memory
/*
 * Allocates a thread pool object with malloc() and stores the resulting
 * pointer both in the local `pool` and in *pool_out.
 *
 * NOTE(review): the lines that test the malloc() result and return the
 * status code are not visible in this extract -- confirm the ENOMEM path
 * against the full file.
 */
42 _afs_tp_alloc(struct afs_thread_pool ** pool_out)
45 struct afs_thread_pool * pool;
/* sizeof(*pool) keeps the allocation size tied to the pointer's own type */
47 *pool_out = pool = malloc(sizeof(*pool));
58 * free a thread pool object.
60 * @param[in] pool thread pool object
62 * @return operation status
/*
 * Releases a thread pool object.
 *
 * NOTE(review): only the signature is visible in this extract; the body
 * (presumably a free() of `pool`) must be confirmed against the full file.
 */
68 _afs_tp_free(struct afs_thread_pool * pool)
78 * allocate a thread worker object.
80 * @param[out] worker_out address in which to store worker object pointer
82 * @return operation status
84 * @retval ENOMEM out of memory
/*
 * Allocates a thread worker object with malloc(), stores the pointer in
 * both the local `worker` and *worker_out, and initializes the worker's
 * `worker_list` queue node.
 *
 * NOTE(review): the allocation-failure check and the return path are not
 * visible in this extract -- confirm that queue_NodeInit() is only reached
 * when `worker` is non-NULL.
 */
89 _afs_tp_worker_alloc(struct afs_thread_pool_worker ** worker_out)
92 struct afs_thread_pool_worker * worker;
94 *worker_out = worker = malloc(sizeof(*worker));
/* worker_list is the node later linked into a pool's thread_list */
100 queue_NodeInit(&worker->worker_list);
107 * free a thread worker object.
109 * @param[in] worker thread worker object
111 * @return operation status
/*
 * Releases a thread worker object.
 *
 * NOTE(review): only the signature is visible in this extract; the body
 * (presumably a free() of `worker`) must be confirmed against the full file.
 */
117 _afs_tp_worker_free(struct afs_thread_pool_worker * worker)
127 * low-level thread entry point.
129 * @param[in] rock opaque pointer to thread worker object
131 * @return opaque return pointer from pool entry function
/*
 * Low-level thread entry point.  `rock` is the afs_thread_pool_worker for
 * this thread; the owning pool is read from worker->pool.
 *
 * The worker links itself onto pool->thread_list under pool->lock, calls
 * the pool's entry function, records that function's result in
 * worker->ret, then unlinks itself under the lock and frees its own
 * worker object.
 *
 * NOTE(review): the statements that adjust pool->nthreads and the final
 * return are not visible in this extract -- confirm against the full file.
 */
136 _afs_tp_worker_run(void * rock)
138 struct afs_thread_pool_worker * worker = rock;
/* cache the pool pointer now; `worker` is freed at the end of this function */
139 struct afs_thread_pool * pool = worker->pool;
141 /* register worker with pool */
142 MUTEX_ENTER(&pool->lock);
143 queue_Append(&pool->thread_list, worker);
145 MUTEX_EXIT(&pool->lock);
147 /* call high-level entry point */
/* the entry function is invoked with pool->lock released */
148 worker->ret = (*pool->entry)(pool, worker, pool->work_queue, pool->rock);
150 /* adjust pool live thread count */
151 MUTEX_ENTER(&pool->lock);
/* the live-thread count must be non-zero while this worker is still listed */
152 osi_Assert(pool->nthreads);
153 queue_Remove(worker);
/*
 * When no live threads remain: wake every waiter on shutdown_cv and mark
 * the pool stopped.  Both steps happen before pool->lock is dropped, so a
 * woken waiter observes the STOPPED state once it reacquires the lock.
 */
155 if (!pool->nthreads) {
156 CV_BROADCAST(&pool->shutdown_cv);
157 pool->state = AFS_TP_STATE_STOPPED;
159 MUTEX_EXIT(&pool->lock);
/* `worker` must not be dereferenced after this call */
161 _afs_tp_worker_free(worker);
167 * default high-level thread entry point.
/*
 * Default high-level thread entry point: repeatedly services the work
 * queue until either afs_wq_do() returns non-zero or
 * afs_tp_worker_continue() reports that the worker should stop.
 *
 * NOTE(review): the final parameter, the declaration of `code`, and the
 * return statement are not visible in this extract.
 */
172 _afs_tp_worker_default(struct afs_thread_pool *pool,
173 struct afs_thread_pool_worker *worker,
174 struct afs_work_queue *queue,
/* the continue check is evaluated only while code == 0 (short-circuit &&) */
178 while (code == 0 && afs_tp_worker_continue(worker)) {
179 code = afs_wq_do(queue, NULL /* no call rock */);
186 * start a worker thread.
188 * @param[in] pool thread pool object
189 * @param[out] worker_out address in which to store worker thread object pointer
191 * @return operation status
193 * @retval ENOMEM out of memory
/*
 * Starts one worker thread for `pool`.
 *
 * Allocates a worker via _afs_tp_worker_alloc() (the pointer is handed back
 * through *worker_out), clears its shutdown request flag, and creates a
 * detached pthread that runs _afs_tp_worker_run() with the worker as its
 * argument.  The pthread_create() result is stored in `ret`.
 *
 * NOTE(review): no pthread_attr_destroy(&attrs) is visible after
 * pthread_create() in this extract -- confirm whether the attribute object
 * is ever released.
 * NOTE(review): the handling of a non-zero `ret` from the allocation and
 * from pthread_create() is not visible; confirm the worker object is not
 * leaked when thread creation fails.
 */
196 _afs_tp_worker_start(struct afs_thread_pool * pool,
197 struct afs_thread_pool_worker ** worker_out)
200 pthread_attr_t attrs;
201 struct afs_thread_pool_worker * worker;
203 ret = _afs_tp_worker_alloc(worker_out);
207 worker = *worker_out;
210 worker->req_shutdown = 0;
/*
 * NOTE(review): these calls have side effects and live inside osi_Assert();
 * verify osi_Assert always evaluates its argument in every build flavor.
 */
212 osi_Assert(pthread_attr_init(&attrs) == 0);
/* detached: the thread is never joined */
213 osi_Assert(pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED) == 0);
215 ret = pthread_create(&worker->tid, &attrs, &_afs_tp_worker_run, worker);
222 * create a thread pool.
224 * @param[out] pool_out address in which to store pool object pointer.
225 * @param[in] queue work queue serviced by thread pool
227 * @return operation status
229 * @retval ENOMEM out of memory
/*
 * Creates a thread pool bound to `queue`.
 *
 * Allocates the pool via _afs_tp_alloc(pool_out), initializes its mutex,
 * shutdown condition variable and (empty) thread list, and installs the
 * defaults: entry point _afs_tp_worker_default, max_threads of 4, and
 * state AFS_TP_STATE_INIT.  No threads are started here.
 *
 * NOTE(review): the allocation-failure check, the assignment of the local
 * `pool`, and any other field initializations are not visible in this
 * extract.
 */
232 afs_tp_create(struct afs_thread_pool ** pool_out,
233 struct afs_work_queue * queue)
236 struct afs_thread_pool * pool;
238 ret = _afs_tp_alloc(pool_out);
244 MUTEX_INIT(&pool->lock, "pool", MUTEX_DEFAULT, 0);
245 CV_INIT(&pool->shutdown_cv, "pool shutdown", CV_DEFAULT, 0);
246 queue_Init(&pool->thread_list);
247 pool->work_queue = queue;
248 pool->entry = &_afs_tp_worker_default;
/* default thread count; afs_tp_set_threads() can change it while in INIT */
251 pool->max_threads = 4;
252 pool->state = AFS_TP_STATE_INIT;
259 * destroy a thread pool.
261 * @param[in] pool thread pool object to be destroyed
263 * @return operation status
265 * @retval AFS_TP_ERROR pool not in a quiescent state
/*
 * Destroys a thread pool.  The pool state is inspected under pool->lock;
 * AFS_TP_STATE_INIT and AFS_TP_STATE_STOPPED share one case arm.
 *
 * NOTE(review): the bodies of the case arms, any default arm, and the
 * return path are not visible in this extract -- confirm what happens to
 * pool->lock if the pool object itself is freed.
 */
268 afs_tp_destroy(struct afs_thread_pool * pool)
272 MUTEX_ENTER(&pool->lock);
273 switch (pool->state) {
/* INIT falls through to STOPPED: both are handled by the same arm */
274 case AFS_TP_STATE_INIT:
275 case AFS_TP_STATE_STOPPED:
281 MUTEX_EXIT(&pool->lock);
288 * set the number of threads to spawn.
290 * @param[in] pool thread pool object
291 * @param[in] threads number of threads to spawn
293 * @return operation status
295 * @retval AFS_TP_ERROR thread pool has already been started
/*
 * Sets the number of threads the pool will spawn by storing `threads` in
 * pool->max_threads.  The pool state is checked under pool->lock against
 * AFS_TP_STATE_INIT.
 *
 * NOTE(review): the remaining parameter declaration and the body of the
 * state-check branch are not visible in this extract.
 */
298 afs_tp_set_threads(struct afs_thread_pool *pool,
303 MUTEX_ENTER(&pool->lock);
/* guard: taken when the pool has left the INIT state */
304 if (pool->state != AFS_TP_STATE_INIT) {
307 pool->max_threads = threads;
309 MUTEX_EXIT(&pool->lock);
315 * set a custom thread entry point.
317 * @param[in] pool thread pool object
318 * @param[in] entry thread entry function pointer
319 * @param[in] rock opaque pointer passed to thread
321 * @return operation status
323 * @retval AFS_TP_ERROR thread pool has already been started
/*
 * Installs a custom thread entry point for the pool.  The pool state is
 * checked under pool->lock against AFS_TP_STATE_INIT.
 *
 * NOTE(review): the `rock` parameter declaration, the body of the
 * state-check branch, and the assignments that store `entry` and `rock`
 * are not visible in this extract.
 */
326 afs_tp_set_entry(struct afs_thread_pool * pool,
327 afs_tp_worker_func_t * entry,
332 MUTEX_ENTER(&pool->lock);
/* guard: taken when the pool has left the INIT state */
333 if (pool->state != AFS_TP_STATE_INIT) {
339 MUTEX_EXIT(&pool->lock);
345 * start a thread pool.
347 * @param[in] pool thread pool object
349 * @return operation status
351 * @retval AFS_TP_ERROR thread create failure
/*
 * Starts the thread pool.
 *
 * Under pool->lock the state is checked against AFS_TP_STATE_INIT and moved
 * to AFS_TP_STATE_STARTING; the lock is then released while up to
 * pool->max_threads workers are started with _afs_tp_worker_start().
 * Afterwards the lock is retaken and the state becomes
 * AFS_TP_STATE_RUNNING.
 *
 * NOTE(review): the body of the state-check branch and the handling of a
 * non-zero `code` from _afs_tp_worker_start() are not visible in this
 * extract -- confirm which state the pool is left in when a thread fails
 * to start.
 */
354 afs_tp_start(struct afs_thread_pool * pool)
357 struct afs_thread_pool_worker * worker;
360 MUTEX_ENTER(&pool->lock);
361 if (pool->state != AFS_TP_STATE_INIT) {
365 pool->state = AFS_TP_STATE_STARTING;
366 MUTEX_EXIT(&pool->lock);
/*
 * max_threads is read without the lock here; afs_tp_set_threads() tests
 * for the INIT state before writing it, and the state is already STARTING.
 */
368 for (i = 0; i < pool->max_threads; i++) {
369 code = _afs_tp_worker_start(pool, &worker);
375 MUTEX_ENTER(&pool->lock);
376 pool->state = AFS_TP_STATE_RUNNING;
378 MUTEX_EXIT(&pool->lock);
384 * shut down all threads in pool.
386 * @param[in] pool thread pool object
387 * @param[in] block wait for all threads to terminate, if asserted
389 * @return operation status
/*
 * Shuts down all threads in the pool.
 *
 * Under pool->lock: checks for an already STOPPED/STOPPING pool and for a
 * pool that is not RUNNING, moves the state to AFS_TP_STATE_STOPPING, and
 * sets req_shutdown on every worker in pool->thread_list.  If no threads
 * are live the state goes straight to AFS_TP_STATE_STOPPED.  The lock is
 * then dropped, the work queue is shut down with afs_wq_shutdown(), and the
 * lock is retaken to wait on shutdown_cv until pool->nthreads reaches zero.
 *
 * NOTE(review): the `block` parameter declaration, the bodies of the two
 * early state checks, the handling of `ret` from afs_wq_shutdown(), and
 * whether the wait loop is conditional on `block` are not visible in this
 * extract.
 */
393 afs_tp_shutdown(struct afs_thread_pool * pool,
397 struct afs_thread_pool_worker * worker, *nn;
399 MUTEX_ENTER(&pool->lock);
400 if (pool->state == AFS_TP_STATE_STOPPED
401 || pool->state == AFS_TP_STATE_STOPPING) {
404 if (pool->state != AFS_TP_STATE_RUNNING) {
408 pool->state = AFS_TP_STATE_STOPPING;
/* flag every registered worker; afs_tp_worker_continue() reads this flag */
410 for (queue_Scan(&pool->thread_list, worker, nn, afs_thread_pool_worker)) {
411 worker->req_shutdown = 1;
/* with no live threads, no worker will perform the STOPPED transition */
413 if (!pool->nthreads) {
414 pool->state = AFS_TP_STATE_STOPPED;
416 /* need to drop lock to get a membar here */
417 MUTEX_EXIT(&pool->lock);
419 ret = afs_wq_shutdown(pool->work_queue);
424 MUTEX_ENTER(&pool->lock);
/* loop form re-tests the count after each CV_WAIT wakeup */
427 while (pool->nthreads) {
428 CV_WAIT(&pool->shutdown_cv, &pool->lock);
432 MUTEX_EXIT(&pool->lock);
439 * check whether thread pool is online.
441 * @param[in] pool thread pool object
443 * @return whether pool is online
444 * @retval 1 pool is online
445 * @retval 0 pool is not online
/*
 * Reports whether the pool is online: samples pool->state under pool->lock
 * and yields 1 only for AFS_TP_STATE_RUNNING, 0 for every other state.
 * The answer is a snapshot; the state may change once the lock is released.
 *
 * NOTE(review): the declaration of `ret` and the return statement are not
 * visible in this extract.
 */
448 afs_tp_is_online(struct afs_thread_pool * pool)
452 MUTEX_ENTER(&pool->lock);
453 ret = (pool->state == AFS_TP_STATE_RUNNING);
454 MUTEX_EXIT(&pool->lock);
460 * check whether a given worker thread can continue to run.
462 * @param[in] worker worker thread object pointer
464 * @return whether thread can continue to execute
465 * @retval 1 execution can continue
466 * @retval 0 shutdown has been requested
/*
 * Tells a worker thread whether it may keep running: yields non-zero while
 * worker->req_shutdown is clear and 0 once shutdown has been requested.
 *
 * NOTE(review): the flag is read here without taking pool->lock, while
 * afs_tp_shutdown() writes it with the lock held -- confirm that the
 * visibility of that write to the worker thread is acceptable.
 */
469 afs_tp_worker_continue(struct afs_thread_pool_worker * worker)
471 return !worker->req_shutdown;