Provide an abstract work queue object
authorAndrew Deason <adeason@sinenomine.net>
Thu, 11 Mar 2010 16:39:56 +0000 (10:39 -0600)
committerDerrick Brashear <shadow@dementia.org>
Sat, 2 Oct 2010 04:10:58 +0000 (21:10 -0700)
Add some routines for specifying chunks of work to be done. The idea
is to be able to pass these to different threads, and specify
dependencies between them, wait on them completing, etc.

This adds the afs_wq* family of functions. Originally written by Tom
Keiser.

Change-Id: If556cf4da12de8c4be1e53376d85d791584ae177
Reviewed-on: http://gerrit.openafs.org/1862
Tested-by: BuildBot <buildbot@rampaginggeek.com>
Reviewed-by: Derrick Brashear <shadow@dementia.org>

src/util/Makefile.in
src/util/work_queue.c [new file with mode: 0644]
src/util/work_queue.h [new file with mode: 0644]
src/util/work_queue_impl.h [new file with mode: 0644]
src/util/work_queue_impl_types.h [new file with mode: 0644]
src/util/work_queue_types.h [new file with mode: 0644]

index dfcc7d7..1817811 100644 (file)
@@ -38,6 +38,8 @@ includes = \
        ${TOP_INCDIR}/afs/afs_atomlist.h \
        ${TOP_INCDIR}/afs/afs_lhash.h \
        ${TOP_INCDIR}/afs/softsig.h \
+       ${TOP_INCDIR}/afs/work_queue.h \
+       ${TOP_INCDIR}/afs/work_queue_types.h \
        ${TOP_INCDIR}/potpourri.h 
 
 all: ${includes} \
@@ -98,6 +100,12 @@ ${TOP_INCDIR}/afs/afs_lhash.h: ${srcdir}/afs_lhash.h
 ${TOP_INCDIR}/afs/softsig.h: ${srcdir}/softsig.h
        ${INSTALL_DATA} $? $@
 
+${TOP_INCDIR}/afs/work_queue.h: ${srcdir}/work_queue.h
+       ${INSTALL_DATA} $? $@
+
+${TOP_INCDIR}/afs/work_queue_types.h: ${srcdir}/work_queue_types.h
+       ${INSTALL_DATA} $? $@
+
 ${TOP_INCDIR}/potpourri.h: ${srcdir}/potpourri.h
        ${INSTALL_DATA} $? $@
 
@@ -245,6 +253,8 @@ install: dirpath.h util.a sys
        ${INSTALL_DATA} ${srcdir}/afs_atomlist.h ${DESTDIR}${includedir}/afs/afs_atomlist.h
        ${INSTALL_DATA} ${srcdir}/afs_lhash.h ${DESTDIR}${includedir}/afs/afs_lhash.h
        ${INSTALL_DATA} ${srcdir}/softsig.h ${DESTDIR}${includedir}/afs/softsig.h
+       ${INSTALL_DATA} ${srcdir}/work_queue.h ${DESTDIR}${includedir}/afs/work_queue.h
+       ${INSTALL_DATA} ${srcdir}/work_queue_types.h ${DESTDIR}${includedir}/afs/work_queue_types.h
        ${INSTALL_DATA} ${srcdir}/potpourri.h ${DESTDIR}${includedir}/potpourri.h
        ${INSTALL_DATA} util.a ${DESTDIR}${libdir}/afs/util.a
        ${INSTALL_DATA} util.a ${DESTDIR}${libdir}/afs/libafsutil.a
@@ -270,6 +280,8 @@ dest: dirpath.h util.a sys
        ${INSTALL_DATA} ${srcdir}/afs_atomlist.h ${DEST}/include/afs/afs_atomlist.h
        ${INSTALL_DATA} ${srcdir}/afs_lhash.h ${DEST}/include/afs/afs_lhash.h
        ${INSTALL_DATA} ${srcdir}/softsig.h ${DEST}/include/afs/softsig.h
+       ${INSTALL_DATA} ${srcdir}/work_queue.h ${DEST}/include/afs/work_queue.h
+       ${INSTALL_DATA} ${srcdir}/work_queue_types.h ${DEST}/include/afs/work_queue_types.h
        ${INSTALL_DATA} ${srcdir}/potpourri.h ${DEST}/include/potpourri.h
        ${INSTALL_DATA} util.a ${DEST}/lib/afs/util.a
        ${INSTALL_DATA} util.a ${DEST}/lib/afs/libafsutil.a
