2 * Copyright 2008-2010, Sine Nomine Associates and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
10 #include <afsconfig.h>
11 #include <afs/param.h>
20 #include <afs/afs_assert.h>
23 #include <sys/param.h>
25 #if defined(AFS_SUN5_ENV) || defined(AFS_HPUX_ENV)
28 #include <afs/afsutil.h>
30 #include <afs/afsint.h>
32 #define __AFS_THREAD_POOL_IMPL 1
33 #include "work_queue.h"
34 #include "thread_pool.h"
35 #include "thread_pool_impl.h"
38 * public interfaces for thread_pool.
42 * allocate a thread pool object.
44 * @param[inout] pool_out address in which to store pool object pointer
46 * @return operation status
48 * @retval ENOMEM out of memory
/*
 * Obtain storage for a pool object from malloc() and publish the pointer
 * through pool_out. The memory is not zeroed here; afs_tp_create() fills
 * in the fields after this call.
 */
53 _afs_tp_alloc(struct afs_thread_pool ** pool_out)
56 struct afs_thread_pool * pool;
/* *pool_out receives malloc()'s result directly, so it is NULL on failure */
58 *pool_out = pool = malloc(sizeof(*pool));
69 * free a thread pool object.
71 * @param[in] pool thread pool object
73 * @return operation status
/* Release path for a pool object; takes the object itself, not its address. */
79 _afs_tp_free(struct afs_thread_pool * pool)
89 * allocate a thread worker object.
91 * @param[inout] worker_out address in which to store worker object pointer
93 * @return operation status
95 * @retval ENOMEM out of memory
/*
 * Obtain storage for a worker object from malloc(), publish the pointer
 * through worker_out, and initialize the worker's list node.
 */
