Provide an abstract thread pool object
authorAndrew Deason <adeason@sinenomine.net>
Thu, 11 Mar 2010 16:43:54 +0000 (10:43 -0600)
committerDerrick Brashear <shadow@dementia.org>
Sat, 2 Oct 2010 04:11:15 +0000 (21:11 -0700)
Add some routines to maintain a pool of threads, for working through a
Vwork_queue.

This adds the afs_tp* series of functions. Originally written by
Tom Keiser.

Change-Id: I8735aa14ca6622ae0eca7a7589e69b0f3b3daf08
Reviewed-on: http://gerrit.openafs.org/1863
Tested-by: BuildBot <buildbot@rampaginggeek.com>
Reviewed-by: Derrick Brashear <shadow@dementia.org>

src/util/Makefile.in
src/util/thread_pool.c [new file with mode: 0644]
src/util/thread_pool.h [new file with mode: 0644]
src/util/thread_pool_impl.h [new file with mode: 0644]
src/util/thread_pool_impl_types.h [new file with mode: 0644]
src/util/thread_pool_types.h [new file with mode: 0644]

index 1817811..4a81136 100644 (file)
@@ -40,6 +40,8 @@ includes = \
        ${TOP_INCDIR}/afs/softsig.h \
        ${TOP_INCDIR}/afs/work_queue.h \
        ${TOP_INCDIR}/afs/work_queue_types.h \
+       ${TOP_INCDIR}/afs/thread_pool.h \
+       ${TOP_INCDIR}/afs/thread_pool_types.h \
        ${TOP_INCDIR}/potpourri.h 
 
 all: ${includes} \
@@ -106,6 +108,12 @@ ${TOP_INCDIR}/afs/work_queue.h: ${srcdir}/work_queue.h
 ${TOP_INCDIR}/afs/work_queue_types.h: ${srcdir}/work_queue_types.h
        ${INSTALL_DATA} $? $@
 
+${TOP_INCDIR}/afs/thread_pool.h: ${srcdir}/thread_pool.h
+       ${INSTALL_DATA} $? $@
+
+${TOP_INCDIR}/afs/thread_pool_types.h: ${srcdir}/thread_pool_types.h
+       ${INSTALL_DATA} $? $@
+
 ${TOP_INCDIR}/potpourri.h: ${srcdir}/potpourri.h
        ${INSTALL_DATA} $? $@
 
@@ -255,6 +263,8 @@ install: dirpath.h util.a sys
        ${INSTALL_DATA} ${srcdir}/softsig.h ${DESTDIR}${includedir}/afs/softsig.h
        ${INSTALL_DATA} ${srcdir}/work_queue.h ${DESTDIR}${includedir}/afs/work_queue.h
        ${INSTALL_DATA} ${srcdir}/work_queue_types.h ${DESTDIR}${includedir}/afs/work_queue_types.h
+       ${INSTALL_DATA} ${srcdir}/thread_pool.h ${DESTDIR}${includedir}/afs/thread_pool.h
+       ${INSTALL_DATA} ${srcdir}/thread_pool_types.h ${DESTDIR}${includedir}/afs/thread_pool_types.h
        ${INSTALL_DATA} ${srcdir}/potpourri.h ${DESTDIR}${includedir}/potpourri.h
        ${INSTALL_DATA} util.a ${DESTDIR}${libdir}/afs/util.a
        ${INSTALL_DATA} util.a ${DESTDIR}${libdir}/afs/libafsutil.a
@@ -282,6 +292,8 @@ dest: dirpath.h util.a sys
        ${INSTALL_DATA} ${srcdir}/softsig.h ${DEST}/include/afs/softsig.h
        ${INSTALL_DATA} ${srcdir}/work_queue.h ${DEST}/include/afs/work_queue.h
        ${INSTALL_DATA} ${srcdir}/work_queue_types.h ${DEST}/include/afs/work_queue_types.h