diff --git a/src/util/work_queue.c b/src/util/work_queue.c
new file mode 100644 (file)
index 0000000..d2e1ea1
--- /dev/null
@@ -0,0 +1,1931 @@
+/*
+ * Copyright 2008-2010, Sine Nomine Associates and others.
+ * All Rights Reserved.
+ *
+ * This software has been released under the terms of the IBM Public
+ * License.  For details, see the LICENSE file in the top-level source
+ * directory or online at http://www.openafs.org/dl/license10.html
+ */
+
+#include <afsconfig.h>
+#include <afs/param.h>
+
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <sys/stat.h>
+#include <dirent.h>
+#include <afs/assert.h>
+#include <string.h>
+#include <sys/file.h>
+#include <sys/param.h>
+#include <lock.h>
+
+#define __AFS_WORK_QUEUE_IMPL 1
+#include "work_queue.h"
+#include "work_queue_impl.h"
+
+/**
+ * public interfaces for work_queue.
+ */
+
+static int
+_afs_wq_node_put_r(struct afs_work_queue_node * node,
+                  int drop);
+
+/**
+ * allocate a work queue object.
+ *
+ * @param[out] queue_out address in which to store queue pointer
+ *
+ * @return operation status
+ *    @retval 0 success
+ *    @retval ENOMEM out of memory
+ *
+ * @internal
+ */
+static int
+_afs_wq_alloc(struct afs_work_queue ** queue_out)
+{
+    int ret = 0;
+    struct afs_work_queue * queue;
+
+    *queue_out = queue = malloc(sizeof(*queue));
+    if (queue == NULL) {
+       ret = ENOMEM;
+       goto error;
+    }
+
+ error:
+    return ret;
+}
+
+/**
+ * free a work queue object.
+ *
+ * @param[in] queue  work queue object
+ *
+ * @return operation status
+ *    @retval 0 success
+ *
+ * @internal
+ */
+static int
+_afs_wq_free(struct afs_work_queue * queue)
+{
+    int ret = 0;
+
+    free(queue);
+
+    return ret;
+}
+
+/**
+ * change a node's state.
+ *
+ * @param[in] node       node object
+ * @param[in] new_state  new object state
+ *
+ * @return old state
+ *
+ * @pre node->lock held
+ *
+ * @internal
+ */
+static afs_wq_work_state_t
+_afs_wq_node_state_change(struct afs_work_queue_node * node,
+                         afs_wq_work_state_t new_state)
+{
+    afs_wq_work_state_t old_state;
+
+    old_state = node->state;
+    node->state = new_state;
+
+    assert(pthread_cond_broadcast(&node->state_cv) == 0);
+
+    return old_state;
+}
+
+/**
+ * wait for a node's state to change from busy to something else.
+ *
+ * @param[in] node  node object
+ *
+ * @return operation status
+ *    @retval 0 success
+ *
+ * @pre node->lock held
+ *
+ * @internal
+ */
+static int
+_afs_wq_node_state_wait_busy(struct afs_work_queue_node * node)
+{
+    while (node->state == AFS_WQ_NODE_STATE_BUSY) {
+       assert(pthread_cond_wait(&node->state_cv,
+                                &node->lock) == 0);
+    }
+
+    return 0;
+}
+
+/**
+ * check whether a work queue node is busy.
+ *
+ * @param[in] node  node object pointer
+ *
+ * @return whether node is busy
+ *    @retval 1 node is busy
+ *    @retval 0 node is not busy
+ *
+ * @pre node->lock held
+ *
+ * @internal
+ */
+static int
+_afs_wq_node_state_is_busy(struct afs_work_queue_node * node)
+{
+    return (node->state == AFS_WQ_NODE_STATE_BUSY);
+}
+
+/**
+ * attempt to simultaneously lock two work queue nodes.
+ *
+ * this is a somewhat tricky algorithm because there is no
+ * defined hierarchy within the work queue node population.
+ *
+ * @param[in] ml  multilock control structure
+ *
+ * @return operation status
+ *    @retval 0
+ *
+ * @note in theory, we could easily extend this to
+ *       lock more than two nodes
+ *
+ * @pre
+ *   - caller MUST NOT have set busy state on either node
+ *
+ * @post
+ *   - locks held on both nodes
+ *   - both nodes in quiescent states
+ *
+ * @note node with non-zero lock_held or busy_held fields
+ *       MUST go in array index 0
+ *
+ * @internal
+ */
+static int
+_afs_wq_node_multilock(struct afs_work_queue_node_multilock * ml)
+{
+    int code, ret = 0;
+    struct timespec delay;
+    int first = 1, second = 0, tmp;
+
+    /* first pass processing */
+    if (ml->nodes[0].lock_held) {
+       if (!ml->nodes[0].busy_held) {
+           ret = _afs_wq_node_state_wait_busy(ml->nodes[0].node);
+           if (ret) {
+               goto error;
+           }
+       }
+
+       code = pthread_mutex_trylock(&ml->nodes[1].node->lock);
+       if (!code) {
+           /* success */
+           goto done;
+       }
+
+       /* setup for main loop */
+       assert(pthread_mutex_unlock(&ml->nodes[0].node->lock) == 0);
+    }
+
+    /*
+     * setup random exponential backoff
+     *
+     * set initial delay to random value in the range [500,1000) ns
+     */
+    delay.tv_sec = 0;
+    delay.tv_nsec = 500 + rand() % 500;
+
+    while (1) {
+       assert(pthread_mutex_lock(&ml->nodes[first].node->lock) == 0);
+       if ((first != 0) || !ml->nodes[0].busy_held) {
+           ret = _afs_wq_node_state_wait_busy(ml->nodes[first].node);
+           if (ret) {
+               /* cleanup */
+               if (!ml->nodes[0].lock_held || first) {
+                   assert(pthread_mutex_unlock(&ml->nodes[first].node->lock) == 0);
+                   if (ml->nodes[0].lock_held) {
+                       /* on error, return with locks in same state as before call */
+                       assert(pthread_mutex_lock(&ml->nodes[0].node->lock) == 0);
+                   }
+               }
+               goto error;
+           }
+       }
+
+       /*
+        * in order to avoid deadlock, we must use trylock and
+        * a non-blocking state check.  if we meet any contention,
+        * we must drop back and start again.
+        */
+       code = pthread_mutex_trylock(&ml->nodes[second].node->lock);
+       if (!code) {
+           if (((second == 0) && (ml->nodes[0].busy_held)) ||
+               !_afs_wq_node_state_is_busy(ml->nodes[second].node)) {
+               /* success */
+               break;
+           } else {
+               assert(pthread_mutex_unlock(&ml->nodes[second].node->lock) == 0);
+           }
+       }
+
+       /*
+        * contended.
+        *
+        * drop locks, use exponential backoff,
+        * try acquiring in the opposite order
+        */
+       assert(pthread_mutex_unlock(&ml->nodes[first].node->lock) == 0);
+       nanosleep(&delay, NULL);
+       if (delay.tv_nsec <= 65536000) { /* max backoff delay of ~131ms */
+           delay.tv_nsec <<= 1;
+       }
+       tmp = second;
+       second = first;
+       first = tmp;
+    }
+
+ done:
+ error:
+    return ret;
+}
+
+/**
+ * initialize a node list object.
+ *
+ * @param[in] list list object
+ * @param[in] id   list identifier
+ *
+ * @return operation status
+ *    @retval 0 success
+ *
+ * @internal
+ */
+static int
+_afs_wq_node_list_init(struct afs_work_queue_node_list * list,
+                      afs_wq_node_list_id_t id)
+{
+    queue_Init(&list->list);
+    assert(pthread_mutex_init(&list->lock, NULL) == 0);
+    assert(pthread_cond_init(&list->cv, NULL) == 0);
+    list->qidx = id;
+    list->shutdown = 0;
+
+    return 0;
+}
+
+/**
+ * destroy a node list object.
+ *
+ * @param[in] list list object
+ *
+ * @return operation status
+ *    @retval 0 success
+ *    @retval AFS_WQ_ERROR list not empty
+ *
+ * @internal
+ */
+static int
+_afs_wq_node_list_destroy(struct afs_work_queue_node_list * list)
+{
+    int ret = 0;
+
+    if (queue_IsNotEmpty(&list->list)) {
+       ret = AFS_WQ_ERROR;
+       goto error;
+    }
+
+    assert(pthread_mutex_destroy(&list->lock) == 0);
+    assert(pthread_cond_destroy(&list->cv) == 0);
+
+ error:
+    return ret;
+}
+
+/**
+ * wakeup all threads waiting in dequeue.
+ *
+ * @param[in] list list object
+ *
+ * @return operation status
+ *    @retval 0 success
+ *
+ * @internal
+ */
+static int
+_afs_wq_node_list_shutdown(struct afs_work_queue_node_list * list)
+{
+    int ret = 0;
+    struct afs_work_queue_node *node, *nnode;
+
+    assert(pthread_mutex_lock(&list->lock) == 0);
+    list->shutdown = 1;
+
+    for (queue_Scan(&list->list, node, nnode, afs_work_queue_node)) {
+       _afs_wq_node_state_change(node, AFS_WQ_NODE_STATE_ERROR);
+       queue_Remove(node);
+       node->qidx = AFS_WQ_NODE_LIST_NONE;
+       node->queue = NULL;
+
+       if (node->detached) {
+           /* if we are detached, we hold the reference on the node;
+            * otherwise, it is some other caller that holds the reference.
+            * So don't put the node if we are not detached; the node will
+            * get freed when someone else calls afs_wq_node_put */
+           afs_wq_node_put(node);
+       }
+    }
+
+    assert(pthread_cond_broadcast(&list->cv) == 0);
+    assert(pthread_mutex_unlock(&list->lock) == 0);
+
+    return ret;
+}
+
+/**
+ * append to a node list object.
+ *
+ * @param[in] list  list object
+ * @param[in] node  node object
+ * @param[in] state new node state
+ *
+ * @return operation status
+ *    @retval 0 success
+ *    @retval AFS_WQ_ERROR raced to enqueue node
+ *
+ * @pre
+ *   - node lock held
+ *   - node is not on a list
+ *   - node is either not busy, or it is marked as busy by the calling thread
+ *
+ * @post
+ *   - enqueued on list
+ *   - node lock dropped
+ *
+ * @internal
+ */
+static int
+_afs_wq_node_list_enqueue(struct afs_work_queue_node_list * list,
+                         struct afs_work_queue_node * node,
+                         afs_wq_work_state_t state)
+{
+    int code, ret = 0;
+
+    if (node->qidx != AFS_WQ_NODE_LIST_NONE) {
+       /* raced */
+       ret = AFS_WQ_ERROR;
+       goto error;
+    }
+
+    /* deal with lock inversion */
+    code = pthread_mutex_trylock(&list->lock);
+    if (code) {
+       /* contended */
+       _afs_wq_node_state_change(node, AFS_WQ_NODE_STATE_BUSY);
+       assert(pthread_mutex_unlock(&node->lock) == 0);
+       assert(pthread_mutex_lock(&list->lock) == 0);
+       assert(pthread_mutex_lock(&node->lock) == 0);
+
+       /* assert state of the world (we set busy, so this should never happen) */
+       assert(queue_IsNotOnQueue(node));
+    }
+
+    if (list->shutdown) {
+       ret = AFS_WQ_ERROR;
+       goto error_unlock;
+    }
+
+    assert(node->qidx == AFS_WQ_NODE_LIST_NONE);
+    if (queue_IsEmpty(&list->list)) {
+       /* wakeup a dequeue thread */
+       assert(pthread_cond_signal(&list->cv) == 0);
+    }
+    queue_Append(&list->list, node);
+    node->qidx = list->qidx;
+    _afs_wq_node_state_change(node, state);
+
+ error_unlock:
+    assert(pthread_mutex_unlock(&node->lock) == 0);
+    assert(pthread_mutex_unlock(&list->lock) == 0);
+
+ error:
+    return ret;
+}
+
+/**
+ * dequeue a node from a list object.
+ *
+ * @param[in]    list      list object
+ * @param[out]   node_out  address in which to store node object pointer
+ * @param[in]    state     new node state
+ * @param[in]    block     permit blocking on cv if asserted
+ *
+ * @return operation status
+ *    @retval 0 success
+ *    @retval EWOULDBLOCK block not asserted and nothing to dequeue
+ *    @retval EINTR blocking wait interrupted by list shutdown
+ *
+ * @post node object returned with node lock held and new state set
+ *
+ * @internal
+ */
+static int
+_afs_wq_node_list_dequeue(struct afs_work_queue_node_list * list,
+                         struct afs_work_queue_node ** node_out,
+                         afs_wq_work_state_t state,
+                         int block)
+{
+    int ret = 0;
+    struct afs_work_queue_node * node;
+
+    assert(pthread_mutex_lock(&list->lock) == 0);
+
+    if (list->shutdown) {
+       *node_out = NULL;
+       ret = EINTR;
+       goto done_sync;
+    }
+
+    if (!block && queue_IsEmpty(&list->list)) {
+       *node_out = NULL;
+       ret = EWOULDBLOCK;
+       goto done_sync;
+    }
+
+    while (queue_IsEmpty(&list->list)) {
+       if (list->shutdown) {
+           *node_out = NULL;
+           ret = EINTR;
+           goto done_sync;
+       }
+       assert(pthread_cond_wait(&list->cv,
+                                &list->lock) == 0);
+    }
+
+    *node_out = node = queue_First(&list->list, afs_work_queue_node);
+
+    assert(pthread_mutex_lock(&node->lock) == 0);
+    queue_Remove(node);
+    node->qidx = AFS_WQ_NODE_LIST_NONE;
+    _afs_wq_node_state_change(node, state);
+
+ done_sync:
+    assert(pthread_mutex_unlock(&list->lock) == 0);
+
+    return ret;
+}
+
+/**
+ * remove a node from a list.
+ *
+ * @param[in] node        node object
+ * @param[in] next_state  node state following successful dequeue
+ *
+ * @return operation status
+ *    @retval 0 success
+ *    @retval AFS_WQ_ERROR in any of the following conditions:
+ *              - node not associated with a work queue
+ *              - node was not on a linked list (e.g. RUNNING state)
+ *              - we raced another thread
+ *
+ * @pre node->lock held
+ *
+ * @post node removed from node list
+ *
+ * @note node->lock may be dropped internally
+ *
+ * @internal
+ */
+static int
+_afs_wq_node_list_remove(struct afs_work_queue_node * node,
+                        afs_wq_work_state_t next_state)
+{
+    int code, ret = 0;
+    struct afs_work_queue_node_list * list = NULL;
+
+    _afs_wq_node_state_wait_busy(node);
+
+    if (!node->queue) {
+       ret = AFS_WQ_ERROR;
+       goto error;
+    }
+    switch (node->qidx) {
+    case AFS_WQ_NODE_LIST_READY:
+       list = &node->queue->ready_list;
+       break;
+
+    case AFS_WQ_NODE_LIST_BLOCKED:
+       list = &node->queue->blocked_list;
+       break;
+
+    case AFS_WQ_NODE_LIST_DONE:
+       list = &node->queue->done_list;
+       break;
+
+    default:
+       ret = AFS_WQ_ERROR;
+    }
+
+    if (list) {
+       code = pthread_mutex_trylock(&list->lock);
+       if (code) {
+           /* contended */
+           _afs_wq_node_state_change(node,
+                                          AFS_WQ_NODE_STATE_BUSY);
+           assert(pthread_mutex_unlock(&node->lock) == 0);
+           assert(pthread_mutex_lock(&list->lock) == 0);
+           assert(pthread_mutex_lock(&node->lock) == 0);
+
+           if (node->qidx == AFS_WQ_NODE_LIST_NONE) {
+               /* raced */
+               ret= AFS_WQ_ERROR;
+               goto done_sync;
+           }
+       }
+
+       queue_Remove(node);
+       node->qidx = AFS_WQ_NODE_LIST_NONE;
+       _afs_wq_node_state_change(node, next_state);
+
+    done_sync:
+       assert(pthread_mutex_unlock(&list->lock) == 0);
+    }
+
+ error:
+    return ret;
+}
+
+/**
+ * allocate a dependency node.
+ *
+ * @param[out] node_out address in which to store dep node pointer
+ *
+ * @return operation status
+ *    @retval 0 success
+ *    @retval ENOMEM   out of memory
+ *
+ * @internal
+ */
+static int
+_afs_wq_dep_alloc(struct afs_work_queue_dep_node ** node_out)
+{
+    int ret = 0;
+    struct afs_work_queue_dep_node * node;
+
+    node = malloc(sizeof(*node));
+    if (node == NULL) {
+       ret = ENOMEM;
+       goto error;
+    }
+
+    queue_NodeInit(&node->parent_list);
+    node->parent = node->child = NULL;
+
+    *node_out = node;
+
+ error:
+    return ret;
+}
+
+/**
+ * free a dependency node.
+ *
+ * @param[in] node dep node pointer
+ *
+ * @return operation status
+ *    @retval 0 success
+ *    @retval AFS_WQ_ERROR still attached to a work node
+ *
+ * @internal
+ */
+static int
+_afs_wq_dep_free(struct afs_work_queue_dep_node * node)
+{
+    int ret = 0;
+
+    if (queue_IsOnQueue(&node->parent_list) ||
+       node->parent ||
+       node->child) {
+       ret = AFS_WQ_ERROR;
+       goto error;
+    }
+
+    free(node);
+
+ error:
+    return ret;
+}
+
+/**
+ * unlink work nodes from a dependency node.
+ *
+ * @param[in]  dep      dependency node
+ *
+ * @return operation status
+ *    @retval 0 success
+ *
+ * @pre
+ *   - dep->parent and dep->child are either locked, or are not referenced
+ *     by anything else
+ *   - caller holds ref on dep->child
+ *   - dep->child and dep->parent in quiescent state
+ *
+ * @internal
+ */
+static int
+_afs_wq_dep_unlink_r(struct afs_work_queue_dep_node *dep)
+{
+    struct afs_work_queue_node *child = dep->child;
+    queue_Remove(&dep->parent_list);
+    dep->child = NULL;
+    dep->parent = NULL;
+
+    return _afs_wq_node_put_r(child, 0);
+}
+
+/**
+ * get a reference to a work node.
+ *
+ * @param[in] node  work queue node
+ *
+ * @return operation status
+ *    @retval 0 success
+ *
+ * @pre node->lock held
+ *
+ * @internal
+ */
+static int
+_afs_wq_node_get_r(struct afs_work_queue_node * node)
+{
+    node->refcount++;
+
+    return 0;
+}
+
+/**
+ * unlink and free all of the dependency nodes from a node.
+ *
+ * @param[in] parent  work node that is the parent node of all deps to be freed
+ *
+ * @return operation status
+ *  @retval 0 success
+ *
+ * @pre parent->refcount == 0
+ */
+static int
+_afs_wq_node_free_deps(struct afs_work_queue_node *parent)
+{
+    int ret = 0, code;
+    struct afs_work_queue_node *node_unlock = NULL, *node_put = NULL;
+    struct afs_work_queue_dep_node * dep, * nd;
+
+    /* unlink and free all of the dep structs attached to 'parent' */
+    for (queue_Scan(&parent->dep_children,
+                   dep,
+                   nd,
+                   afs_work_queue_dep_node)) {
+
+       assert(pthread_mutex_lock(&dep->child->lock) == 0);
+       node_unlock = dep->child;
+
+       /* We need to get a ref on child here, since _afs_wq_dep_unlink_r may
+        * put the last ref on the child, and we need the child to still exist
+        * so we can unlock it */
+       code = _afs_wq_node_get_r(dep->child);
+       if (code) {
+           goto loop_error;
+       }
+       node_put = dep->child;
+
+       /* remember, no need to lock dep->parent, since its refcount is 0 */
+       code = _afs_wq_dep_unlink_r(dep);
+
+     loop_error:
+       if (node_put) {
+           _afs_wq_node_put_r(node_put, 1);
+       } else if (node_unlock) {
+           assert(pthread_mutex_unlock(&node_unlock->lock) == 0);
+       }
+       node_put = node_unlock = NULL;
+
+       if (code == 0) {
+           /* Only do this if everything is okay; if code is nonzero,
+            * something will still be pointing at dep, so don't free it.
+            * We will leak memory, but that's better than memory corruption;
+            * we've done all we can do to try and free the dep memory */
+           code = _afs_wq_dep_free(dep);
+       }
+
+       if (!ret) {
+           ret = code;
+       }
+    }
+    return ret;
+}
+
+/**
+ * propagate state down through dep nodes.
+ *
+ * @param[in] parent      parent node object
+ * @param[in] next_state  next state parent will assume
+ *
+ * @return operation status
+ *    @retval 0 success
+ *
+ * @pre
+ *   - parent->lock held
+ *
+ * @internal
+ */
+static int
+_afs_wq_dep_propagate(struct afs_work_queue_node * parent,
+                     afs_wq_work_state_t next_state)
+{
+    int ret = 0;
+    struct afs_work_queue_dep_node * dep, * nd;
+    struct afs_work_queue_node_multilock ml;
+    afs_wq_work_state_t old_state;
+    afs_wq_node_list_id_t qidx;
+    struct afs_work_queue_node_list * ql;
+    afs_wq_work_state_t cns;
+
+    old_state = _afs_wq_node_state_change(parent,
+                                              AFS_WQ_NODE_STATE_BUSY);
+    ml.nodes[0].node = parent;
+    ml.nodes[0].lock_held = 1;
+    ml.nodes[0].busy_held = 1;
+
+    /* scan through our children updating scheduling state */
+    for (queue_Scan(&parent->dep_children,
+                   dep,
+                   nd,
+                   afs_work_queue_dep_node)) {
+       /* skip half-registered nodes */
+       if (dep->child == NULL) {
+           continue;
+       }
+
+       ml.nodes[1].node = dep->child;
+       ml.nodes[1].lock_held = 0;
+       ml.nodes[1].busy_held = 0;
+       ret = _afs_wq_node_multilock(&ml);
+       if (ret) {
+           goto error;
+       }
+
+       switch (next_state) {
+       case AFS_WQ_NODE_STATE_DONE:
+           dep->child->block_count--;
+           break;
+
+       case AFS_WQ_NODE_STATE_ERROR:
+           dep->child->error_count++;
+           break;
+
+       default:
+           (void)0; /* nop */
+       }
+
+       /* skip unscheduled nodes */
+       if (dep->child->queue == NULL) {
+           assert(pthread_mutex_unlock(&dep->child->lock) == 0);
+           continue;
+       }
+
+       /*
+        * when blocked dep and error'd dep counts reach zero, the
+        * node can be scheduled for execution
+        */
+       if (dep->child->error_count) {
+           ql = &dep->child->queue->done_list;
+           qidx = AFS_WQ_NODE_LIST_DONE;
+           cns = AFS_WQ_NODE_STATE_ERROR;
+       } else if (dep->child->block_count) {
+           ql = &dep->child->queue->blocked_list;
+           qidx = AFS_WQ_NODE_LIST_BLOCKED;
+           cns = AFS_WQ_NODE_STATE_BLOCKED;
+       } else {
+           ql = &dep->child->queue->ready_list;
+           qidx = AFS_WQ_NODE_LIST_READY;
+           cns = AFS_WQ_NODE_STATE_SCHEDULED;
+       }
+
+       if (qidx != dep->child->qidx) {
+           /* we're transitioning to a different queue */
+           ret = _afs_wq_node_list_remove(dep->child,
+                                               AFS_WQ_NODE_STATE_BUSY);
+           if (ret) {
+               assert(pthread_mutex_unlock(&dep->child->lock) == 0);
+               goto error;
+           }
+
+           ret = _afs_wq_node_list_enqueue(ql,
+                                                dep->child,
+                                                cns);
+           if (ret) {
+               assert(pthread_mutex_unlock(&dep->child->lock) == 0);
+               goto error;
+           }
+       }
+       assert(pthread_mutex_unlock(&dep->child->lock) == 0);
+    }
+
+ error:
+    _afs_wq_node_state_change(parent,
+                                  old_state);
+    return ret;
+}
+
+/**
+ * decrements queue->running_count, and signals waiters if appropriate.
+ *
+ * @param[in] queue  queue to dec the running count of
+ */
+static void
+_afs_wq_dec_running_count(struct afs_work_queue *queue)
+{
+    assert(pthread_mutex_lock(&queue->lock) == 0);
+    queue->running_count--;
+    if (queue->shutdown && queue->running_count == 0) {
+       /* if we've shut down, someone may be waiting for the running count
+        * to drop to 0 */
+       assert(pthread_cond_broadcast(&queue->running_cv) == 0);
+    }
+    assert(pthread_mutex_unlock(&queue->lock) == 0);
+}
+
+/**
+ * execute a node on the queue.
+ *
+ * @param[in] queue  work queue
+ * @param[in] rock   opaque pointer (passed as third arg to callback func)
+ * @param[in] block  allow blocking in dequeue
+ *
+ * @return operation status
+ *    @retval 0 completed a work unit
+ *
+ * @internal
+ */
+static int
+_afs_wq_do(struct afs_work_queue * queue,
+          void * rock,
+          int block)
+{
+    int code, ret = 0;
+    struct afs_work_queue_node * node;
+    afs_wq_callback_func_t * cbf;
+    afs_wq_work_state_t next_state;
+    struct afs_work_queue_node_list * ql;
+    void * node_rock;
+    int detached = 0;
+
+    /* We can inc queue->running_count before actually pulling the node off
+     * of the ready_list, since running_count only really matters when we are
+     * shut down. If we get shut down before we pull the node off of
+     * ready_list, but after we inc'd running_count,
+     * _afs_wq_node_list_dequeue should return immediately with EINTR,
+     * in which case we'll dec running_count, so it's as if we never inc'd it
+     * in the first place. */
+    assert(pthread_mutex_lock(&queue->lock) == 0);
+    if (queue->shutdown) {
+       assert(pthread_mutex_unlock(&queue->lock) == 0);
+       return EINTR;
+    }
+    queue->running_count++;
+    assert(pthread_mutex_unlock(&queue->lock) == 0);
+
+    ret = _afs_wq_node_list_dequeue(&queue->ready_list,
+                                        &node,
+                                        AFS_WQ_NODE_STATE_RUNNING,
+                                        block);
+    if (ret) {
+       _afs_wq_dec_running_count(queue);
+       goto error;
+    }
+
+    cbf = node->cbf;
+    node_rock = node->rock;
+    detached = node->detached;
+
+    if (cbf != NULL) {
+       assert(pthread_mutex_unlock(&node->lock) == 0);
+       code = (*cbf)(queue, node, queue->rock, node_rock, rock);
+       assert(pthread_mutex_lock(&node->lock) == 0);
+       if (code == 0) {
+           next_state = AFS_WQ_NODE_STATE_DONE;
+           ql = &queue->done_list;
+       } else if (code == AFS_WQ_ERROR_RESCHEDULE) {
+           if (node->error_count) {
+               next_state = AFS_WQ_NODE_STATE_ERROR;
+               ql = &queue->done_list;
+           } else if (node->block_count) {
+               next_state = AFS_WQ_NODE_STATE_BLOCKED;
+               ql = &queue->blocked_list;
+           } else {
+               next_state = AFS_WQ_NODE_STATE_SCHEDULED;
+               ql = &queue->ready_list;
+           }
+       } else {
+           next_state = AFS_WQ_NODE_STATE_ERROR;
+           ql = &queue->done_list;
+       }
+    } else {
+       next_state = AFS_WQ_NODE_STATE_DONE;
+       code = 0;
+       ql = &queue->done_list;
+    }
+
+    _afs_wq_dec_running_count(queue);
+
+    node->retcode = code;
+
+    if ((next_state == AFS_WQ_NODE_STATE_DONE) ||
+        (next_state == AFS_WQ_NODE_STATE_ERROR)) {
+
+       assert(pthread_mutex_lock(&queue->lock) == 0);
+
+       if (queue->drain && queue->pend_count == queue->opts.pend_lothresh) {
+           /* signal other threads if we're about to below the low
+            * pending-tasks threshold */
+           queue->drain = 0;
+           assert(pthread_cond_signal(&queue->pend_cv) == 0);
+       }
+
+       if (queue->pend_count == 1) {
+           /* signal other threads if we're about to become 'empty' */
+           assert(pthread_cond_broadcast(&queue->empty_cv) == 0);
+       }
+
+       queue->pend_count--;
+
+       assert(pthread_mutex_unlock(&queue->lock) == 0);
+    }
+
+    ret = _afs_wq_node_state_wait_busy(node);
+    if (ret) {
+       goto error;
+    }
+
+    /* propagate scheduling changes down through dependencies */
+    ret = _afs_wq_dep_propagate(node, next_state);
+    if (ret) {
+       goto error;
+    }
+
+    ret = _afs_wq_node_state_wait_busy(node);
+    if (ret) {
+       goto error;
+    }
+
+    if (detached &&
+       ((next_state == AFS_WQ_NODE_STATE_DONE) ||
+        (next_state == AFS_WQ_NODE_STATE_ERROR))) {
+       _afs_wq_node_state_change(node, next_state);
+       _afs_wq_node_put_r(node, 1);
+    } else {
+       ret = _afs_wq_node_list_enqueue(ql,
+                                            node,
+                                            next_state);
+    }
+
+ error:
+    return ret;
+}
+
+/**
+ * initialize a struct afs_work_queue_opts to the default values
+ *
+ * @param[out] opts  opts struct to initialize
+ */
+void
+afs_wq_opts_init(struct afs_work_queue_opts *opts)
+{
+    opts->pend_lothresh = 0;
+    opts->pend_hithresh = 0;
+}
+
+/**
+ * set the options for a struct afs_work_queue_opts appropriate for a certain
+ * number of threads.
+ *
+ * @param[out] opts   opts struct in which to set the values
+ * @param[in] threads number of threads
+ */
+void
+afs_wq_opts_calc_thresh(struct afs_work_queue_opts *opts, int threads)
+{
+    opts->pend_lothresh = threads * 2;
+    opts->pend_hithresh = threads * 16;
+
+    /* safety */
+    if (opts->pend_lothresh < 1) {
+       opts->pend_lothresh = 1;
+    }
+    if (opts->pend_hithresh < 2) {
+       opts->pend_hithresh = 2;
+    }
+}
+
+/**
+ * allocate and initialize a work queue object.
+ *
+ * @param[out]   queue_out  address in which to store newly allocated work queue object
+ * @param[in]    rock       work queue opaque pointer (passed as first arg to all fired callbacks)
+ * @param[in]    opts       options for the new created queue
+ *
+ * @return operation status
+ *    @retval 0 success
+ *    @retval ENOMEM         out of memory
+ */
+int
+afs_wq_create(struct afs_work_queue ** queue_out,
+             void * rock,
+              struct afs_work_queue_opts *opts)
+{
+    int ret = 0;
+    struct afs_work_queue * queue;
+
+    ret = _afs_wq_alloc(queue_out);
+    if (ret) {
+       goto error;
+    }
+    queue = *queue_out;
+
+    if (opts) {
+       memcpy(&queue->opts, opts, sizeof(queue->opts));
+    } else {
+       afs_wq_opts_init(&queue->opts);
+    }
+
+    _afs_wq_node_list_init(&queue->ready_list,
+                               AFS_WQ_NODE_LIST_READY);
+    _afs_wq_node_list_init(&queue->blocked_list,
+                               AFS_WQ_NODE_LIST_BLOCKED);
+    _afs_wq_node_list_init(&queue->done_list,
+                               AFS_WQ_NODE_LIST_DONE);
+    queue->rock = rock;
+    queue->drain = 0;
+    queue->shutdown = 0;
+    queue->pend_count = 0;
+    queue->running_count = 0;
+
+    assert(pthread_mutex_init(&queue->lock, NULL) == 0);
+    assert(pthread_cond_init(&queue->pend_cv, NULL) == 0);
+    assert(pthread_cond_init(&queue->empty_cv, NULL) == 0);
+    assert(pthread_cond_init(&queue->running_cv, NULL) == 0);
+
+ error:
+    return ret;
+}
+
+/**
+ * deallocate and free a work queue object.
+ *
+ * @param[in] queue  work queue to be destroyed
+ *
+ * @return operation status
+ *    @retval 0  success
+ *    @retval AFS_WQ_ERROR unspecified error
+ */
+int
+afs_wq_destroy(struct afs_work_queue * queue)
+{
+    int ret = 0;
+
+    ret = _afs_wq_node_list_destroy(&queue->ready_list);
+    if (ret) {
+       goto error;
+    }
+
+    ret = _afs_wq_node_list_destroy(&queue->blocked_list);
+    if (ret) {
+       goto error;
+    }
+
+    ret = _afs_wq_node_list_destroy(&queue->done_list);
+    if (ret) {
+       goto error;
+    }
+
+    ret = _afs_wq_free(queue);
+
+ error:
+    return ret;
+}
+
+/**
+ * shutdown a work queue.
+ *
+ * @param[in] queue work queue object pointer
+ *
+ * @return operation status
+ *    @retval 0 success
+ */
+int
+afs_wq_shutdown(struct afs_work_queue * queue)
+{
+    int ret = 0;
+
+    assert(pthread_mutex_lock(&queue->lock) == 0);
+    if (queue->shutdown) {
+       /* already shutdown, do nothing */
+       assert(pthread_mutex_unlock(&queue->lock) == 0);
+       goto error;
+    }
+    queue->shutdown = 1;
+
+    ret = _afs_wq_node_list_shutdown(&queue->ready_list);
+    if (ret) {
+       goto error;
+    }
+
+    ret = _afs_wq_node_list_shutdown(&queue->blocked_list);
+    if (ret) {
+       goto error;
+    }
+
+    ret = _afs_wq_node_list_shutdown(&queue->done_list);
+    if (ret) {
+       goto error;
+    }
+
+    /* signal everyone that could be waiting, since these conditions will
+     * generally fail to signal on their own if we're shutdown, since no
+     * progress is being made */
+    assert(pthread_cond_broadcast(&queue->pend_cv) == 0);
+    assert(pthread_cond_broadcast(&queue->empty_cv) == 0);
+    assert(pthread_mutex_unlock(&queue->lock) == 0);
+
+ error:
+    return ret;
+}
+
+/**
+ * allocate a work node.
+ *
+ * @param[out] node_out  address in which to store new work node
+ *
+ * @return operation status
+ *    @retval 0 success
+ *    @retval ENOMEM         out of memory
+ */
+int
+afs_wq_node_alloc(struct afs_work_queue_node ** node_out)
+{
+    int ret = 0;
+    struct afs_work_queue_node * node;
+
+    *node_out = node = (struct afs_work_queue_node *) malloc(sizeof(*node));
+    if (node == NULL) {
+       ret = ENOMEM;
+       goto error;
+    }
+
+    queue_NodeInit(&node->node_list);
+    node->qidx = AFS_WQ_NODE_LIST_NONE;
+    node->cbf = NULL;
+    node->rock = node->queue = NULL;
+    node->refcount = 1;
+    node->block_count = 0;
+    node->error_count = 0;
+    pthread_mutex_init(&node->lock, NULL);
+    pthread_cond_init(&node->state_cv, NULL);
+    node->state = AFS_WQ_NODE_STATE_INIT;
+    queue_Init(&node->dep_children);
+
+ error:
+    return ret;
+}
+
+/**
+ * free a work node.
+ *
+ * @param[in] node  work node object
+ *
+ * @return operation status
+ *    @retval 0 success
+ *
+ * @internal
+ */
+static int
+_afs_wq_node_free(struct afs_work_queue_node * node)
+{
+    int ret = 0;
+
+    if (queue_IsOnQueue(node) ||
+       (node->state == AFS_WQ_NODE_STATE_SCHEDULED) ||
+       (node->state == AFS_WQ_NODE_STATE_RUNNING) ||
+       (node->state == AFS_WQ_NODE_STATE_BLOCKED)) {
+       ret = AFS_WQ_ERROR;
+       goto error;
+    }
+
+    ret = _afs_wq_node_free_deps(node);
+    if (ret) {
+       goto error;
+    }
+
+    pthread_mutex_destroy(&node->lock);
+    pthread_cond_destroy(&node->state_cv);
+
+    if (node->rock_dtor) {
+       (*node->rock_dtor) (node->rock);
+    }
+
+    free(node);
+
+ error:
+    return ret;
+}
+
+/**
+ * get a reference to a work node.
+ *
+ * @param[in] node  work queue node
+ *
+ * @return operation status
+ *    @retval 0 success
+ */
+int
+afs_wq_node_get(struct afs_work_queue_node * node)
+{
+    assert(pthread_mutex_lock(&node->lock) == 0);
+    node->refcount++;
+    assert(pthread_mutex_unlock(&node->lock) == 0);
+
+    return 0;
+}
+
+/**
+ * put back a reference to a work node.
+ *
+ * @param[in] node  work queue node
+ * @param[in] drop  drop node->lock
+ *
+ * @post if refcount reaches zero, node is deallocated.
+ *
+ * @return operation status
+ *    @retval 0 success
+ *
+ * @pre node->lock held
+ *
+ * @internal
+ */
+static int
+_afs_wq_node_put_r(struct afs_work_queue_node * node,
+                  int drop)
+{
+    afs_uint32 refc;
+
+    assert(node->refcount > 0);
+    refc = --node->refcount;
+    if (drop) {
+       assert(pthread_mutex_unlock(&node->lock) == 0);
+    }
+    if (!refc) {
+       assert(node->qidx == AFS_WQ_NODE_LIST_NONE);
+       _afs_wq_node_free(node);
+    }
+
+    return 0;
+}
+
+/**
+ * put back a reference to a work node.
+ *
+ * @param[in] node  work queue node
+ *
+ * @post if refcount reaches zero, node is deallocated.
+ *
+ * @return operation status
+ *    @retval 0 success
+ */
+int
+afs_wq_node_put(struct afs_work_queue_node * node)
+{
+    assert(pthread_mutex_lock(&node->lock) == 0);
+    return _afs_wq_node_put_r(node, 1);
+}
+
+/**
+ * set the callback function on a work node.
+ *
+ * @param[in] node  work queue node
+ * @param[in] cbf   callback function
+ * @param[in] rock  opaque pointer passed to callback
+ * @param[in] rock_dtor  destructor function for 'rock', or NULL
+ *
+ * @return operation status
+ *    @retval 0 success
+ */
+int
+afs_wq_node_set_callback(struct afs_work_queue_node * node,
+                        afs_wq_callback_func_t * cbf,
+                        void * rock, afs_wq_callback_dtor_t *dtor)
+{
+    assert(pthread_mutex_lock(&node->lock) == 0);
+    node->cbf = cbf;
+    node->rock = rock;
+    node->rock_dtor = dtor;
+    assert(pthread_mutex_unlock(&node->lock) == 0);
+
+    return 0;
+}
+
+/**
+ * detach work node.
+ *
+ * @param[in] node  work queue node
+ *
+ * @return operation status
+ *    @retval 0 success
+ */
+int
+afs_wq_node_set_detached(struct afs_work_queue_node * node)
+{
+    assert(pthread_mutex_lock(&node->lock) == 0);
+    node->detached = 1;
+    assert(pthread_mutex_unlock(&node->lock) == 0);
+
+    return 0;
+}
+
+/**
+ * link a dependency node to a parent and child work node.
+ *
+ * This links a dependency node such that when the 'parent' work node is
+ * done, the 'child' work node can proceed.
+ *
+ * @param[in]  dep      dependency node
+ * @param[in]  parent   parent node in this dependency
+ * @param[in]  child    child node in this dependency
+ *
+ * @return operation status
+ *    @retval 0 success
+ *
+ * @pre
+ *   - parent->lock held
+ *   - child->lock held
+ *   - parent and child in quiescent state
+ *
+ * @internal
+ */
+static int
+_afs_wq_dep_link_r(struct afs_work_queue_dep_node *dep,
+                   struct afs_work_queue_node *parent,
+                   struct afs_work_queue_node *child)
+{
+    int ret = 0;
+
+    /* Each dep node adds a ref to the child node of that dep. We do not
+     * do the same for the parent node, since if the only refs remaining
+     * for a node are deps in node->dep_children, then the node should be
+     * destroyed, and we will destroy the dep nodes when we free the
+     * work node. */
+    ret = _afs_wq_node_get_r(child);
+    if (ret) {
+       goto error;
+    }
+
+    /* add this dep node to the parent node's list of deps */
+    queue_Append(&parent->dep_children, &dep->parent_list);
+
+    dep->child = child;
+    dep->parent = parent;
+
+ error:
+    return ret;
+}
+
+/**
+ * add a dependency to a work node.
+ *
+ * @param[in] child  node which will be dependent upon completion of parent
+ * @param[in] parent node whose completion gates child's execution
+ *
+ * @pre
+ *   - child is in initial state (last op was afs_wq_node_alloc or afs_wq_node_wait)
+ *
+ * @return operation status
+ *    @retval 0 success
+ */
+int
+afs_wq_node_dep_add(struct afs_work_queue_node * child,
+                   struct afs_work_queue_node * parent)
+{
+    int ret = 0;
+    struct afs_work_queue_dep_node * dep = NULL;
+    struct afs_work_queue_node_multilock ml;
+    int held = 0;
+
+    /* self references are bad, mmkay? */
+    if (parent == child) {
+       ret = AFS_WQ_ERROR;
+       goto error;
+    }
+
+    ret = _afs_wq_dep_alloc(&dep);
+    if (ret) {
+       goto error;
+    }
+
+    memset(&ml, 0, sizeof(ml));
+    ml.nodes[0].node = parent;
+    ml.nodes[1].node = child;
+    ret = _afs_wq_node_multilock(&ml);
+    if (ret) {
+       goto error;
+    }
+    held = 1;
+
+    /* only allow dep modification while in initial state
+     * or running state (e.g. do a dep add while inside callback) */
+    if ((child->state != AFS_WQ_NODE_STATE_INIT) &&
+       (child->state != AFS_WQ_NODE_STATE_RUNNING)) {
+       ret = AFS_WQ_ERROR;
+       goto error;
+    }
+
+    /* link dep node with child and parent work queue node */
+    ret = _afs_wq_dep_link_r(dep, parent, child);
+    if (ret) {
+       goto error;
+    }
+
+    /* handle blocking counts */
+    switch (parent->state) {
+    case AFS_WQ_NODE_STATE_INIT:
+    case AFS_WQ_NODE_STATE_SCHEDULED:
+    case AFS_WQ_NODE_STATE_RUNNING:
+    case AFS_WQ_NODE_STATE_BLOCKED:
+       child->block_count++;
+       break;
+
+    case AFS_WQ_NODE_STATE_ERROR:
+       child->error_count++;
+       break;
+
+    default:
+       (void)0; /* nop */
+    }
+
+ done:
+    if (held) {
+       assert(pthread_mutex_unlock(&child->lock) == 0);
+       assert(pthread_mutex_unlock(&parent->lock) == 0);
+    }
+    return ret;
+
+ error:
+    if (dep) {
+       _afs_wq_dep_free(dep);
+    }
+    goto done;
+}
+
+/**
+ * remove a dependency from a work node.
+ *
+ * @param[in] child  node which was dependent upon completion of parent
+ * @param[in] parent node whose completion gated child's execution
+ *
+ * @return operation status
+ *    @retval 0 success
+ */
+int
+afs_wq_node_dep_del(struct afs_work_queue_node * child,
+                   struct afs_work_queue_node * parent)
+{
+    int code, ret = 0;
+    struct afs_work_queue_dep_node * dep, * ndep;
+    struct afs_work_queue_node_multilock ml;
+    int held = 0;
+
+    memset(&ml, 0, sizeof(ml));
+    ml.nodes[0].node = parent;
+    ml.nodes[1].node = child;
+    code = _afs_wq_node_multilock(&ml);
+    if (code) {
+       goto error;
+    }
+    held = 1;
+
+    /* only permit changes while child is in init state
+     * or running state (e.g. do a dep del when in callback func) */
+    if ((child->state != AFS_WQ_NODE_STATE_INIT) &&
+       (child->state != AFS_WQ_NODE_STATE_RUNNING)) {
+       ret = AFS_WQ_ERROR;
+       goto error;
+    }
+
+    /* locate node linking parent and child */
+    for (queue_Scan(&parent->dep_children,
+                   dep,
+                   ndep,
+                   afs_work_queue_dep_node)) {
+       if ((dep->child == child) &&
+           (dep->parent == parent)) {
+
+           /* no need to grab an extra ref on dep->child here; the caller
+            * should already have a ref on dep->child */
+           code = _afs_wq_dep_unlink_r(dep);
+           if (code) {
+               ret = code;
+               goto error;
+           }
+
+           code = _afs_wq_dep_free(dep);
+           if (code) {
+               ret = code;
+               goto error;
+           }
+           break;
+       }
+    }
+
+ error:
+    if (held) {
+       assert(pthread_mutex_unlock(&child->lock) == 0);
+       assert(pthread_mutex_unlock(&parent->lock) == 0);
+    }
+    return ret;
+}
+
+/**
+ * block a work node from execution.
+ *
+ * this can be used to allow external events to influence work queue flow.
+ *
+ * @param[in] node  work queue node to be blocked
+ *
+ * @return operation status
+ *    @retval 0 success
+ *
+ * @post external block count incremented
+ */
+int
+afs_wq_node_block(struct afs_work_queue_node * node)
+{
+    int ret = 0;
+    int start;
+
+    assert(pthread_mutex_lock(&node->lock) == 0);
+    ret = _afs_wq_node_state_wait_busy(node);
+    if (ret) {
+       goto error_sync;
+    }
+
+    start = node->block_count++;
+
+    if (!start &&
+       (node->qidx == AFS_WQ_NODE_LIST_READY)) {
+       /* unblocked->blocked transition, and we're already scheduled */
+       ret = _afs_wq_node_list_remove(node,
+                                           AFS_WQ_NODE_STATE_BUSY);
+       if (ret) {
+           goto error_sync;
+       }
+
+       ret = _afs_wq_node_list_enqueue(&node->queue->blocked_list,
+                                            node,
+                                            AFS_WQ_NODE_STATE_BLOCKED);
+    }
+
+ error_sync:
+    assert(pthread_mutex_unlock(&node->lock) == 0);
+
+    return ret;
+}
+
+/**
+ * unblock a work node for execution.
+ *
+ * this can be used to allow external events to influence work queue flow.
+ *
+ * @param[in] node  work queue node to be blocked
+ *
+ * @return operation status
+ *    @retval 0 success
+ *
+ * @post external block count decremented
+ */
+int
+afs_wq_node_unblock(struct afs_work_queue_node * node)
+{
+    int ret = 0;
+    int end;
+
+    assert(pthread_mutex_lock(&node->lock) == 0);
+    ret = _afs_wq_node_state_wait_busy(node);
+    if (ret) {
+       goto error_sync;
+    }
+
+    end = --node->block_count;
+
+    if (!end &&
+       (node->qidx == AFS_WQ_NODE_LIST_BLOCKED)) {
+       /* blocked->unblock transition, and we're ready to be scheduled */
+       ret = _afs_wq_node_list_remove(node,
+                                           AFS_WQ_NODE_STATE_BUSY);
+       if (ret) {
+           goto error_sync;
+       }
+
+       ret = _afs_wq_node_list_enqueue(&node->queue->ready_list,
+                                            node,
+                                            AFS_WQ_NODE_STATE_SCHEDULED);
+    }
+
+ error_sync:
+    assert(pthread_mutex_unlock(&node->lock) == 0);
+
+    return ret;
+}
+
+/**
+ * initialize a afs_wq_add_opts struct with the default options.
+ *
+ * @param[out] opts  options structure to initialize
+ */
+void
+afs_wq_add_opts_init(struct afs_work_queue_add_opts *opts)
+{
+    opts->donate = 0;
+    opts->block = 1;
+    opts->force = 0;
+}
+
+/**
+ * schedule a work node for execution.
+ *
+ * @param[in] queue  work queue
+ * @param[in] node   work node
+ * @param[in] opts   options for adding, or NULL for defaults
+ *
+ * @return operation status
+ *    @retval 0 success
+ *    @retval EWOULDBLOCK queue is full and opts specified not to block
+ *    @retval EINTR queue was full, we blocked to add, and the queue was
+ *                  shutdown while we were blocking
+ */
+int
+afs_wq_add(struct afs_work_queue *queue,
+           struct afs_work_queue_node *node,
+           struct afs_work_queue_add_opts *opts)
+{
+    int ret = 0;
+    int donate, block, force, hithresh;
+    struct afs_work_queue_node_list * list;
+    struct afs_work_queue_add_opts l_opts;
+    int waited_for_drain = 0;
+    afs_wq_work_state_t state;
+
+    if (!opts) {
+       afs_wq_add_opts_init(&l_opts);
+       opts = &l_opts;
+    }
+
+    donate = opts->donate;
+    block = opts->block;
+    force = opts->force;
+
+ retry:
+    assert(pthread_mutex_lock(&node->lock) == 0);
+
+    ret = _afs_wq_node_state_wait_busy(node);
+    if (ret) {
+       goto error;
+    }
+
+    if (!node->block_count && !node->error_count) {
+       list = &queue->ready_list;
+       state = AFS_WQ_NODE_STATE_SCHEDULED;
+    } else if (node->error_count) {
+       list = &queue->done_list;
+       state = AFS_WQ_NODE_STATE_ERROR;
+    } else {
+       list = &queue->blocked_list;
+       state = AFS_WQ_NODE_STATE_BLOCKED;
+    }
+
+    ret = 0;
+
+    assert(pthread_mutex_lock(&queue->lock) == 0);
+
+    if (queue->shutdown) {
+       ret = EINTR;
+       assert(pthread_mutex_unlock(&queue->lock) == 0);
+       assert(pthread_mutex_unlock(&node->lock) == 0);
+       goto error;
+    }
+
+    hithresh = queue->opts.pend_hithresh;
+    if (hithresh > 0 && queue->pend_count >= hithresh) {
+       queue->drain = 1;
+    }
+
+    if (!force && (state == AFS_WQ_NODE_STATE_SCHEDULED
+                   || state == AFS_WQ_NODE_STATE_BLOCKED)) {
+
+       if (queue->drain) {
+           if (block) {
+
+               assert(pthread_mutex_unlock(&node->lock) == 0);
+
+               assert(pthread_cond_wait(&queue->pend_cv, &queue->lock) == 0);
+
+               if (queue->shutdown) {
+                   ret = EINTR;
+
+               } else {
+                   assert(pthread_mutex_unlock(&queue->lock) == 0);
+
+                   waited_for_drain = 1;
+
+                   goto retry;
+               }
+
+           } else {
+               ret = EWOULDBLOCK;
+           }
+       }
+
+    }
+
+    if (ret == 0) {
+       queue->pend_count++;
+    }
+    if (waited_for_drain) {
+       /* signal another thread that may have been waiting for drain */
+       assert(pthread_cond_signal(&queue->pend_cv) == 0);
+    }
+
+    assert(pthread_mutex_unlock(&queue->lock) == 0);
+
+    if (ret) {
+       goto error;
+    }
+
+    if (!donate)
+       node->refcount++;
+    node->queue = queue;
+
+    ret = _afs_wq_node_list_enqueue(list,
+                                        node,
+                                        state);
+ error:
+    return ret;
+}
+
+/**
+ * de-schedule a work node.
+ *
+ * @param[in] node  work node
+ *
+ * @return operation status
+ *    @retval 0 success
+ */
+int
+afs_wq_del(struct afs_work_queue_node * node)
+{
+    /* XXX todo */
+    return ENOTSUP;
+}
+
+/**
+ * execute a node on the queue.
+ *
+ * @param[in] queue  work queue
+ * @param[in] rock   opaque pointer (passed as third arg to callback func)
+ *
+ * @return operation status
+ *    @retval 0 completed a work unit
+ */
+int
+afs_wq_do(struct afs_work_queue * queue,
+         void * rock)
+{
+    return _afs_wq_do(queue, rock, 1);
+}
+
+/**
+ * execute a node on the queue, if there is any work to do.
+ *
+ * @param[in] queue  work queue
+ * @param[in] rock   opaque pointer (passed as third arg to callback func)
+ *
+ * @return operation status
+ *    @retval 0 completed a work unit
+ *    @retval EWOULDBLOCK there was nothing to do
+ */
+int
+afs_wq_do_nowait(struct afs_work_queue * queue,
+                void * rock)
+{
+    return _afs_wq_do(queue, rock, 0);
+}
+
+/**
+ * wait for all pending nodes to finish.
+ *
+ * @param[in] queue  work queue
+ *
+ * @return operation status
+ *   @retval 0 success
+ *
+ * @post the specified queue was empty at some point; it may not be empty by
+ * the time this function returns, but at some point after the function was
+ * called, there were no nodes in the ready queue or blocked queue.
+ */
+int
+afs_wq_wait_all(struct afs_work_queue *queue)
+{
+    int ret = 0;
+
+    assert(pthread_mutex_lock(&queue->lock) == 0);
+
+    while (queue->pend_count > 0 && !queue->shutdown) {
+       assert(pthread_cond_wait(&queue->empty_cv, &queue->lock) == 0);
+    }
+
+    if (queue->shutdown) {
+       /* queue has been shut down, but there may still be some threads
+        * running e.g. in the middle of their callback. ensure they have
+        * stopped before we return. */
+       while (queue->running_count > 0) {
+           assert(pthread_cond_wait(&queue->running_cv, &queue->lock) == 0);
+       }
+       ret = EINTR;
+       goto done;
+    }
+
+ done:
+    assert(pthread_mutex_unlock(&queue->lock) == 0);
+
+    /* technically this doesn't really guarantee that the work queue is empty
+     * after we return, but we do guarantee that it was empty at some point */
+
+    return ret;
+}
+
+/**
+ * wait for a node to complete; dequeue from done list.
+ *
+ * @param[in]  node     work queue node
+ * @param[out] retcode  return code from work unit
+ *
+ * @return operation status
+ *    @retval 0 sucess
+ *
+ * @pre ref held on node
+ */
+int
+afs_wq_node_wait(struct afs_work_queue_node * node,
+                int * retcode)
+{
+    int ret = 0;
+
+    assert(pthread_mutex_lock(&node->lock) == 0);
+    if (node->state == AFS_WQ_NODE_STATE_INIT) {
+       /* not sure what to do in this case */
+       goto done_sync;
+    }
+
+    while ((node->state != AFS_WQ_NODE_STATE_DONE) &&
+          (node->state != AFS_WQ_NODE_STATE_ERROR)) {
+       assert(pthread_cond_wait(&node->state_cv,
+                                &node->lock) == 0);
+    }
+    if (retcode) {
+       *retcode = node->retcode;
+    }
+
+    if (node->queue == NULL) {
+       /* nothing we can do */
+       goto done_sync;
+    }
+
+    ret = _afs_wq_node_list_remove(node,
+                                       AFS_WQ_NODE_STATE_INIT);
+
+ done_sync:
+    assert(pthread_mutex_unlock(&node->lock) == 0);
+
+    return ret;
+}
diff --git a/src/util/work_queue.h b/src/util/work_queue.h
new file mode 100644 (file)
index 0000000..ebb5b77
--- /dev/null
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2008-2010, Sine Nomine Associates and others.
+ * All Rights Reserved.
+ *
+ * This software has been released under the terms of the IBM Public
+ * License.  For details, see the LICENSE file in the top-level source
+ * directory or online at http://www.openafs.org/dl/license10.html
+ */
+
+#ifndef AFS_UTIL_WORK_QUEUE_H
+#define AFS_UTIL_WORK_QUEUE_H 1
+
+#include "work_queue_types.h"
+
+/**
+ * public interfaces for package work_queue.
+ */
+/** @defgroup afs_work_queue Volume Package Work Queue */
+/*@{*/
+
+/* XXX move these into an et */
+#define AFS_WQ_ERROR              -1 /**< fatal error in work_queue package */
+#define AFS_WQ_ERROR_RECOVERABLE  -2 /**< soft error in work_queue package */
+#define AFS_WQ_ERROR_RESCHEDULE   -3 /**< reschedule work node for execution */
+
+extern void afs_wq_opts_init(struct afs_work_queue_opts *);
+extern void afs_wq_opts_calc_thresh(struct afs_work_queue_opts *, int);
+extern int afs_wq_create(struct afs_work_queue **,
+                         void * rock,
+                         struct afs_work_queue_opts *);
+extern int afs_wq_destroy(struct afs_work_queue *);
+
+extern int afs_wq_shutdown(struct afs_work_queue *);
+
+extern int afs_wq_node_alloc(struct afs_work_queue_node **);
+extern int afs_wq_node_get(struct afs_work_queue_node *);
+extern int afs_wq_node_put(struct afs_work_queue_node *);
+extern int afs_wq_node_set_callback(struct afs_work_queue_node *,
+                                   afs_wq_callback_func_t *,
+                                   void * rock, afs_wq_callback_dtor_t *dtor);
+extern int afs_wq_node_set_detached(struct afs_work_queue_node *);
+extern int afs_wq_node_dep_add(struct afs_work_queue_node *,
+                              struct afs_work_queue_node *);
+extern int afs_wq_node_dep_del(struct afs_work_queue_node *,
+                              struct afs_work_queue_node *);
+extern int afs_wq_node_block(struct afs_work_queue_node *);
+extern int afs_wq_node_unblock(struct afs_work_queue_node *);
+extern void afs_wq_add_opts_init(struct afs_work_queue_add_opts *);
+extern int afs_wq_add(struct afs_work_queue *,
+                     struct afs_work_queue_node *,
+                     struct afs_work_queue_add_opts *);
+extern int afs_wq_del(struct afs_work_queue_node *);
+
+extern int afs_wq_do(struct afs_work_queue *,
+                    void * rock);
+extern int afs_wq_do_nowait(struct afs_work_queue *,
+                           void * rock);
+
+extern int afs_wq_wait_all(struct afs_work_queue *);
+extern int afs_wq_node_wait(struct afs_work_queue_node *,
+                                int * retcode);
+
+/*@}*/
+#endif /* AFS_UTIL_WORK_QUEUE_H */
diff --git a/src/util/work_queue_impl.h b/src/util/work_queue_impl.h
new file mode 100644 (file)
index 0000000..8fa2fde
--- /dev/null
@@ -0,0 +1,19 @@
+/*
+ * Copyright 2008-2010, Sine Nomine Associates and others.
+ * All Rights Reserved.
+ *
+ * This software has been released under the terms of the IBM Public
+ * License.  For details, see the LICENSE file in the top-level source
+ * directory or online at http://www.openafs.org/dl/license10.html
+ */
+
+#ifndef AFS_UTIL_WORK_QUEUE_IMPL_H
+#define AFS_UTIL_WORK_QUEUE_IMPL_H 1
+
+#include "work_queue.h"
+#include "work_queue_impl_types.h"
+
+/**
+ *  implementation-private interfaces for package work_queue.
+ */
+#endif /* AFS_UTIL_WORK_QUEUE_IMPL_H */
diff --git a/src/util/work_queue_impl_types.h b/src/util/work_queue_impl_types.h
new file mode 100644 (file)
index 0000000..327add0
--- /dev/null
@@ -0,0 +1,139 @@
+/*
+ * Copyright 2008-2010, Sine Nomine Associates and others.
+ * All Rights Reserved.
+ *
+ * This software has been released under the terms of the IBM Public
+ * License.  For details, see the LICENSE file in the top-level source
+ * directory or online at http://www.openafs.org/dl/license10.html
+ */
+
+#ifndef AFS_UTIL_WORK_QUEUE_IMPL_TYPES_H
+#define AFS_UTIL_WORK_QUEUE_IMPL_TYPES_H 1
+
+#ifndef __AFS_WORK_QUEUE_IMPL
+#error "do not include this file outside of the work queue implementation"
+#endif
+
+#include "work_queue_types.h"
+#include <rx/rx_queue.h>
+
+/**
+ *  implementation-private type definitions for work_queue.
+ */
+
+/**
+ * work_queue node state.
+ */
+typedef enum {
+    AFS_WQ_NODE_STATE_INIT,               /**< initial state */
+    AFS_WQ_NODE_STATE_SCHEDULED,          /**< scheduled for execution */
+    AFS_WQ_NODE_STATE_RUNNING,            /**< running callback function */
+    AFS_WQ_NODE_STATE_DONE,               /**< callback function finished */
+    AFS_WQ_NODE_STATE_ERROR,              /**< node callback failed, or some dep failed */
+    AFS_WQ_NODE_STATE_BLOCKED,            /**< pending some dependency */
+    AFS_WQ_NODE_STATE_BUSY,               /**< exclusively owned by a thread */
+    /* add new states above this line */
+    AFS_WQ_NODE_STATE_TERMINAL
+} afs_wq_work_state_t;
+
+/**
+ * work_queue dependency node.
+ */
+struct afs_work_queue_dep_node {
+    struct rx_queue parent_list;        /**< parent node's list of children */
+    struct afs_work_queue_node * parent;        /**< parent work node */
+    struct afs_work_queue_node * child;         /**< child work node */
+    /* coming soon: dep options */
+};
+
+/**
+ * work_queue enumeration.
+ *
+ * tells which linked list a given node is attached to.
+ */
+typedef enum {
+    AFS_WQ_NODE_LIST_NONE,         /**< node is not on a linked list. */
+    AFS_WQ_NODE_LIST_READY,        /**< node is on ready_list */
+    AFS_WQ_NODE_LIST_BLOCKED,      /**< node is on blocked_list */
+    AFS_WQ_NODE_LIST_DONE,         /**< node is on done_list */
+    /* add new queues above this line */
+    AFS_WQ_NODE_LIST_TERMINAL
+} afs_wq_node_list_id_t;
+
+/**
+ * work_queue node.
+ */
+struct afs_work_queue_node {
+    struct rx_queue node_list;          /**< linked list of work queue nodes. */
+    afs_wq_node_list_id_t qidx;         /**< id of linked list */
+    struct rx_queue dep_children;       /**< nodes whose execution depends upon
+                                        *   our completion. */
+    afs_wq_callback_func_t * cbf;
+                                        /**< callback function which will be called by scheduler */
+    afs_wq_callback_dtor_t *rock_dtor;  /**< destructor function for 'rock' */
+    void * rock;                        /**< opaque pointer passed into cbf */
+    struct afs_work_queue * queue;              /**< our queue */
+    afs_wq_work_state_t state;          /**< state of this queue node */
+    afs_uint32 refcount;                /**< object reference count */
+    afs_uint32 block_count;             /**< dependency blocking count; node is
+                                        *   only a candidate for execution when
+                                        *   this counter reaches zero. */
+    afs_uint32 error_count;             /**< dependency error count; node is only
+                                        *   a candidate for execution when this
+                                        *   counter reaches zero. */
+    int detached;                       /**< object is put instead of being placed onto done queue */
+    int retcode;                        /**< return code from worker function */
+    pthread_mutex_t lock;               /**< object lock */
+    pthread_cond_t state_cv;            /**< state change cv */
+};
+
+/**
+ * linked list
+ */
+struct afs_work_queue_node_list {
+    struct rx_queue list;               /**< linked list of nodes */
+    afs_wq_node_list_id_t qidx;         /**< id of linked list */
+    int shutdown;                       /**< don't allow blocking on dequeue if asserted */
+    pthread_mutex_t lock;               /**< synchronize list access */
+    pthread_cond_t  cv;                 /**< signal empty->non-empty transition */
+};
+
+/**
+ * multilock control structure.
+ */
+struct afs_work_queue_node_multilock {
+    struct {
+       struct afs_work_queue_node * node;
+       int lock_held;
+       int busy_held;
+    } nodes[2];
+};
+
+/**
+ * work queue.
+ */
+struct afs_work_queue {
+    struct afs_work_queue_node_list ready_list;     /**< ready work queue nodes. */
+    struct afs_work_queue_node_list blocked_list;   /**< nodes scheduled, but blocked */
+    struct afs_work_queue_node_list done_list;      /**< nodes done/errored */
+    void * rock;                            /**< opaque pointer passed to all callbacks */
+
+    struct afs_work_queue_opts opts;
+
+    int drain;                 /**< 1 if we are waiting for the queue to drain
+                                *   until the number of pending tasks has
+                                *   dropped below the low threshold */
+    int shutdown;              /**< 1 if the queue has been shutdown */
+    int pend_count;            /**< number of pending tasks */
+    int running_count;         /**< number of tasks busy running */
+    pthread_mutex_t lock;      /**< lock for the queue */
+    pthread_cond_t pend_cv;    /**< signalled when th queue is draining and
+                                *   the number of pending tasks dropped below
+                                *   the low threshold */
+    pthread_cond_t empty_cv;   /**< signalled when the number of pending tasks
+                                *   reaches 0 */
+    pthread_cond_t running_cv; /**< signalled once running_count reaches 0 and
+                                *   the queue is shutting down */
+};
+
+#endif /* AFS_UTIL_WORK_QUEUE_IMPL_TYPES_H */
diff --git a/src/util/work_queue_types.h b/src/util/work_queue_types.h
new file mode 100644 (file)
index 0000000..f712705
--- /dev/null
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2008-2010, Sine Nomine Associates and others.
+ * All Rights Reserved.
+ *
+ * This software has been released under the terms of the IBM Public
+ * License.  For details, see the LICENSE file in the top-level source
+ * directory or online at http://www.openafs.org/dl/license10.html
+ */
+
+#ifndef AFS_UTIL_WORK_QUEUE_TYPES_H
+#define AFS_UTIL_WORK_QUEUE_TYPES_H 1
+
+
+/**
+ *  public type definitions for work_queue.
+ */
+
+/* forward declare opaque types */
+struct afs_work_queue_node;
+struct afs_work_queue;
+
+/**
+ * options for creating a struct afs_work_queue.
+ *
+ * @see afs_wq_create
+ */
+struct afs_work_queue_opts {
+    unsigned int pend_hithresh; /**< maximum number of pending work nodes that
+                                 *   can be in the work queue before
+                                 *   non-forcing callers of afs_wq_add
+                                 *   will block */
+    unsigned int pend_lothresh; /**< when the number of pending work nodes
+                                 *   falls below this number, any blocked
+                                 *   callers of afs_wq_add are woken
+                                 *   back up */
+};
+
+/**
+ * options to afs_wq_add.
+ *
+ * @see afs_wq_add
+ */
+struct afs_work_queue_add_opts {
+    int donate; /**< 1 to donate the caller's reference of the work node to
+                 *   the queue, 0 to add a reference for the queue */
+    int block;  /**< 1 to block if the queue is 'full', 0 to return
+                 *   immediately with an error if the queue is full. Ignored
+                 *   when 'force' is set */
+    int force;  /**< 1 to force the node to be added to the queue, even if the
+                 *   queue is at or beyond the pend_hithresh quota, 0 to block
+                 *   or return an error according to the 'block' option */
+};
+
+/**
+ * work_queue callback function.
+ *
+ * @param[in] queue        pointer to struct afs_work_queue
+ * @param[in] node         pointer to struct afs_work_queue_node
+ * @param[in] queue_rock   opaque pointer associated with this work queue
+ * @param[in] node_rock    opaque pointer associated with this work element
+ * @param[in] caller_rock  opaque pointer passed in by caller of afs_wq_do()
+ *
+ * @return operation status
+ *    @retval 0 success
+ *    @retval AFS_WQ_ERROR callback suffered fatal error;
+ *                              propagate to caller immediately.
+ *    @retval AFS_WQ_ERROR_RECOVERABLE callback suffered a non-fatal error.
+ *    @retval AFS_WQ_ERROR_RESCHEDULE callback requested to be scheduled
+ *                                         again; for example it may have
+ *                                         changed its dependencies.
+ */
+typedef int afs_wq_callback_func_t(struct afs_work_queue * queue,
+                                  struct afs_work_queue_node * node,
+                                  void * queue_rock,
+                                  void * node_rock,
+                                  void * caller_rock);
+
+/**
+ * node rock destructor function.
+ *
+ * This function is called when the associated afs_work_queue_node is freed.
+ * It should free any resources associated with the rock.
+ *
+ * @param[in] node_rock  rock given to the node's callback function
+ */
+typedef void afs_wq_callback_dtor_t(void *node_rock);
+
+#endif /* AFS_UTIL_WORK_QUEUE_TYPES_H */