100 _afs_tp_worker_alloc(struct afs_thread_pool_worker ** worker_out)
103 struct afs_thread_pool_worker * worker;
/* *worker_out mirrors malloc()'s result, so the caller sees NULL on failure */
105 *worker_out = worker = malloc(sizeof(*worker));
106 if (worker == NULL) {
/*
 * Prepare the embedded queue node; presumably this is the linkage used
 * when _afs_tp_worker_run() appends the worker to pool->thread_list --
 * TODO confirm against the struct layout.
 */
111 queue_NodeInit(&worker->worker_list);
118 * free a thread worker object.
120 * @param[in] worker thread worker object
122 * @return operation status
/*
 * Release path for a worker object. _afs_tp_worker_run() calls this on its
 * own worker after it has unregistered from the pool.
 */
128 _afs_tp_worker_free(struct afs_thread_pool_worker * worker)
138 * low-level thread entry point.
140 * @param[in] rock opaque pointer to thread worker object
142 * @return opaque return pointer from pool entry function
/*
 * pthread start routine for a pool worker. Registers the worker on the
 * pool's thread list, runs the pool's configured entry function, then
 * unregisters the worker and frees it.
 */
147 _afs_tp_worker_run(void * rock)
/* rock is the worker object that _afs_tp_worker_start() passes to pthread_create() */
149 struct afs_thread_pool_worker * worker = rock;
150 struct afs_thread_pool * pool = worker->pool;
152 /* register worker with pool */
153 MUTEX_ENTER(&pool->lock);
154 queue_Append(&pool->thread_list, worker);
156 MUTEX_EXIT(&pool->lock);
158 /* call high-level entry point */
/*
 * The entry function runs with pool->lock released.
 * NOTE(review): worker->ret is stored here, but the worker is freed at the
 * end of this function -- confirm whether anything reads it.
 */
159 worker->ret = (*pool->entry)(pool, worker, pool->work_queue, pool->rock);
161 /* adjust pool live thread count */
162 MUTEX_ENTER(&pool->lock);
/* a worker that is exiting must still be counted as live at this point */
163 osi_Assert(pool->nthreads);
164 queue_Remove(worker);
/*
 * When no live workers remain, wake any afs_tp_shutdown() caller waiting
 * on shutdown_cv and mark the pool stopped. Both happen under pool->lock,
 * so a waiter observes the state change once it reacquires the lock.
 */
166 if (!pool->nthreads) {
167 CV_BROADCAST(&pool->shutdown_cv);
168 pool->state = AFS_TP_STATE_STOPPED;
170 MUTEX_EXIT(&pool->lock);
/* the thread is created detached, so the worker object is released here rather than by a joiner */
172 _afs_tp_worker_free(worker);
178 * default high-level thread entry point.
/*
 * Entry function that afs_tp_create() installs by default: repeatedly
 * services the work queue on behalf of this worker.
 */
183 _afs_tp_worker_default(struct afs_thread_pool *pool,
184 struct afs_thread_pool_worker *worker,
185 struct afs_work_queue *queue,
/* loop ends on the first nonzero afs_wq_do() result or once shutdown has been requested for this worker */
189 while (code == 0 && afs_tp_worker_continue(worker)) {
190 code = afs_wq_do(queue, NULL /* no call rock */);
197 * start a worker thread.
199 * @param[in] pool thread pool object
200 * @param[inout] worker_out address in which to store worker thread object pointer
202 * @return operation status
204 * @retval ENOMEM out of memory
/*
 * Allocate a worker object and launch a detached pthread that runs
 * _afs_tp_worker_run() on it. The new worker pointer is returned through
 * worker_out.
 */
207 _afs_tp_worker_start(struct afs_thread_pool * pool,
208 struct afs_thread_pool_worker ** worker_out)
211 pthread_attr_t attrs;
212 struct afs_thread_pool_worker * worker;
214 ret = _afs_tp_worker_alloc(worker_out);
218 worker = *worker_out;
/* start with no shutdown pending; afs_tp_shutdown() sets this flag later */
221 worker->req_shutdown = 0;
/* attribute setup is asserted to succeed rather than reported to the caller */
223 osi_Assert(pthread_attr_init(&attrs) == 0);
/* detached: nobody joins the thread, so _afs_tp_worker_run() frees the worker itself */
224 osi_Assert(pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED) == 0);
/*
 * NOTE(review): no pthread_attr_destroy(&attrs) is visible after this call,
 * and a failing pthread_create() would appear to leave the worker object
 * allocated -- confirm against the cleanup path.
 */
226 ret = pthread_create(&worker->tid, &attrs, &_afs_tp_worker_run, worker);
233 * create a thread pool.
235 * @param[inout] pool_out address in which to store pool object pointer.
236 * @param[in] queue work queue serviced by thread pool
238 * @return operation status
240 * @retval ENOMEM out of memory
/*
 * Allocate a pool object and give it its initial configuration: lock and
 * shutdown condition variable, an empty thread list, the caller's work
 * queue, the default entry function, and the INIT state.
 */
243 afs_tp_create(struct afs_thread_pool ** pool_out,
244 struct afs_work_queue * queue)
247 struct afs_thread_pool * pool;
249 ret = _afs_tp_alloc(pool_out);
255 MUTEX_INIT(&pool->lock, "pool", MUTEX_DEFAULT, 0);
256 CV_INIT(&pool->shutdown_cv, "pool shutdown", CV_DEFAULT, 0);
257 queue_Init(&pool->thread_list);
/* the queue pointer is stored as given; it is later handed to each worker's entry function */
258 pool->work_queue = queue;
/* can be replaced through afs_tp_set_entry() */
259 pool->entry = &_afs_tp_worker_default;
/* default worker count; can be changed through afs_tp_set_threads() */
262 pool->max_threads = 4;
263 pool->state = AFS_TP_STATE_INIT;
270 * destroy a thread pool.
272 * @param[in] pool thread pool object to be destroyed
274 * @return operation status
276 * @retval AFS_TP_ERROR pool not in a quiescent state
/*
 * Tear down a pool. The decision is based on pool->state, which is
 * examined while holding pool->lock.
 */
279 afs_tp_destroy(struct afs_thread_pool * pool)
283 MUTEX_ENTER(&pool->lock);
284 switch (pool->state) {
/*
 * INIT and STOPPED share one case arm; presumably these are the
 * "quiescent" states the function's contract refers to -- TODO confirm.
 */
285 case AFS_TP_STATE_INIT:
286 case AFS_TP_STATE_STOPPED:
292 MUTEX_EXIT(&pool->lock);
299 * set the number of threads to spawn.
301 * @param[in] pool thread pool object
302 * @param[in] threads number of threads to spawn
304 * @return operation status
306 * @retval AFS_TP_ERROR thread pool has already been started
/*
 * Configure how many workers afs_tp_start() will spawn. The state check
 * and the store to max_threads both happen under pool->lock.
 */
309 afs_tp_set_threads(struct afs_thread_pool *pool,
314 MUTEX_ENTER(&pool->lock);
/* a pool that has left the INIT state takes the branch below instead of falling straight through */
315 if (pool->state != AFS_TP_STATE_INIT) {
318 pool->max_threads = threads;
320 MUTEX_EXIT(&pool->lock);
326 * set a custom thread entry point.
328 * @param[in] pool thread pool object
329 * @param[in] entry thread entry function pointer
330 * @param[in] rock opaque pointer passed to thread
332 * @return operation status
334 * @retval AFS_TP_ERROR thread pool has already been started
/*
 * Install a custom worker entry function. Guarded the same way as
 * afs_tp_set_threads(): pool->state is compared against INIT while holding
 * pool->lock.
 */
337 afs_tp_set_entry(struct afs_thread_pool * pool,
338 afs_tp_worker_func_t * entry,
343 MUTEX_ENTER(&pool->lock);
/* a pool that has left the INIT state takes the branch below */
344 if (pool->state != AFS_TP_STATE_INIT) {
350 MUTEX_EXIT(&pool->lock);
356 * start a thread pool.
358 * @param[in] pool thread pool object
360 * @return operation status
362 * @retval AFS_TP_ERROR thread create failure
/*
 * Start the pool: move it from INIT to STARTING, spawn max_threads workers
 * with the lock released, then mark the pool RUNNING.
 */
365 afs_tp_start(struct afs_thread_pool * pool)
368 struct afs_thread_pool_worker * worker;
371 MUTEX_ENTER(&pool->lock);
/* only a pool still in INIT proceeds directly to the STARTING transition */
372 if (pool->state != AFS_TP_STATE_INIT) {
376 pool->state = AFS_TP_STATE_STARTING;
/* released across thread creation; each new worker takes pool->lock itself to register */
377 MUTEX_EXIT(&pool->lock);
/*
 * NOTE(review): max_threads is read without the lock here;
 * afs_tp_set_threads() only stores to it while the state is INIT, so it
 * should be stable by now -- confirm.
 */
379 for (i = 0; i < pool->max_threads; i++) {
/* worker receives the new worker object pointer; this loop makes no further visible use of it */
380 code = _afs_tp_worker_start(pool, &worker);
386 MUTEX_ENTER(&pool->lock);
387 pool->state = AFS_TP_STATE_RUNNING;
389 MUTEX_EXIT(&pool->lock);
395 * shut down all threads in pool.
397 * @param[in] pool thread pool object
398 * @param[in] block wait for all threads to terminate, if asserted
400 * @return operation status
/*
 * Ask every worker in the pool to stop and shut down the pool's work
 * queue. The wait loop at the end sleeps until no live workers remain.
 */
404 afs_tp_shutdown(struct afs_thread_pool * pool,
408 struct afs_thread_pool_worker * worker, *nn;
410 MUTEX_ENTER(&pool->lock);
/* a shutdown that has already completed or is in progress is detected first */
411 if (pool->state == AFS_TP_STATE_STOPPED
412 || pool->state == AFS_TP_STATE_STOPPING) {
/* any other state besides RUNNING is handled separately from the normal path */
415 if (pool->state != AFS_TP_STATE_RUNNING) {
419 pool->state = AFS_TP_STATE_STOPPING;
/* flag every registered worker; afs_tp_worker_continue() reports the flag to the entry function */
421 for (queue_Scan(&pool->thread_list, worker, nn, afs_thread_pool_worker)) {
422 worker->req_shutdown = 1;
/*
 * With no live workers there is nobody left to perform the STOPPED
 * transition in _afs_tp_worker_run(), so do it here.
 */
424 if (!pool->nthreads) {
425 pool->state = AFS_TP_STATE_STOPPED;
427 /* need to drop lock to get a membar here */
428 MUTEX_EXIT(&pool->lock);
/* presumably wakes workers blocked inside the work queue so they can observe req_shutdown -- TODO confirm */
430 ret = afs_wq_shutdown(pool->work_queue);
435 MUTEX_ENTER(&pool->lock);
/* _afs_tp_worker_run() broadcasts shutdown_cv when the last worker leaves; the condition is re-checked after every wakeup */
438 while (pool->nthreads) {
439 CV_WAIT(&pool->shutdown_cv, &pool->lock);
443 MUTEX_EXIT(&pool->lock);
450 * check whether thread pool is online.
452 * @param[in] pool thread pool object
454 * @return whether pool is online
455 * @retval 1 pool is online
456 * @retval 0 pool is not online
/*
 * Report whether the pool is currently in the RUNNING state. The state is
 * sampled under pool->lock, so the answer can be stale by the time the
 * caller acts on it.
 */
459 afs_tp_is_online(struct afs_thread_pool * pool)
463 MUTEX_ENTER(&pool->lock);
/* the comparison yields 1 for RUNNING and 0 for every other state */
464 ret = (pool->state == AFS_TP_STATE_RUNNING);
465 MUTEX_EXIT(&pool->lock);
471 * check whether a given worker thread can continue to run.
473 * @param[in] worker worker thread object pointer
475 * @return whether thread can continue to execute
476 * @retval 1 execution can continue
477 * @retval 0 shutdown has been requested
/*
 * Tell a worker whether it may keep running: nonzero until
 * afs_tp_shutdown() has set the worker's req_shutdown flag.
 */
480 afs_tp_worker_continue(struct afs_thread_pool_worker * worker)
/*
 * NOTE(review): the flag is read without taking pool->lock, while
 * afs_tp_shutdown() writes it under that lock -- confirm this
 * unsynchronized read is intended.
 */
482 return !worker->req_shutdown;