util: Handle serverLogMutex lock across forks
[openafs.git] / src / util / thread_pool.c
1 /*
2  * Copyright 2008-2010, Sine Nomine Associates and others.
3  * All Rights Reserved.
4  *
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afsconfig.h>
11 #include <afs/param.h>
12
13 #include <roken.h>
14 #include <afs/opr.h>
15
16 #include <lock.h>
17 #include <afs/afsutil.h>
18 #include <lwp.h>
19 #include <afs/afsint.h>
20
21 #define __AFS_THREAD_POOL_IMPL 1
22 #include "work_queue.h"
23 #include "thread_pool.h"
24 #include "thread_pool_impl.h"
25
26 /**
27  * public interfaces for thread_pool.
28  */
29
30 /**
31  * allocate a thread pool object.
32  *
33  * @param[inout] pool_out address in which to store pool object pointer
34  *
35  * @return operation status
36  *    @retval 0 success
37  *    @retval ENOMEM out of memory
38  *
39  * @internal
40  */
41 static int
42 _afs_tp_alloc(struct afs_thread_pool ** pool_out)
43 {
44     int ret = 0;
45     struct afs_thread_pool * pool;
46
47     *pool_out = pool = malloc(sizeof(*pool));
48     if (pool == NULL) {
49         ret = ENOMEM;
50         goto error;
51     }
52
53  error:
54     return ret;
55 }
56
57 /**
58  * free a thread pool object.
59  *
60  * @param[in] pool  thread pool object
61  *
62  * @return operation status
63  *    @retval 0 success
64  *
65  * @internal
66  */
67 static int
68 _afs_tp_free(struct afs_thread_pool * pool)
69 {
70     int ret = 0;
71
72     free(pool);
73
74     return ret;
75 }
76
77 /**
78  * allocate a thread worker object.
79  *
80  * @param[inout] worker_out address in which to store worker object pointer
81  *
82  * @return operation status
83  *    @retval 0 success
84  *    @retval ENOMEM out of memory
85  *
86  * @internal
87  */
88 static int
89 _afs_tp_worker_alloc(struct afs_thread_pool_worker ** worker_out)
90 {
91     int ret = 0;
92     struct afs_thread_pool_worker * worker;
93
94     *worker_out = worker = malloc(sizeof(*worker));
95     if (worker == NULL) {
96         ret = ENOMEM;
97         goto error;
98     }
99
100     queue_NodeInit(&worker->worker_list);
101
102  error:
103     return ret;
104 }
105
106 /**
107  * free a thread worker object.
108  *
109  * @param[in] worker  thread worker object
110  *
111  * @return operation status
112  *    @retval 0 success
113  *
114  * @internal
115  */
116 static int
117 _afs_tp_worker_free(struct afs_thread_pool_worker * worker)
118 {
119     int ret = 0;
120
121     free(worker);
122
123     return ret;
124 }
125
126 /**
127  * low-level thread entry point.
128  *
129  * @param[in] rock opaque pointer to thread worker object
130  *
131  * @return opaque return pointer from pool entry function
132  *
133  * @internal
134  */
135 static void *
136 _afs_tp_worker_run(void * rock)
137 {
138     struct afs_thread_pool_worker * worker = rock;
139     struct afs_thread_pool * pool = worker->pool;
140
141     /* register worker with pool */
142     opr_mutex_enter(&pool->lock);
143     queue_Append(&pool->thread_list, worker);
144     pool->nthreads++;
145     opr_mutex_exit(&pool->lock);
146
147     /* call high-level entry point */
148     worker->ret = (*pool->entry)(pool, worker, pool->work_queue, pool->rock);
149
150     /* adjust pool live thread count */
151     opr_mutex_enter(&pool->lock);
152     opr_Assert(pool->nthreads);
153     queue_Remove(worker);
154     pool->nthreads--;
155     if (!pool->nthreads) {
156         opr_cv_broadcast(&pool->shutdown_cv);
157         pool->state = AFS_TP_STATE_STOPPED;
158     }
159     opr_mutex_exit(&pool->lock);
160
161     _afs_tp_worker_free(worker);
162
163     return NULL;
164 }
165
166 /**
167  * default high-level thread entry point.
168  *
169  * @internal
170  */
171 static void *
172 _afs_tp_worker_default(struct afs_thread_pool *pool,
173                        struct afs_thread_pool_worker *worker,
174                        struct afs_work_queue *queue,
175                        void *rock)
176 {
177     int code = 0;
178     while (code == 0 && afs_tp_worker_continue(worker)) {
179         code = afs_wq_do(queue, NULL /* no call rock */);
180     }
181
182     return NULL;
183 }
184
185 /**
186  * start a worker thread.
187  *
188  * @param[in] pool         thread pool object
189  * @param[inout] worker_out  address in which to store worker thread object pointer
190  *
191  * @return operation status
192  *    @retval 0 success
193  *    @retval ENOMEM out of memory
194  */
195 static int
196 _afs_tp_worker_start(struct afs_thread_pool * pool,
197                      struct afs_thread_pool_worker ** worker_out)
198 {
199     int ret = 0;
200     pthread_attr_t attrs;
201     struct afs_thread_pool_worker * worker;
202
203     ret = _afs_tp_worker_alloc(worker_out);
204     if (ret) {
205         goto error;
206     }
207     worker = *worker_out;
208
209     worker->pool = pool;
210     worker->req_shutdown = 0;
211
212     opr_Verify(pthread_attr_init(&attrs) == 0);
213     opr_Verify(pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED) == 0);
214
215     ret = pthread_create(&worker->tid, &attrs, &_afs_tp_worker_run, worker);
216
217  error:
218     return ret;
219 }
220
221 /**
222  * create a thread pool.
223  *
224  * @param[inout] pool_out  address in which to store pool object pointer.
225  * @param[in]    queue     work queue serviced by thread pool
226  *
227  * @return operation status
228  *    @retval 0 success
229  *    @retval ENOMEM out of memory
230  */
231 int
232 afs_tp_create(struct afs_thread_pool ** pool_out,
233               struct afs_work_queue * queue)
234 {
235     int ret = 0;
236     struct afs_thread_pool * pool;
237
238     ret = _afs_tp_alloc(pool_out);
239     if (ret) {
240         goto error;
241     }
242     pool = *pool_out;
243
244     opr_mutex_init(&pool->lock);
245     opr_cv_init(&pool->shutdown_cv);
246     queue_Init(&pool->thread_list);
247     pool->work_queue = queue;
248     pool->entry = &_afs_tp_worker_default;
249     pool->rock = NULL;
250     pool->nthreads = 0;
251     pool->max_threads = 4;
252     pool->state = AFS_TP_STATE_INIT;
253
254  error:
255     return ret;
256 }
257
258 /**
259  * destroy a thread pool.
260  *
261  * @param[in] pool thread pool object to be destroyed
262  *
263  * @return operation status
264  *    @retval 0 success
265  *    @retval AFS_TP_ERROR pool not in a quiescent state
266  */
267 int
268 afs_tp_destroy(struct afs_thread_pool * pool)
269 {
270     int ret = 0;
271
272     opr_mutex_enter(&pool->lock);
273     switch (pool->state) {
274     case AFS_TP_STATE_INIT:
275     case AFS_TP_STATE_STOPPED:
276         _afs_tp_free(pool);
277         break;
278
279     default:
280         ret = AFS_TP_ERROR;
281         opr_mutex_exit(&pool->lock);
282     }
283
284     return ret;
285 }
286
287 /**
288  * set the number of threads to spawn.
289  *
290  * @param[in] pool  thread pool object
291  * @param[in] threads  number of threads to spawn
292  *
293  * @return operation status
294  *    @retval 0 success
295  *    @retval AFS_TP_ERROR thread pool has already been started
296  */
297 int
298 afs_tp_set_threads(struct afs_thread_pool *pool,
299                    afs_uint32 threads)
300 {
301     int ret = 0;
302
303     opr_mutex_enter(&pool->lock);
304     if (pool->state != AFS_TP_STATE_INIT) {
305         ret = AFS_TP_ERROR;
306     } else {
307         pool->max_threads = threads;
308     }
309     opr_mutex_exit(&pool->lock);
310
311     return ret;
312 }
313
314 /**
315  * set a custom thread entry point.
316  *
317  * @param[in] pool  thread pool object
318  * @param[in] entry thread entry function pointer
319  * @param[in] rock  opaque pointer passed to thread
320  *
321  * @return operation status
322  *    @retval 0 success
323  *    @retval AFS_TP_ERROR thread pool has already been started
324  */
325 int
326 afs_tp_set_entry(struct afs_thread_pool * pool,
327                  afs_tp_worker_func_t * entry,
328                  void * rock)
329 {
330     int ret = 0;
331
332     opr_mutex_enter(&pool->lock);
333     if (pool->state != AFS_TP_STATE_INIT) {
334         ret = AFS_TP_ERROR;
335     } else {
336         pool->entry = entry;
337         pool->rock = rock;
338     }
339     opr_mutex_exit(&pool->lock);
340
341     return ret;
342 }
343
344 /**
345  * start a thread pool.
346  *
347  * @param[in] pool  thread pool object
348  *
349  * @return operation status
350  *    @retval 0 success
351  *    @retval AFS_TP_ERROR thread create failure
352  */
353 int
354 afs_tp_start(struct afs_thread_pool * pool)
355 {
356     int code, ret = 0;
357     struct afs_thread_pool_worker * worker;
358     afs_uint32 i;
359
360     opr_mutex_enter(&pool->lock);
361     if (pool->state != AFS_TP_STATE_INIT) {
362         ret = AFS_TP_ERROR;
363         goto done_sync;
364     }
365     pool->state = AFS_TP_STATE_STARTING;
366     opr_mutex_exit(&pool->lock);
367
368     for (i = 0; i < pool->max_threads; i++) {
369         code = _afs_tp_worker_start(pool, &worker);
370         if (code) {
371             ret = code;
372         }
373     }
374
375     opr_mutex_enter(&pool->lock);
376     pool->state = AFS_TP_STATE_RUNNING;
377  done_sync:
378     opr_mutex_exit(&pool->lock);
379
380     return ret;
381 }
382
383 /**
384  * shut down all threads in pool.
385  *
386  * @param[in] pool  thread pool object
387  * @param[in] block wait for all threads to terminate, if asserted
388  *
389  * @return operation status
390  *    @retval 0 success
391  */
392 int
393 afs_tp_shutdown(struct afs_thread_pool * pool,
394                 int block)
395 {
396     int ret = 0;
397     struct afs_thread_pool_worker * worker, *nn;
398
399     opr_mutex_enter(&pool->lock);
400     if (pool->state == AFS_TP_STATE_STOPPED
401         || pool->state == AFS_TP_STATE_STOPPING) {
402         goto done_stopped;
403     }
404     if (pool->state != AFS_TP_STATE_RUNNING) {
405         ret = AFS_TP_ERROR;
406         goto done_sync;
407     }
408     pool->state = AFS_TP_STATE_STOPPING;
409
410     for (queue_Scan(&pool->thread_list, worker, nn, afs_thread_pool_worker)) {
411         worker->req_shutdown = 1;
412     }
413     if (!pool->nthreads) {
414         pool->state = AFS_TP_STATE_STOPPED;
415     }
416     /* need to drop lock to get a membar here */
417     opr_mutex_exit(&pool->lock);
418
419     ret = afs_wq_shutdown(pool->work_queue);
420     if (ret) {
421         goto error;
422     }
423
424     opr_mutex_enter(&pool->lock);
425  done_stopped:
426     if (block) {
427         while (pool->nthreads) {
428             opr_cv_wait(&pool->shutdown_cv, &pool->lock);
429         }
430     }
431  done_sync:
432     opr_mutex_exit(&pool->lock);
433
434  error:
435     return ret;
436 }
437
438 /**
439  * check whether thread pool is online.
440  *
441  * @param[in] pool  thread pool object
442  *
443  * @return whether pool is online
444  *    @retval 1 pool is online
445  *    @retval 0 pool is not online
446  */
447 int
448 afs_tp_is_online(struct afs_thread_pool * pool)
449 {
450     int ret;
451
452     opr_mutex_enter(&pool->lock);
453     ret = (pool->state == AFS_TP_STATE_RUNNING);
454     opr_mutex_exit(&pool->lock);
455
456     return ret;
457 }
458
459 /**
460  * check whether a given worker thread can continue to run.
461  *
462  * @param[in] worker  worker thread object pointer
463  *
464  * @return whether thread can continue to execute
465  *    @retval 1 execution can continue
466  *    @retval 0 shutdown has been requested
467  */
468 int
469 afs_tp_worker_continue(struct afs_thread_pool_worker * worker)
470 {
471     return !worker->req_shutdown;
472 }