+       ${INSTALL_DATA} ${srcdir}/thread_pool.h ${DEST}/include/afs/thread_pool.h
+       ${INSTALL_DATA} ${srcdir}/thread_pool_types.h ${DEST}/include/afs/thread_pool_types.h
        ${INSTALL_DATA} ${srcdir}/potpourri.h ${DEST}/include/potpourri.h
        ${INSTALL_DATA} util.a ${DEST}/lib/afs/util.a
        ${INSTALL_DATA} util.a ${DEST}/lib/afs/libafsutil.a
diff --git a/src/util/thread_pool.c b/src/util/thread_pool.c
new file mode 100644 (file)
index 0000000..5eed860
--- /dev/null
@@ -0,0 +1,484 @@
+/*
+ * Copyright 2008-2010, Sine Nomine Associates and others.
+ * All Rights Reserved.
+ *
+ * This software has been released under the terms of the IBM Public
+ * License.  For details, see the LICENSE file in the top-level source
+ * directory or online at http://www.openafs.org/dl/license10.html
+ */
+
+#include <afsconfig.h>
+#include <afs/param.h>
+
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <sys/stat.h>
+#include <dirent.h>
+#include <afs/assert.h>
+#include <string.h>
+#include <sys/file.h>
+#include <sys/param.h>
+#include <lock.h>
+#if defined(AFS_SUN5_ENV) || defined(AFS_HPUX_ENV)
+#include <unistd.h>
+#endif
+#include <afs/afsutil.h>
+#include <lwp.h>
+#include <afs/afsint.h>
+
+#define __AFS_THREAD_POOL_IMPL 1
+#include "work_queue.h"
+#include "thread_pool.h"
+#include "thread_pool_impl.h"
+
+/**
+ * public interfaces for thread_pool.
+ */
+
+/**
+ * allocate a thread pool object.
+ *
+ * @param[inout] pool_out address in which to store pool object pointer
+ *
+ * @return operation status
+ *    @retval 0 success
+ *    @retval ENOMEM out of memory
+ *
+ * @internal
+ */
+static int
+_afs_tp_alloc(struct afs_thread_pool ** pool_out)
+{
+    int ret = 0;
+    struct afs_thread_pool * pool;
+
+    *pool_out = pool = malloc(sizeof(*pool));
+    if (pool == NULL) {
+       ret = ENOMEM;
+       goto error;
+    }
+
+ error:
+    return ret;
+}
+
+/**
+ * free a thread pool object.
+ *
+ * @param[in] pool  thread pool object
+ *
+ * @return operation status
+ *    @retval 0 success
+ *
+ * @internal
+ */
+static int
+_afs_tp_free(struct afs_thread_pool * pool)
+{
+    int ret = 0;
+
+    free(pool);
+
+    return ret;
+}
+
+/**
+ * allocate a thread worker object.
+ *
+ * @param[inout] worker_out address in which to store worker object pointer
+ *
+ * @return operation status
+ *    @retval 0 success
+ *    @retval ENOMEM out of memory
+ *
+ * @internal
+ */
+static int
+_afs_tp_worker_alloc(struct afs_thread_pool_worker ** worker_out)
+{
+    int ret = 0;
+    struct afs_thread_pool_worker * worker;
+
+    *worker_out = worker = malloc(sizeof(*worker));
+    if (worker == NULL) {
+       ret = ENOMEM;
+       goto error;
+    }
+
+    queue_NodeInit(&worker->worker_list);
+
+ error:
+    return ret;
+}
+
+/**
+ * free a thread worker object.
+ *
+ * @param[in] worker  thread worker object
+ *
+ * @return operation status
+ *    @retval 0 success
+ *
+ * @internal
+ */
+static int
+_afs_tp_worker_free(struct afs_thread_pool_worker * worker)
+{
+    int ret = 0;
+
+    free(worker);
+
+    return ret;
+}
+
+/**
+ * low-level thread entry point.
+ *
+ * @param[in] rock opaque pointer to thread worker object
+ *
+ * @return opaque return pointer from pool entry function
+ *
+ * @internal
+ */
+static void *
+_afs_tp_worker_run(void * rock)
+{
+    struct afs_thread_pool_worker * worker = rock;
+    struct afs_thread_pool * pool = worker->pool;
+
+    /* register worker with pool */
+    assert(pthread_mutex_lock(&pool->lock) == 0);
+    queue_Append(&pool->thread_list, worker);
+    pool->nthreads++;
+    assert(pthread_mutex_unlock(&pool->lock) == 0);
+
+    /* call high-level entry point */
+    worker->ret = (*pool->entry)(pool, worker, pool->work_queue, pool->rock);
+
+    /* adjust pool live thread count */
+    assert(pthread_mutex_lock(&pool->lock) == 0);
+    assert(pool->nthreads);
+    queue_Remove(worker);
+    pool->nthreads--;
+    if (!pool->nthreads) {
+       assert(pthread_cond_broadcast(&pool->shutdown_cv) == 0);
+       pool->state = AFS_TP_STATE_STOPPED;
+    }
+    assert(pthread_mutex_unlock(&pool->lock) == 0);
+
+    _afs_tp_worker_free(worker);
+
+    return NULL;
+}
+
+/**
+ * default high-level thread entry point.
+ *
+ * @internal
+ */
+static void *
+_afs_tp_worker_default(struct afs_thread_pool *pool,
+                       struct afs_thread_pool_worker *worker,
+                       struct afs_work_queue *queue,
+                       void *rock)
+{
+    int code = 0;
+    while (code == 0 && afs_tp_worker_continue(worker)) {
+       code = afs_wq_do(queue, NULL /* no call rock */);
+    }
+
+    return NULL;
+}
+
+/**
+ * start a worker thread.
+ *
+ * @param[in] pool         thread pool object
+ * @param[inout] worker_out  address in which to store worker thread object pointer
+ *
+ * @return operation status
+ *    @retval 0 success
+ *    @retval ENOMEM out of memory
+ */
+static int
+_afs_tp_worker_start(struct afs_thread_pool * pool,
+                    struct afs_thread_pool_worker ** worker_out)
+{
+    int ret = 0;
+    pthread_attr_t attrs;
+    struct afs_thread_pool_worker * worker;
+
+    ret = _afs_tp_worker_alloc(worker_out);
+    if (ret) {
+       goto error;
+    }
+    worker = *worker_out;
+
+    worker->pool = pool;
+    worker->req_shutdown = 0;
+
+    assert(pthread_attr_init(&attrs) == 0);
+    assert(pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED) == 0);
+
+    ret = pthread_create(&worker->tid, &attrs, &_afs_tp_worker_run, worker);
+
+ error:
+    return ret;
+}
+
+/**
+ * create a thread pool.
+ *
+ * @param[inout] pool_out  address in which to store pool object pointer.
+ * @param[in]    queue     work queue serviced by thread pool
+ *
+ * @return operation status
+ *    @retval 0 success
+ *    @retval ENOMEM out of memory
+ */
+int
+afs_tp_create(struct afs_thread_pool ** pool_out,
+             struct afs_work_queue * queue)
+{
+    int ret = 0;
+    struct afs_thread_pool * pool;
+
+    ret = _afs_tp_alloc(pool_out);
+    if (ret) {
+       goto error;
+    }
+    pool = *pool_out;
+
+    assert(pthread_mutex_init(&pool->lock, NULL) == 0);
+    assert(pthread_cond_init(&pool->shutdown_cv, NULL) == 0);
+    queue_Init(&pool->thread_list);
+    pool->work_queue = queue;
+    pool->entry = &_afs_tp_worker_default;
+    pool->rock = NULL;
+    pool->nthreads = 0;
+    pool->max_threads = 4;
+    pool->state = AFS_TP_STATE_INIT;
+
+ error:
+    return ret;
+}
+
+/**
+ * destroy a thread pool.
+ *
+ * @param[in] pool thread pool object to be destroyed
+ *
+ * @return operation status
+ *    @retval 0 success
+ *    @retval AFS_TP_ERROR pool not in a quiescent state
+ */
+int
+afs_tp_destroy(struct afs_thread_pool * pool)
+{
+    int ret = 0;
+
+    assert(pthread_mutex_lock(&pool->lock) == 0);
+    switch (pool->state) {
+    case AFS_TP_STATE_INIT:
+    case AFS_TP_STATE_STOPPED:
+       _afs_tp_free(pool);
+       break;
+
+    default:
+       ret = AFS_TP_ERROR;
+       assert(pthread_mutex_unlock(&pool->lock) == 0);
+    }
+
+    return ret;
+}
+
+/**
+ * set the number of threads to spawn.
+ *
+ * @param[in] pool  thread pool object
+ * @param[in] threads  number of threads to spawn
+ *
+ * @return operation status
+ *    @retval 0 success
+ *    @retval AFS_TP_ERROR thread pool has already been started
+ */
+int
+afs_tp_set_threads(struct afs_thread_pool *pool,
+                   afs_uint32 threads)
+{
+    int ret = 0;
+
+    assert(pthread_mutex_lock(&pool->lock) == 0);
+    if (pool->state != AFS_TP_STATE_INIT) {
+       ret = AFS_TP_ERROR;
+    } else {
+       pool->max_threads = threads;
+    }
+    assert(pthread_mutex_unlock(&pool->lock) == 0);
+
+    return ret;
+}
+
+/**
+ * set a custom thread entry point.
+ *
+ * @param[in] pool  thread pool object
+ * @param[in] entry thread entry function pointer
+ * @param[in] rock  opaque pointer passed to thread
+ *
+ * @return operation status
+ *    @retval 0 success
+ *    @retval AFS_TP_ERROR thread pool has already been started
+ */
+int
+afs_tp_set_entry(struct afs_thread_pool * pool,
+                afs_tp_worker_func_t * entry,
+                void * rock)
+{
+    int ret = 0;
+
+    assert(pthread_mutex_lock(&pool->lock) == 0);
+    if (pool->state != AFS_TP_STATE_INIT) {
+       ret = AFS_TP_ERROR;
+    } else {
+       pool->entry = entry;
+       pool->rock = rock;
+    }
+    assert(pthread_mutex_unlock(&pool->lock) == 0);
+
+    return ret;
+}
+
+/**
+ * start a thread pool.
+ *
+ * @param[in] pool  thread pool object
+ *
+ * @return operation status
+ *    @retval 0 success
+ *    @retval AFS_TP_ERROR thread create failure
+ */
+int
+afs_tp_start(struct afs_thread_pool * pool)
+{
+    int code, ret = 0;
+    struct afs_thread_pool_worker * worker;
+    afs_uint32 i;
+
+    assert(pthread_mutex_lock(&pool->lock) == 0);
+    if (pool->state != AFS_TP_STATE_INIT) {
+       ret = AFS_TP_ERROR;
+       goto done_sync;
+    }
+    pool->state = AFS_TP_STATE_STARTING;
+    assert(pthread_mutex_unlock(&pool->lock) == 0);
+
+    for (i = 0; i < pool->max_threads; i++) {
+       code = _afs_tp_worker_start(pool, &worker);
+       if (code) {
+           ret = code;
+       }
+    }
+
+    assert(pthread_mutex_lock(&pool->lock) == 0);
+    pool->state = AFS_TP_STATE_RUNNING;
+ done_sync:
+    assert(pthread_mutex_unlock(&pool->lock) == 0);
+
+    return ret;
+}
+
+/**
+ * shut down all threads in pool.
+ *
+ * @param[in] pool  thread pool object
+ * @param[in] block wait for all threads to terminate, if asserted
+ *
+ * @return operation status
+ *    @retval 0 success
+ */
+int
+afs_tp_shutdown(struct afs_thread_pool * pool,
+               int block)
+{
+    int ret = 0;
+    struct afs_thread_pool_worker * worker, *nn;
+
+    assert(pthread_mutex_lock(&pool->lock) == 0);
+    if (pool->state == AFS_TP_STATE_STOPPED
+        || pool->state == AFS_TP_STATE_STOPPING) {
+       goto done_stopped;
+    }
+    if (pool->state != AFS_TP_STATE_RUNNING) {
+       ret = AFS_TP_ERROR;
+       goto done_sync;
+    }
+    pool->state = AFS_TP_STATE_STOPPING;
+
+    for (queue_Scan(&pool->thread_list, worker, nn, afs_thread_pool_worker)) {
+       worker->req_shutdown = 1;
+    }
+    if (!pool->nthreads) {
+       pool->state = AFS_TP_STATE_STOPPED;
+    }
+    /* need to drop lock to get a membar here */
+    assert(pthread_mutex_unlock(&pool->lock) == 0);
+
+    ret = afs_wq_shutdown(pool->work_queue);
+    if (ret) {
+       goto error;
+    }
+
+    assert(pthread_mutex_lock(&pool->lock) == 0);
+ done_stopped:
+    if (block) {
+       while (pool->nthreads) {
+           assert(pthread_cond_wait(&pool->shutdown_cv,
+                                    &pool->lock) == 0);
+       }
+    }
+ done_sync:
+    assert(pthread_mutex_unlock(&pool->lock) == 0);
+
+ error:
+    return ret;
+}
+
+/**
+ * check whether thread pool is online.
+ *
+ * @param[in] pool  thread pool object
+ *
+ * @return whether pool is online
+ *    @retval 1 pool is online
+ *    @retval 0 pool is not online
+ */
+int
+afs_tp_is_online(struct afs_thread_pool * pool)
+{
+    int ret;
+
+    assert(pthread_mutex_lock(&pool->lock) == 0);
+    ret = (pool->state == AFS_TP_STATE_RUNNING);
+    assert(pthread_mutex_unlock(&pool->lock) == 0);
+
+    return ret;
+}
+
+/**
+ * check whether a given worker thread can continue to run.
+ *
+ * @param[in] worker  worker thread object pointer
+ *
+ * @return whether thread can continue to execute
+ *    @retval 1 execution can continue
+ *    @retval 0 shutdown has been requested
+ */
+int
+afs_tp_worker_continue(struct afs_thread_pool_worker * worker)
+{
+    return !worker->req_shutdown;
+}
diff --git a/src/util/thread_pool.h b/src/util/thread_pool.h
new file mode 100644 (file)
index 0000000..dd25fd1
--- /dev/null
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2008-2010, Sine Nomine Associates and others.
+ * All Rights Reserved.
+ *
+ * This software has been released under the terms of the IBM Public
+ * License.  For details, see the LICENSE file in the top-level source
+ * directory or online at http://www.openafs.org/dl/license10.html
+ */
+
+#ifndef AFS_UTIL_THREAD_POOL_H
+#define AFS_UTIL_THREAD_POOL_H 1
+
+#include "thread_pool_types.h"
+
+/**
+ * public interfaces for thread_pool.
+ */
+
+/* XXX move these into an et */
+#define AFS_TP_ERROR              -1 /**< fatal error in thread_pool package */
+
+extern int afs_tp_create(struct afs_thread_pool **,
+                        struct afs_work_queue *);
+extern int afs_tp_destroy(struct afs_thread_pool *);
+
+extern int afs_tp_set_threads(struct afs_thread_pool *, afs_uint32 threads);
+extern int afs_tp_set_entry(struct afs_thread_pool *,
+                           afs_tp_worker_func_t *,
+                           void * rock);
+
+extern int afs_tp_start(struct afs_thread_pool *);
+extern int afs_tp_shutdown(struct afs_thread_pool *,
+                          int block);
+
+extern int afs_tp_is_online(struct afs_thread_pool *);
+extern int afs_tp_worker_continue(struct afs_thread_pool_worker *);
+
+#endif /* AFS_UTIL_THREAD_POOL_H */
diff --git a/src/util/thread_pool_impl.h b/src/util/thread_pool_impl.h
new file mode 100644 (file)
index 0000000..dce8ee6
--- /dev/null
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2008-2010, Sine Nomine Associates and others.
+ * All Rights Reserved.
+ *
+ * This software has been released under the terms of the IBM Public
+ * License.  For details, see the LICENSE file in the top-level source
+ * directory or online at http://www.openafs.org/dl/license10.html
+ */
+
+#ifndef AFS_UTIL_THREAD_POOL_IMPL_H
+#define AFS_UTIL_THREAD_POOL_IMPL_H 1
+
+#include "thread_pool.h"
+#include "thread_pool_impl_types.h"
+
+/**
+ *  implementation-private interfaces for thread_pool.
+ */
+
+#endif /* AFS_UTIL_THREAD_POOL_IMPL_H */
diff --git a/src/util/thread_pool_impl_types.h b/src/util/thread_pool_impl_types.h
new file mode 100644 (file)
index 0000000..2467c0d
--- /dev/null
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2008-2010, Sine Nomine Associates and others.
+ * All Rights Reserved.
+ *
+ * This software has been released under the terms of the IBM Public
+ * License.  For details, see the LICENSE file in the top-level source
+ * directory or online at http://www.openafs.org/dl/license10.html
+ */
+
+#ifndef AFS_UTIL_THREAD_POOL_IMPL_TYPES_H
+#define AFS_UTIL_THREAD_POOL_IMPL_TYPES_H 1
+
+#ifndef __AFS_THREAD_POOL_IMPL
+#error "do not include this file outside of the thread pool implementation"
+#endif
+
+#include "thread_pool_types.h"
+#include <rx/rx_queue.h>
+
+/**
+ *
+ *  implementation-private type definitions for thread_pool.
+ */
+
+/**
+ * thread_pool worker state.
+ */
+typedef enum {
+    AFS_TP_STATE_INIT,            /**< initial state */
+    AFS_TP_STATE_STARTING,        /**< pool is starting up */
+    AFS_TP_STATE_RUNNING,         /**< pool is running normally */
+    AFS_TP_STATE_STOPPING,        /**< stop requested */
+    AFS_TP_STATE_STOPPED,         /**< pool is shut down */
+    /* add new states above this line */
+    AFS_TP_STATE_TERMINAL
+} afs_tp_state_t;
+
+/**
+ * thread_pool worker.
+ */
+struct afs_thread_pool_worker {
+    struct rx_queue worker_list;        /**< linked list of thread workers. */
+    struct afs_thread_pool * pool;               /**< associated thread pool */
+    void * ret;                         /**< return value from worker thread entry point */
+    pthread_t tid;                      /**< thread id */
+    int req_shutdown;                   /**< request shutdown of this thread */
+};
+
+/**
+ * thread pool.
+ */
+struct afs_thread_pool {
+    struct rx_queue thread_list;        /**< linked list of threads */
+    struct afs_work_queue * work_queue;         /**< work queue serviced by this thread pool. */
+    afs_tp_worker_func_t * entry;       /**< worker thread entry point */
+    void * rock;                        /**< opaque pointer passed to worker thread entry point */
+    afs_uint32 nthreads;                /**< current pool size */
+    afs_tp_state_t state;               /**< pool state */
+    afs_uint32 max_threads;             /**< pool options */
+    pthread_mutex_t lock;               /**< pool global state lock */
+    pthread_cond_t shutdown_cv;         /**< thread shutdown cv */
+};
+
+#endif /* AFS_UTIL_THREAD_POOL_IMPL_TYPES_H */
diff --git a/src/util/thread_pool_types.h b/src/util/thread_pool_types.h
new file mode 100644 (file)
index 0000000..4767624
--- /dev/null
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2008-2010, Sine Nomine Associates and others.
+ * All Rights Reserved.
+ *
+ * This software has been released under the terms of the IBM Public
+ * License.  For details, see the LICENSE file in the top-level source
+ * directory or online at http://www.openafs.org/dl/license10.html
+ */
+
+#ifndef AFS_UTIL_THREAD_POOL_TYPES_H
+#define AFS_UTIL_THREAD_POOL_TYPES_H 1
+
+/**
+ *  public type definitions for thread_pool.
+ */
+
+/* forward declare opaque types */
+struct afs_thread_pool_worker;
+struct afs_thread_pool;
+struct afs_work_queue;
+
+/**
+ * thread_pool worker thread entry function.
+ *
+ * @param[in] pool    thread pool object pointer
+ * @param[in] worker  worker thread object pointer
+ * @param[in] queue   work queue object pointer
+ * @param[in] rock    opaque pointer
+ *
+ * @return opaque pointer
+ */
+typedef void * afs_tp_worker_func_t(struct afs_thread_pool * pool,
+                                   struct afs_thread_pool_worker * worker,
+                                   struct afs_work_queue * queue,
+                                   void * rock);
+
+#endif /* AFS_UTIL_THREAD_POOL_TYPES_H */