Provide an abstract thread pool object
[openafs.git] / src / util / thread_pool.c
1 /*
2  * Copyright 2008-2010, Sine Nomine Associates and others.
3  * All Rights Reserved.
4  *
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afsconfig.h>
11 #include <afs/param.h>
12
13
14 #include <stdio.h>
15 #include <stdlib.h>
16 #include <errno.h>
17 #include <fcntl.h>
18 #include <sys/stat.h>
19 #include <dirent.h>
20 #include <afs/assert.h>
21 #include <string.h>
22 #include <sys/file.h>
23 #include <sys/param.h>
24 #include <lock.h>
25 #if defined(AFS_SUN5_ENV) || defined(AFS_HPUX_ENV)
26 #include <unistd.h>
27 #endif
28 #include <afs/afsutil.h>
29 #include <lwp.h>
30 #include <afs/afsint.h>
31
32 #define __AFS_THREAD_POOL_IMPL 1
33 #include "work_queue.h"
34 #include "thread_pool.h"
35 #include "thread_pool_impl.h"
36
37 /**
38  * public interfaces for thread_pool.
39  */
40
41 /**
42  * allocate a thread pool object.
43  *
44  * @param[inout] pool_out address in which to store pool object pointer
45  *
46  * @return operation status
47  *    @retval 0 success
48  *    @retval ENOMEM out of memory
49  *
50  * @internal
51  */
52 static int
53 _afs_tp_alloc(struct afs_thread_pool ** pool_out)
54 {
55     int ret = 0;
56     struct afs_thread_pool * pool;
57
58     *pool_out = pool = malloc(sizeof(*pool));
59     if (pool == NULL) {
60         ret = ENOMEM;
61         goto error;
62     }
63
64  error:
65     return ret;
66 }
67
68 /**
69  * free a thread pool object.
70  *
71  * @param[in] pool  thread pool object
72  *
73  * @return operation status
74  *    @retval 0 success
75  *
76  * @internal
77  */
78 static int
79 _afs_tp_free(struct afs_thread_pool * pool)
80 {
81     int ret = 0;
82
83     free(pool);
84
85     return ret;
86 }
87
88 /**
89  * allocate a thread worker object.
90  *
91  * @param[inout] worker_out address in which to store worker object pointer
92  *
93  * @return operation status
94  *    @retval 0 success
95  *    @retval ENOMEM out of memory
96  *
97  * @internal
98  */
99 static int
100 _afs_tp_worker_alloc(struct afs_thread_pool_worker ** worker_out)
101 {
102     int ret = 0;
103     struct afs_thread_pool_worker * worker;
104
105     *worker_out = worker = malloc(sizeof(*worker));
106     if (worker == NULL) {
107         ret = ENOMEM;
108         goto error;
109     }
110
111     queue_NodeInit(&worker->worker_list);
112
113  error:
114     return ret;
115 }
116
117 /**
118  * free a thread worker object.
119  *
120  * @param[in] worker  thread worker object
121  *
122  * @return operation status
123  *    @retval 0 success
124  *
125  * @internal
126  */
127 static int
128 _afs_tp_worker_free(struct afs_thread_pool_worker * worker)
129 {
130     int ret = 0;
131
132     free(worker);
133
134     return ret;
135 }
136
137 /**
138  * low-level thread entry point.
139  *
140  * @param[in] rock opaque pointer to thread worker object
141  *
142  * @return opaque return pointer from pool entry function
143  *
144  * @internal
145  */
146 static void *
147 _afs_tp_worker_run(void * rock)
148 {
149     struct afs_thread_pool_worker * worker = rock;
150     struct afs_thread_pool * pool = worker->pool;
151
152     /* register worker with pool */
153     assert(pthread_mutex_lock(&pool->lock) == 0);
154     queue_Append(&pool->thread_list, worker);
155     pool->nthreads++;
156     assert(pthread_mutex_unlock(&pool->lock) == 0);
157
158     /* call high-level entry point */
159     worker->ret = (*pool->entry)(pool, worker, pool->work_queue, pool->rock);
160
161     /* adjust pool live thread count */
162     assert(pthread_mutex_lock(&pool->lock) == 0);
163     assert(pool->nthreads);
164     queue_Remove(worker);
165     pool->nthreads--;
166     if (!pool->nthreads) {
167         assert(pthread_cond_broadcast(&pool->shutdown_cv) == 0);
168         pool->state = AFS_TP_STATE_STOPPED;
169     }
170     assert(pthread_mutex_unlock(&pool->lock) == 0);
171
172     _afs_tp_worker_free(worker);
173
174     return NULL;
175 }
176
177 /**
178  * default high-level thread entry point.
179  *
180  * @internal
181  */
182 static void *
183 _afs_tp_worker_default(struct afs_thread_pool *pool,
184                        struct afs_thread_pool_worker *worker,
185                        struct afs_work_queue *queue,
186                        void *rock)
187 {
188     int code = 0;
189     while (code == 0 && afs_tp_worker_continue(worker)) {
190         code = afs_wq_do(queue, NULL /* no call rock */);
191     }
192
193     return NULL;
194 }
195
196 /**
197  * start a worker thread.
198  *
199  * @param[in] pool         thread pool object
200  * @param[inout] worker_out  address in which to store worker thread object pointer
201  *
202  * @return operation status
203  *    @retval 0 success
204  *    @retval ENOMEM out of memory
205  */
206 static int
207 _afs_tp_worker_start(struct afs_thread_pool * pool,
208                      struct afs_thread_pool_worker ** worker_out)
209 {
210     int ret = 0;
211     pthread_attr_t attrs;
212     struct afs_thread_pool_worker * worker;
213
214     ret = _afs_tp_worker_alloc(worker_out);
215     if (ret) {
216         goto error;
217     }
218     worker = *worker_out;
219
220     worker->pool = pool;
221     worker->req_shutdown = 0;
222
223     assert(pthread_attr_init(&attrs) == 0);
224     assert(pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED) == 0);
225
226     ret = pthread_create(&worker->tid, &attrs, &_afs_tp_worker_run, worker);
227
228  error:
229     return ret;
230 }
231
232 /**
233  * create a thread pool.
234  *
235  * @param[inout] pool_out  address in which to store pool object pointer.
236  * @param[in]    queue     work queue serviced by thread pool
237  *
238  * @return operation status
239  *    @retval 0 success
240  *    @retval ENOMEM out of memory
241  */
242 int
243 afs_tp_create(struct afs_thread_pool ** pool_out,
244               struct afs_work_queue * queue)
245 {
246     int ret = 0;
247     struct afs_thread_pool * pool;
248
249     ret = _afs_tp_alloc(pool_out);
250     if (ret) {
251         goto error;
252     }
253     pool = *pool_out;
254
255     assert(pthread_mutex_init(&pool->lock, NULL) == 0);
256     assert(pthread_cond_init(&pool->shutdown_cv, NULL) == 0);
257     queue_Init(&pool->thread_list);
258     pool->work_queue = queue;
259     pool->entry = &_afs_tp_worker_default;
260     pool->rock = NULL;
261     pool->nthreads = 0;
262     pool->max_threads = 4;
263     pool->state = AFS_TP_STATE_INIT;
264
265  error:
266     return ret;
267 }
268
269 /**
270  * destroy a thread pool.
271  *
272  * @param[in] pool thread pool object to be destroyed
273  *
274  * @return operation status
275  *    @retval 0 success
276  *    @retval AFS_TP_ERROR pool not in a quiescent state
277  */
278 int
279 afs_tp_destroy(struct afs_thread_pool * pool)
280 {
281     int ret = 0;
282
283     assert(pthread_mutex_lock(&pool->lock) == 0);
284     switch (pool->state) {
285     case AFS_TP_STATE_INIT:
286     case AFS_TP_STATE_STOPPED:
287         _afs_tp_free(pool);
288         break;
289
290     default:
291         ret = AFS_TP_ERROR;
292         assert(pthread_mutex_unlock(&pool->lock) == 0);
293     }
294
295     return ret;
296 }
297
298 /**
299  * set the number of threads to spawn.
300  *
301  * @param[in] pool  thread pool object
302  * @param[in] threads  number of threads to spawn
303  *
304  * @return operation status
305  *    @retval 0 success
306  *    @retval AFS_TP_ERROR thread pool has already been started
307  */
308 int
309 afs_tp_set_threads(struct afs_thread_pool *pool,
310                    afs_uint32 threads)
311 {
312     int ret = 0;
313
314     assert(pthread_mutex_lock(&pool->lock) == 0);
315     if (pool->state != AFS_TP_STATE_INIT) {
316         ret = AFS_TP_ERROR;
317     } else {
318         pool->max_threads = threads;
319     }
320     assert(pthread_mutex_unlock(&pool->lock) == 0);
321
322     return ret;
323 }
324
325 /**
326  * set a custom thread entry point.
327  *
328  * @param[in] pool  thread pool object
329  * @param[in] entry thread entry function pointer
330  * @param[in] rock  opaque pointer passed to thread
331  *
332  * @return operation status
333  *    @retval 0 success
334  *    @retval AFS_TP_ERROR thread pool has already been started
335  */
336 int
337 afs_tp_set_entry(struct afs_thread_pool * pool,
338                  afs_tp_worker_func_t * entry,
339                  void * rock)
340 {
341     int ret = 0;
342
343     assert(pthread_mutex_lock(&pool->lock) == 0);
344     if (pool->state != AFS_TP_STATE_INIT) {
345         ret = AFS_TP_ERROR;
346     } else {
347         pool->entry = entry;
348         pool->rock = rock;
349     }
350     assert(pthread_mutex_unlock(&pool->lock) == 0);
351
352     return ret;
353 }
354
355 /**
356  * start a thread pool.
357  *
358  * @param[in] pool  thread pool object
359  *
360  * @return operation status
361  *    @retval 0 success
362  *    @retval AFS_TP_ERROR thread create failure
363  */
364 int
365 afs_tp_start(struct afs_thread_pool * pool)
366 {
367     int code, ret = 0;
368     struct afs_thread_pool_worker * worker;
369     afs_uint32 i;
370
371     assert(pthread_mutex_lock(&pool->lock) == 0);
372     if (pool->state != AFS_TP_STATE_INIT) {
373         ret = AFS_TP_ERROR;
374         goto done_sync;
375     }
376     pool->state = AFS_TP_STATE_STARTING;
377     assert(pthread_mutex_unlock(&pool->lock) == 0);
378
379     for (i = 0; i < pool->max_threads; i++) {
380         code = _afs_tp_worker_start(pool, &worker);
381         if (code) {
382             ret = code;
383         }
384     }
385
386     assert(pthread_mutex_lock(&pool->lock) == 0);
387     pool->state = AFS_TP_STATE_RUNNING;
388  done_sync:
389     assert(pthread_mutex_unlock(&pool->lock) == 0);
390
391     return ret;
392 }
393
394 /**
395  * shut down all threads in pool.
396  *
397  * @param[in] pool  thread pool object
398  * @param[in] block wait for all threads to terminate, if asserted
399  *
400  * @return operation status
401  *    @retval 0 success
402  */
403 int
404 afs_tp_shutdown(struct afs_thread_pool * pool,
405                 int block)
406 {
407     int ret = 0;
408     struct afs_thread_pool_worker * worker, *nn;
409
410     assert(pthread_mutex_lock(&pool->lock) == 0);
411     if (pool->state == AFS_TP_STATE_STOPPED
412         || pool->state == AFS_TP_STATE_STOPPING) {
413         goto done_stopped;
414     }
415     if (pool->state != AFS_TP_STATE_RUNNING) {
416         ret = AFS_TP_ERROR;
417         goto done_sync;
418     }
419     pool->state = AFS_TP_STATE_STOPPING;
420
421     for (queue_Scan(&pool->thread_list, worker, nn, afs_thread_pool_worker)) {
422         worker->req_shutdown = 1;
423     }
424     if (!pool->nthreads) {
425         pool->state = AFS_TP_STATE_STOPPED;
426     }
427     /* need to drop lock to get a membar here */
428     assert(pthread_mutex_unlock(&pool->lock) == 0);
429
430     ret = afs_wq_shutdown(pool->work_queue);
431     if (ret) {
432         goto error;
433     }
434
435     assert(pthread_mutex_lock(&pool->lock) == 0);
436  done_stopped:
437     if (block) {
438         while (pool->nthreads) {
439             assert(pthread_cond_wait(&pool->shutdown_cv,
440                                      &pool->lock) == 0);
441         }
442     }
443  done_sync:
444     assert(pthread_mutex_unlock(&pool->lock) == 0);
445
446  error:
447     return ret;
448 }
449
450 /**
451  * check whether thread pool is online.
452  *
453  * @param[in] pool  thread pool object
454  *
455  * @return whether pool is online
456  *    @retval 1 pool is online
457  *    @retval 0 pool is not online
458  */
459 int
460 afs_tp_is_online(struct afs_thread_pool * pool)
461 {
462     int ret;
463
464     assert(pthread_mutex_lock(&pool->lock) == 0);
465     ret = (pool->state == AFS_TP_STATE_RUNNING);
466     assert(pthread_mutex_unlock(&pool->lock) == 0);
467
468     return ret;
469 }
470
471 /**
472  * check whether a given worker thread can continue to run.
473  *
474  * @param[in] worker  worker thread object pointer
475  *
476  * @return whether thread can continue to execute
477  *    @retval 1 execution can continue
478  *    @retval 0 shutdown has been requested
479  */
480 int
481 afs_tp_worker_continue(struct afs_thread_pool_worker * worker)
482 {
483     return !worker->req_shutdown;
484 }