2 * Copyright 2008-2010, Sine Nomine Associates and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
10 #include <afsconfig.h>
11 #include <afs/param.h>
18 #define __AFS_WORK_QUEUE_IMPL 1
19 #include "work_queue.h"
20 #include "work_queue_impl.h"
23 * public interfaces for work_queue.
/* review: forward declaration of _afs_wq_node_put_r (full definition appears
 * later in this file); the second parameter and trailing ');' are elided
 * from this listing. */
27 _afs_wq_node_put_r(struct afs_work_queue_node * node,
31 * allocate a work queue object.
33 * @param[out] queue_out address in which to store queue pointer
35 * @return operation status
37 * @retval ENOMEM out of memory
42 _afs_wq_alloc(struct afs_work_queue ** queue_out)
45     struct afs_work_queue * queue;
/* review: stores the new pointer through queue_out before the (elided)
 * NULL check; per the @retval above, a failed malloc presumably yields
 * ENOMEM — the check and return are not visible in this listing. */
47     *queue_out = queue = malloc(sizeof(*queue));
58 * free a work queue object.
60 * @param[in] queue  work queue object
62 * @return operation status
/* review: entire function body is elided from this listing; presumably it
 * frees 'queue' — confirm against the full source. */
68 _afs_wq_free(struct afs_work_queue * queue)
78 * change a node's state.
80 * @param[in] node       node object
81 * @param[in] new_state  new object state
85 * @pre node->lock held
/* review: returns the previous state (afs_wq_work_state_t); the return
 * statement itself is elided from this listing. */
89 static afs_wq_work_state_t
90 _afs_wq_node_state_change(struct afs_work_queue_node * node,
91 afs_wq_work_state_t new_state)
93     afs_wq_work_state_t old_state;
95     old_state = node->state;
96     node->state = new_state;
/* broadcast (not signal): multiple threads may be waiting on state_cv for
 * different target states, so all must be woken to re-check. */
98     CV_BROADCAST(&node->state_cv);
104 * wait for a node's state to change from busy to something else.
106 * @param[in] node  node object
108 * @return operation status
111 * @pre node->lock held
/* review: classic condvar wait loop — re-checks the predicate after each
 * wakeup, so spurious wakeups are handled correctly. CV_WAIT atomically
 * releases and re-acquires node->lock. */
116 _afs_wq_node_state_wait_busy(struct afs_work_queue_node * node)
118     while (node->state == AFS_WQ_NODE_STATE_BUSY) {
119 CV_WAIT(&node->state_cv, &node->lock);
126 * check whether a work queue node is busy.
128 * @param[in] node  node object pointer
130 * @return whether node is busy
131 *    @retval 1 node is busy
132 *    @retval 0 node is not busy
134 * @pre node->lock held
/* review: non-blocking predicate; the blocking counterpart is
 * _afs_wq_node_state_wait_busy above. */
139 _afs_wq_node_state_is_busy(struct afs_work_queue_node * node)
141     return (node->state == AFS_WQ_NODE_STATE_BUSY);
145 * attempt to simultaneously lock two work queue nodes.
147 * this is a somewhat tricky algorithm because there is no
148 * defined hierarchy within the work queue node population.
150 * @param[in] ml  multilock control structure
152 * @return operation status
155 * @note in theory, we could easily extend this to
156 *       lock more than two nodes
159 *    - caller MUST NOT have set busy state on either node
162 *    - locks held on both nodes
163 *    - both nodes in quiescent states
165 * @note node with non-zero lock_held or busy_held fields
166 *       MUST go in array index 0
/* review: deadlock is avoided not by lock ordering (none exists) but by
 * trylock + randomized exponential backoff, retrying in the opposite
 * acquisition order on contention. Several lines (ret/code declarations,
 * error branches, loop structure, the backoff-doubling statement) are
 * elided from this listing. */
171 _afs_wq_node_multilock(struct afs_work_queue_node_multilock * ml)
174     struct timespec delay;
175     int first = 1, second = 0, tmp;
177     /* first pass processing */
178     if (ml->nodes[0].lock_held) {
179 if (!ml->nodes[0].busy_held) {
180     ret = _afs_wq_node_state_wait_busy(ml->nodes[0].node);
/* fast path: caller already holds node[0]'s lock; try to grab node[1]
 * without blocking. */
186 code = MUTEX_TRYENTER(&ml->nodes[1].node->lock);
192 /* setup for main loop */
193 MUTEX_EXIT(&ml->nodes[0].node->lock);
197      * setup random exponential backoff
199      * set initial delay to random value in the range [500,1000) ns
202     delay.tv_nsec = 500 + rand() % 500;
205 MUTEX_ENTER(&ml->nodes[first].node->lock);
/* if 'first' is the pre-busied node[0], skip the busy-wait the caller
 * already performed. */
206 if ((first != 0) || !ml->nodes[0].busy_held) {
207     ret = _afs_wq_node_state_wait_busy(ml->nodes[first].node);
210 if (!ml->nodes[0].lock_held || first) {
211     MUTEX_EXIT(&ml->nodes[first].node->lock);
212 if (ml->nodes[0].lock_held) {
213     /* on error, return with locks in same state as before call */
214     MUTEX_ENTER(&ml->nodes[0].node->lock);
222  * in order to avoid deadlock, we must use trylock and
223  * a non-blocking state check.  if we meet any contention,
224  * we must drop back and start again.
226 code = MUTEX_TRYENTER(&ml->nodes[second].node->lock);
228     if (((second == 0) && (ml->nodes[0].busy_held)) ||
229 !_afs_wq_node_state_is_busy(ml->nodes[second].node)) {
/* success path: both locks held and 'second' is quiescent. */
233     MUTEX_EXIT(&ml->nodes[second].node->lock);
240      * drop locks, use exponential backoff,
241      * try acquiring in the opposite order
243     MUTEX_EXIT(&ml->nodes[first].node->lock);
244     nanosleep(&delay, NULL);
/* review: the backoff presumably doubles each retry up to this cap;
 * the doubling statement is elided from the listing. */
245     if (delay.tv_nsec <= 65536000) { /* max backoff delay of ~131ms */
259 * initialize a node list object.
261 * @param[in] list list object
262 * @param[in] id   list identifier
264 * @return operation status
/* review: also presumably stores 'id' into list->qidx and clears the
 * shutdown flag — those assignments are elided from this listing
 * (list->qidx is read later in _afs_wq_node_list_enqueue). */
270 _afs_wq_node_list_init(struct afs_work_queue_node_list * list,
271 afs_wq_node_list_id_t id)
273     queue_Init(&list->list);
274     MUTEX_INIT(&list->lock, "list", MUTEX_DEFAULT, 0);
275     CV_INIT(&list->cv, "list", CV_DEFAULT, 0);
283 * destroy a node list object.
285 * @param[in] list list object
287 * @return operation status
289 *    @retval AFS_WQ_ERROR list not empty
/* review: refuses to tear down a non-empty list (error branch body is
 * elided); only destroys the sync primitives once the list is empty. */
294 _afs_wq_node_list_destroy(struct afs_work_queue_node_list * list)
298     if (queue_IsNotEmpty(&list->list)) {
303     MUTEX_DESTROY(&list->lock);
304     CV_DESTROY(&list->cv);
311 * wakeup all threads waiting in dequeue.
313 * @param[in] list list object
315 * @return operation status
/* review: marks every queued node as ERROR, detaches it from the list,
 * and broadcasts so blocked dequeuers re-check list->shutdown (the
 * assignment of list->shutdown itself is elided from this listing). */
321 _afs_wq_node_list_shutdown(struct afs_work_queue_node_list * list)
324     struct afs_work_queue_node *node, *nnode;
326     MUTEX_ENTER(&list->lock);
329     for (queue_Scan(&list->list, node, nnode, afs_work_queue_node)) {
330 _afs_wq_node_state_change(node, AFS_WQ_NODE_STATE_ERROR);
332 node->qidx = AFS_WQ_NODE_LIST_NONE;
335 if (node->detached) {
336     /* if we are detached, we hold the reference on the node;
337      * otherwise, it is some other caller that holds the reference.
338      * So don't put the node if we are not detached; the node will
339      * get freed when someone else calls afs_wq_node_put */
340     afs_wq_node_put(node);
344     CV_BROADCAST(&list->cv);
345     MUTEX_EXIT(&list->lock);
351 * append to a node list object.
353 * @param[in] list  list object
354 * @param[in] node  node object
355 * @param[in] state new node state
357 * @return operation status
359 *    @retval AFS_WQ_ERROR raced to enqueue node
363 *    - node is not on a list
364 *    - node is either not busy, or it is marked as busy by the calling thread
368 *    - node lock dropped
/* review: lock order here is list->lock before node->lock; since the
 * caller arrives holding only node->lock, the trylock-miss path below
 * marks the node BUSY (so its state can't change), drops node->lock,
 * and re-acquires both in the canonical order. */
373 _afs_wq_node_list_enqueue(struct afs_work_queue_node_list * list,
374 struct afs_work_queue_node * node,
375 afs_wq_work_state_t state)
379     if (node->qidx != AFS_WQ_NODE_LIST_NONE) {
385     /* deal with lock inversion */
386     code = MUTEX_TRYENTER(&list->lock);
389 _afs_wq_node_state_change(node, AFS_WQ_NODE_STATE_BUSY);
390 MUTEX_EXIT(&node->lock);
391 MUTEX_ENTER(&list->lock);
392 MUTEX_ENTER(&node->lock);
394     /* assert state of the world (we set busy, so this should never happen) */
395     osi_Assert(queue_IsNotOnQueue(node));
/* review: enqueueing onto a shut-down list is refused; the error branch
 * body is elided from this listing. */
398     if (list->shutdown) {
403     osi_Assert(node->qidx == AFS_WQ_NODE_LIST_NONE);
404     if (queue_IsEmpty(&list->list)) {
405 /* wakeup a dequeue thread */
406 CV_SIGNAL(&list->cv);
408     queue_Append(&list->list, node);
409     node->qidx = list->qidx;
410     _afs_wq_node_state_change(node, state);
413     MUTEX_EXIT(&node->lock);
414     MUTEX_EXIT(&list->lock);
421 * dequeue a node from a list object.
423 * @param[in]    list      list object
424 * @param[out]   node_out  address in which to store node object pointer
425 * @param[in]    state     new node state
426 * @param[in]    block     permit blocking on cv if asserted
428 * @return operation status
430 *    @retval EWOULDBLOCK block not asserted and nothing to dequeue
431 *    @retval EINTR blocking wait interrupted by list shutdown
433 * @post node object returned with node lock held and new state set
/* review: the EINTR/EWOULDBLOCK return assignments and goto targets are
 * elided from this listing; the visible code shows the checks only. */
438 _afs_wq_node_list_dequeue(struct afs_work_queue_node_list * list,
439 struct afs_work_queue_node ** node_out,
440 afs_wq_work_state_t state,
444     struct afs_work_queue_node * node;
446     MUTEX_ENTER(&list->lock);
448     if (list->shutdown) {
454     if (!block && queue_IsEmpty(&list->list)) {
/* re-check shutdown after every wakeup: shutdown broadcasts list->cv
 * without enqueueing anything. */
460     while (queue_IsEmpty(&list->list)) {
461 if (list->shutdown) {
466 CV_WAIT(&list->cv, &list->lock);
469     *node_out = node = queue_First(&list->list, afs_work_queue_node);
/* node->lock acquired while list->lock is held — matches the
 * list-before-node lock order used by the enqueue path. */
471     MUTEX_ENTER(&node->lock);
473     node->qidx = AFS_WQ_NODE_LIST_NONE;
474     _afs_wq_node_state_change(node, state);
477     MUTEX_EXIT(&list->lock);
483 * remove a node from a list.
485 * @param[in] node        node object
486 * @param[in] next_state  node state following successful dequeue
488 * @return operation status
490 *    @retval AFS_WQ_ERROR in any of the following conditions:
491 *                - node not associated with a work queue
492 *                - node was not on a linked list (e.g. RUNNING state)
493 *                - we raced another thread
495 * @pre node->lock held
497 * @post node removed from node list
499 * @note node->lock may be dropped internally
/* review: same lock-inversion dance as _afs_wq_node_list_enqueue — mark
 * BUSY, drop node->lock, take list->lock then node->lock. After
 * re-locking, qidx is re-checked because another thread may have removed
 * the node in the window. queue_Remove and the success return are elided
 * from this listing. */
504 _afs_wq_node_list_remove(struct afs_work_queue_node * node,
505 afs_wq_work_state_t next_state)
508     struct afs_work_queue_node_list * list = NULL;
510     _afs_wq_node_state_wait_busy(node);
/* map the node's current queue index to the owning list. */
516     switch (node->qidx) {
517     case AFS_WQ_NODE_LIST_READY:
518 list = &node->queue->ready_list;
521     case AFS_WQ_NODE_LIST_BLOCKED:
522 list = &node->queue->blocked_list;
525     case AFS_WQ_NODE_LIST_DONE:
526 list = &node->queue->done_list;
534     code = MUTEX_TRYENTER(&list->lock);
537 _afs_wq_node_state_change(node,
538   AFS_WQ_NODE_STATE_BUSY);
539 MUTEX_EXIT(&node->lock);
540 MUTEX_ENTER(&list->lock);
541 MUTEX_ENTER(&node->lock);
/* raced: someone else already pulled the node off a list. */
543 if (node->qidx == AFS_WQ_NODE_LIST_NONE) {
551     node->qidx = AFS_WQ_NODE_LIST_NONE;
552     _afs_wq_node_state_change(node, next_state);
555     MUTEX_EXIT(&list->lock);
563 * allocate a dependency node.
565 * @param[out] node_out address in which to store dep node pointer
567 * @return operation status
569 *    @retval ENOMEM out of memory
/* review: the NULL-check/ENOMEM branch and the *node_out assignment are
 * elided from this listing. */
574 _afs_wq_dep_alloc(struct afs_work_queue_dep_node ** node_out)
577     struct afs_work_queue_dep_node * node;
579     node = malloc(sizeof(*node));
585     queue_NodeInit(&node->parent_list);
586     node->parent = node->child = NULL;
595 * free a dependency node.
597 * @param[in] node dep node pointer
599 * @return operation status
601 *    @retval AFS_WQ_ERROR still attached to a work node
/* review: the condition continues past the visible '||' (presumably
 * checking node->parent/node->child) and the free() call is elided from
 * this listing. */
606 _afs_wq_dep_free(struct afs_work_queue_dep_node * node)
610     if (queue_IsOnQueue(&node->parent_list) ||
624 * unlink work nodes from a dependency node.
626 * @param[in] dep  dependency node
628 * @return operation status
632 *    - dep->parent and dep->child are either locked, or are not referenced
634 *    - caller holds ref on dep->child
635 *    - dep->child and dep->parent in quiescent state
/* review: removes the dep from its parent's dep_children list, then drops
 * the ref the dep held on the child (put with drop=0: the caller still
 * holds child->lock per the preconditions). Clearing of dep->parent /
 * dep->child is elided from this listing. */
640 _afs_wq_dep_unlink_r(struct afs_work_queue_dep_node *dep)
642     struct afs_work_queue_node *child = dep->child;
643     queue_Remove(&dep->parent_list);
647     return _afs_wq_node_put_r(child, 0);
651 * get a reference to a work node.
653 * @param[in] node  work queue node
655 * @return operation status
658 * @pre node->lock held
/* review: the refcount increment and return are elided from this
 * listing. */
663 _afs_wq_node_get_r(struct afs_work_queue_node * node)
671 * unlink and free all of the dependency nodes from a node.
673 * @param[in] parent  work node that is the parent node of all deps to be freed
675 * @return operation status
678 * @pre parent->refcount == 0
/* review: because parent's refcount is 0 no other thread can reach it,
 * so parent needs no lock here; each child is locked individually. */
681 _afs_wq_node_free_deps(struct afs_work_queue_node *parent)
684     struct afs_work_queue_node *node_unlock = NULL, *node_put = NULL;
685     struct afs_work_queue_dep_node * dep, * nd;
687     /* unlink and free all of the dep structs attached to 'parent' */
688     for (queue_Scan(&parent->dep_children,
691     afs_work_queue_dep_node)) {
693 MUTEX_ENTER(&dep->child->lock);
694 node_unlock = dep->child;
696 /* We need to get a ref on child here, since _afs_wq_dep_unlink_r may
697  * put the last ref on the child, and we need the child to still exist
698  * so we can unlock it */
699 code = _afs_wq_node_get_r(dep->child);
703     node_put = dep->child;
705 /* remember, no need to lock dep->parent, since its refcount is 0 */
706 code = _afs_wq_dep_unlink_r(dep);
/* put with drop=1 releases the temporary ref AND unlocks the child in
 * one step; otherwise just unlock. */
710     _afs_wq_node_put_r(node_put, 1);
711 } else if (node_unlock) {
712     MUTEX_EXIT(&node_unlock->lock);
714 node_put = node_unlock = NULL;
717 /* Only do this if everything is okay; if code is nonzero,
718  * something will still be pointing at dep, so don't free it.
719  * We will leak memory, but that's better than memory corruption;
720  * we've done all we can do to try and free the dep memory */
721     code = _afs_wq_dep_free(dep);
732 * propagate state down through dep nodes.
734 * @param[in] parent      parent node object
735 * @param[in] next_state  next state parent will assume
737 * @return operation status
741 *    - parent->lock held
/* review: parent is marked BUSY for the duration so its state cannot
 * change while children are updated; the restore at the end passes the
 * final state (second argument elided from this listing). Several error
 * branches and loop-control lines are also elided. */
746 _afs_wq_dep_propagate(struct afs_work_queue_node * parent,
747   afs_wq_work_state_t next_state)
750     struct afs_work_queue_dep_node * dep, * nd;
751     struct afs_work_queue_node_multilock ml;
752     afs_wq_work_state_t old_state;
753     afs_wq_node_list_id_t qidx;
754     struct afs_work_queue_node_list * ql;
755     afs_wq_work_state_t cns;
757     old_state = _afs_wq_node_state_change(parent,
758   AFS_WQ_NODE_STATE_BUSY);
/* parent goes in slot 0 of the multilock: it is already locked and
 * busied by us (see _afs_wq_node_multilock's precondition note). */
759     ml.nodes[0].node = parent;
760     ml.nodes[0].lock_held = 1;
761     ml.nodes[0].busy_held = 1;
763     /* scan through our children updating scheduling state */
764     for (queue_Scan(&parent->dep_children,
767     afs_work_queue_dep_node)) {
768 /* skip half-registered nodes */
769 if (dep->child == NULL) {
773 ml.nodes[1].node = dep->child;
774 ml.nodes[1].lock_held = 0;
775 ml.nodes[1].busy_held = 0;
776 ret = _afs_wq_node_multilock(&ml);
/* parent's completion either unblocks the child (DONE) or poisons it
 * (ERROR). */
781 switch (next_state) {
782 case AFS_WQ_NODE_STATE_DONE:
783     dep->child->block_count--;
786 case AFS_WQ_NODE_STATE_ERROR:
787     dep->child->error_count++;
794 /* skip unscheduled nodes */
795 if (dep->child->queue == NULL) {
796     MUTEX_EXIT(&dep->child->lock);
801  * when blocked dep and error'd dep counts reach zero, the
802  * node can be scheduled for execution
/* pick the child's destination list: error wins over blocked, blocked
 * wins over ready. */
804 if (dep->child->error_count) {
805     ql = &dep->child->queue->done_list;
806     qidx = AFS_WQ_NODE_LIST_DONE;
807     cns = AFS_WQ_NODE_STATE_ERROR;
808 } else if (dep->child->block_count) {
809     ql = &dep->child->queue->blocked_list;
810     qidx = AFS_WQ_NODE_LIST_BLOCKED;
811     cns = AFS_WQ_NODE_STATE_BLOCKED;
813     ql = &dep->child->queue->ready_list;
814     qidx = AFS_WQ_NODE_LIST_READY;
815     cns = AFS_WQ_NODE_STATE_SCHEDULED;
818 if (qidx != dep->child->qidx) {
819     /* we're transitioning to a different queue */
820     ret = _afs_wq_node_list_remove(dep->child,
821    AFS_WQ_NODE_STATE_BUSY);
823 MUTEX_EXIT(&dep->child->lock);
/* review: enqueue's remaining arguments (dep->child, cns) are elided
 * from this listing; note it drops the child's lock itself on
 * success. */
827     ret = _afs_wq_node_list_enqueue(ql,
831 MUTEX_EXIT(&dep->child->lock);
835 MUTEX_EXIT(&dep->child->lock);
839     _afs_wq_node_state_change(parent,
845 * decrements queue->running_count, and signals waiters if appropriate.
847 * @param[in] queue  queue to dec the running count of
850 _afs_wq_dec_running_count(struct afs_work_queue *queue)
852     MUTEX_ENTER(&queue->lock);
853     queue->running_count--;
/* only broadcast when the last runner drains after shutdown; that is the
 * condition afs_wq_wait_all sleeps on via running_cv. */
854     if (queue->shutdown && queue->running_count == 0) {
855 /* if we've shut down, someone may be waiting for the running count
857 CV_BROADCAST(&queue->running_cv);
859     MUTEX_EXIT(&queue->lock);
863 * execute a node on the queue.
865 * @param[in] queue  work queue
866 * @param[in] rock   opaque pointer (passed as third arg to callback func)
867 * @param[in] block  allow blocking in dequeue
869 * @return operation status
870 *    @retval 0 completed a work unit
/* review: core worker path — dequeue a READY node, run its callback
 * outside node->lock, then route the node to done/blocked/ready based on
 * the callback's return code. Numerous declarations, error branches and
 * gotos are elided from this listing. */
875 _afs_wq_do(struct afs_work_queue * queue,
880     struct afs_work_queue_node * node;
881     afs_wq_callback_func_t * cbf;
882     afs_wq_work_state_t next_state;
883     struct afs_work_queue_node_list * ql;
887     /* We can inc queue->running_count before actually pulling the node off
888      * of the ready_list, since running_count only really matters when we are
889      * shut down. If we get shut down before we pull the node off of
890      * ready_list, but after we inc'd running_count,
891      * _afs_wq_node_list_dequeue should return immediately with EINTR,
892      * in which case we'll dec running_count, so it's as if we never inc'd it
893      * in the first place. */
894     MUTEX_ENTER(&queue->lock);
895     if (queue->shutdown) {
896 MUTEX_EXIT(&queue->lock);
899     queue->running_count++;
900     MUTEX_EXIT(&queue->lock);
902     ret = _afs_wq_node_list_dequeue(&queue->ready_list,
904     AFS_WQ_NODE_STATE_RUNNING,
907 _afs_wq_dec_running_count(queue);
/* snapshot callback data under node->lock, then release the lock for the
 * (potentially long-running) callback itself. */
912     node_rock = node->rock;
913     detached = node->detached;
916     MUTEX_EXIT(&node->lock);
917     code = (*cbf)(queue, node, queue->rock, node_rock, rock);
918     MUTEX_ENTER(&node->lock);
920 next_state = AFS_WQ_NODE_STATE_DONE;
921 ql = &queue->done_list;
922     } else if (code == AFS_WQ_ERROR_RESCHEDULE) {
/* callback asked to run again later; pick the list matching the node's
 * current error/block counts. */
923 if (node->error_count) {
924     next_state = AFS_WQ_NODE_STATE_ERROR;
925     ql = &queue->done_list;
926 } else if (node->block_count) {
927     next_state = AFS_WQ_NODE_STATE_BLOCKED;
928     ql = &queue->blocked_list;
930     next_state = AFS_WQ_NODE_STATE_SCHEDULED;
931     ql = &queue->ready_list;
934 next_state = AFS_WQ_NODE_STATE_ERROR;
935 ql = &queue->done_list;
938 next_state = AFS_WQ_NODE_STATE_DONE;
940 ql = &queue->done_list;
943     _afs_wq_dec_running_count(queue);
945     node->retcode = code;
947     if ((next_state == AFS_WQ_NODE_STATE_DONE) ||
948 (next_state == AFS_WQ_NODE_STATE_ERROR)) {
950 MUTEX_ENTER(&queue->lock);
/* review: pend_count decrement is elided from this listing; the signals
 * below notify threads waiting in afs_wq_add (drain) and
 * afs_wq_wait_all (empty). */
952 if (queue->drain && queue->pend_count == queue->opts.pend_lothresh) {
953     /* signal other threads if we're about to below the low
954      * pending-tasks threshold */
956     CV_SIGNAL(&queue->pend_cv);
959 if (queue->pend_count == 1) {
960     /* signal other threads if we're about to become 'empty' */
961     CV_BROADCAST(&queue->empty_cv);
966 MUTEX_EXIT(&queue->lock);
969     ret = _afs_wq_node_state_wait_busy(node);
974     /* propagate scheduling changes down through dependencies */
975     ret = _afs_wq_dep_propagate(node, next_state);
980     ret = _afs_wq_node_state_wait_busy(node);
/* review: a detached node that finished (condition partially elided) is
 * put here — detachment transfers the ref to the queue machinery;
 * otherwise the node is re-enqueued (enqueue arguments elided). */
986 ((next_state == AFS_WQ_NODE_STATE_DONE) ||
987  (next_state == AFS_WQ_NODE_STATE_ERROR))) {
988 _afs_wq_node_state_change(node, next_state);
989 _afs_wq_node_put_r(node, 1);
991 ret = _afs_wq_node_list_enqueue(ql,
1001 * initialize a struct afs_work_queue_opts to the default values
1003 * @param[out] opts  opts struct to initialize
/* review: zero thresholds mean "no threshold" (see the hithresh > 0
 * guard in afs_wq_add). */
1006 afs_wq_opts_init(struct afs_work_queue_opts *opts)
1008     opts->pend_lothresh = 0;
1009     opts->pend_hithresh = 0;
1013 * set the options for a struct afs_work_queue_opts appropriate for a certain
1014 * number of threads.
1016 * @param[out] opts   opts struct in which to set the values
1017 * @param[in]  threads number of threads
/* review: low watermark = 2x threads, high watermark = 16x threads,
 * clamped so that lothresh >= 1 and hithresh >= 2 for small or
 * non-positive thread counts. */
1020 afs_wq_opts_calc_thresh(struct afs_work_queue_opts *opts, int threads)
1022     opts->pend_lothresh = threads * 2;
1023     opts->pend_hithresh = threads * 16;
1026     if (opts->pend_lothresh < 1) {
1027 opts->pend_lothresh = 1;
1029     if (opts->pend_hithresh < 2) {
1030 opts->pend_hithresh = 2;
1035 * allocate and initialize a work queue object.
1037 * @param[out] queue_out  address in which to store newly allocated work queue object
1038 * @param[in]  rock       work queue opaque pointer (passed as first arg to all fired callbacks)
1039 * @param[in]  opts       options for the new created queue
1041 * @return operation status
1043 *    @retval ENOMEM out of memory
/* review: 'opts' is optional — when supplied it is copied into the queue,
 * otherwise defaults are used via afs_wq_opts_init (the NULL test between
 * the two branches is elided from this listing, as are the rock/queue_out
 * plumbing lines). */
1046 afs_wq_create(struct afs_work_queue ** queue_out,
1048       struct afs_work_queue_opts *opts)
1051     struct afs_work_queue * queue;
1053     ret = _afs_wq_alloc(queue_out);
1060 memcpy(&queue->opts, opts, sizeof(queue->opts));
1062 afs_wq_opts_init(&queue->opts);
1065     _afs_wq_node_list_init(&queue->ready_list,
1066    AFS_WQ_NODE_LIST_READY);
1067     _afs_wq_node_list_init(&queue->blocked_list,
1068    AFS_WQ_NODE_LIST_BLOCKED);
1069     _afs_wq_node_list_init(&queue->done_list,
1070    AFS_WQ_NODE_LIST_DONE);
1073     queue->shutdown = 0;
1074     queue->pend_count = 0;
1075     queue->running_count = 0;
1077     MUTEX_INIT(&queue->lock, "queue", MUTEX_DEFAULT, 0);
1078     CV_INIT(&queue->pend_cv, "queue pending", CV_DEFAULT, 0);
1079     CV_INIT(&queue->empty_cv, "queue empty", CV_DEFAULT, 0);
1080     CV_INIT(&queue->running_cv, "queue running", CV_DEFAULT, 0);
1087 * deallocate and free a work queue object.
1089 * @param[in] queue  work queue to be destroyed
1091 * @return operation status
1093 *    @retval AFS_WQ_ERROR unspecified error
/* review: tears down the three node lists, then frees the queue; the
 * early-return error checks between the calls are elided from this
 * listing. List destruction fails if any list is non-empty (see
 * _afs_wq_node_list_destroy). */
1096 afs_wq_destroy(struct afs_work_queue * queue)
1100     ret = _afs_wq_node_list_destroy(&queue->ready_list);
1105     ret = _afs_wq_node_list_destroy(&queue->blocked_list);
1110     ret = _afs_wq_node_list_destroy(&queue->done_list);
1115     ret = _afs_wq_free(queue);
1122 * shutdown a work queue.
1124 * @param[in] queue work queue object pointer
1126 * @return operation status
/* review: idempotent — a second call returns immediately. queue->lock is
 * held across the three list shutdowns and the broadcasts; error checks
 * between the shutdown calls are elided from this listing. */
1130 afs_wq_shutdown(struct afs_work_queue * queue)
1134     MUTEX_ENTER(&queue->lock);
1135     if (queue->shutdown) {
1136 /* already shutdown, do nothing */
1137 MUTEX_EXIT(&queue->lock);
1140     queue->shutdown = 1;
1142     ret = _afs_wq_node_list_shutdown(&queue->ready_list);
1147     ret = _afs_wq_node_list_shutdown(&queue->blocked_list);
1152     ret = _afs_wq_node_list_shutdown(&queue->done_list);
1157     /* signal everyone that could be waiting, since these conditions will
1158      * generally fail to signal on their own if we're shutdown, since no
1159      * progress is being made */
1160     CV_BROADCAST(&queue->pend_cv);
1161     CV_BROADCAST(&queue->empty_cv);
1162     MUTEX_EXIT(&queue->lock);
1169 * allocate a work node.
1171 * @param[out] node_out  address in which to store new work node
1173 * @return operation status
1175 *    @retval ENOMEM out of memory
/* review: the NULL-check/ENOMEM branch and the refcount/detached/dep_count
 * initializers are elided from this listing; the visible code initializes
 * list linkage, counters, sync primitives, and the INIT state. */
1178 afs_wq_node_alloc(struct afs_work_queue_node ** node_out)
1181     struct afs_work_queue_node * node;
1183     *node_out = node = (struct afs_work_queue_node *) malloc(sizeof(*node));
1189     queue_NodeInit(&node->node_list);
1190     node->qidx = AFS_WQ_NODE_LIST_NONE;
1192     node->rock = node->queue = NULL;
1194     node->block_count = 0;
1195     node->error_count = 0;
1196     MUTEX_INIT(&node->lock, "node", MUTEX_DEFAULT, 0);
1197     CV_INIT(&node->state_cv, "node state", CV_DEFAULT, 0);
1198     node->state = AFS_WQ_NODE_STATE_INIT;
1199     queue_Init(&node->dep_children);
/* review: frees a work node; the leading doxygen brief line is elided
 * from this listing. Refuses to free a node that is still queued or in
 * an active (SCHEDULED/RUNNING/BLOCKED) state. The final free(node) and
 * error returns are also elided. */
1208 * @param[in] node  work node object
1210 * @return operation status
1216 _afs_wq_node_free(struct afs_work_queue_node * node)
1220     if (queue_IsOnQueue(node) ||
1221 (node->state == AFS_WQ_NODE_STATE_SCHEDULED) ||
1222 (node->state == AFS_WQ_NODE_STATE_RUNNING) ||
1223 (node->state == AFS_WQ_NODE_STATE_BLOCKED)) {
1228     ret = _afs_wq_node_free_deps(node);
1233     MUTEX_DESTROY(&node->lock);
1234     CV_DESTROY(&node->state_cv);
/* invoke the caller-supplied destructor for the node's rock, if any. */
1236     if (node->rock_dtor) {
1237 (*node->rock_dtor) (node->rock);
1247 * get a reference to a work node.
1249 * @param[in] node  work queue node
1251 * @return operation status
/* review: locked wrapper; the refcount increment between ENTER and EXIT
 * is elided from this listing. */
1255 afs_wq_node_get(struct afs_work_queue_node * node)
1257     MUTEX_ENTER(&node->lock);
1259     MUTEX_EXIT(&node->lock);
1265 * put back a reference to a work node.
1267 * @param[in] node  work queue node
1268 * @param[in] drop  drop node->lock
1270 * @post if refcount reaches zero, node is deallocated.
1272 * @return operation status
1275 * @pre node->lock held
/* review: the lock must be released before freeing, hence the ordering
 * below; when drop==0 and refc hits zero the caller's contract requires
 * the lock still be dealt with elsewhere — the surrounding branch
 * structure is elided from this listing. */
1280 _afs_wq_node_put_r(struct afs_work_queue_node * node,
1285     osi_Assert(node->refcount > 0);
1286     refc = --node->refcount;
1288 MUTEX_EXIT(&node->lock);
/* last reference gone: the node must no longer be on any list. */
1291 osi_Assert(node->qidx == AFS_WQ_NODE_LIST_NONE);
1292 _afs_wq_node_free(node);
1299 * put back a reference to a work node.
1301 * @param[in] node  work queue node
1303 * @post if refcount reaches zero, node is deallocated.
1305 * @return operation status
/* public wrapper: takes node->lock and delegates to the locked variant
 * with drop=1, so the lock is released internally. */
1309 afs_wq_node_put(struct afs_work_queue_node * node)
1311     MUTEX_ENTER(&node->lock);
1312     return _afs_wq_node_put_r(node, 1);
1316 * set the callback function on a work node.
1318 * @param[in] node  work queue node
1319 * @param[in] cbf   callback function
1320 * @param[in] rock  opaque pointer passed to callback
1321 * @param[in] rock_dtor  destructor function for 'rock', or NULL
1323 * @return operation status
/* review: the assignments of node->cbf and node->rock between ENTER and
 * the visible rock_dtor assignment are elided from this listing. */
1327 afs_wq_node_set_callback(struct afs_work_queue_node * node,
1328  afs_wq_callback_func_t * cbf,
1329  void * rock, afs_wq_callback_dtor_t *dtor)
1331     MUTEX_ENTER(&node->lock);
1334     node->rock_dtor = dtor;
1335     MUTEX_EXIT(&node->lock);
/* review: marks a node detached (the doxygen brief line and the
 * node->detached = 1 assignment are elided from this listing); a
 * detached node's reference is owned by the queue machinery, which puts
 * it when the node completes (see _afs_wq_do / list_shutdown). */
1343 * @param[in] node  work queue node
1345 * @return operation status
1349 afs_wq_node_set_detached(struct afs_work_queue_node * node)
1351     MUTEX_ENTER(&node->lock);
1353     MUTEX_EXIT(&node->lock);
1359 * link a dependency node to a parent and child work node.
1361 * This links a dependency node such that when the 'parent' work node is
1362 * done, the 'child' work node can proceed.
1364 * @param[in] dep    dependency node
1365 * @param[in] parent parent node in this dependency
1366 * @param[in] child  child node in this dependency
1368 * @return operation status
1372 *    - parent->lock held
1373 *    - child->lock held
1374 *    - parent and child in quiescent state
/* review: the error check after get_r, the dep->child assignment and the
 * return are elided from this listing. */
1379 _afs_wq_dep_link_r(struct afs_work_queue_dep_node *dep,
1380    struct afs_work_queue_node *parent,
1381    struct afs_work_queue_node *child)
1385     /* Each dep node adds a ref to the child node of that dep. We do not
1386      * do the same for the parent node, since if the only refs remaining
1387      * for a node are deps in node->dep_children, then the node should be
1388      * destroyed, and we will destroy the dep nodes when we free the
1390     ret = _afs_wq_node_get_r(child);
1395     /* add this dep node to the parent node's list of deps */
1396     queue_Append(&parent->dep_children, &dep->parent_list);
1399     dep->parent = parent;
1406 * add a dependency to a work node.
1408 * @param[in] child  node which will be dependent upon completion of parent
1409 * @param[in] parent node whose completion gates child's execution
1412 *    - child is in initial state (last op was afs_wq_node_alloc or afs_wq_node_wait)
1414 * @return operation status
/* review: self-dependency is rejected up front; a fresh dep node is
 * allocated, both work nodes are locked via multilock (no busy/lock held
 * on entry), and block/error counts on the child are adjusted to reflect
 * the parent's current state. Error-path gotos and the success return
 * are elided from this listing; the trailing _afs_wq_dep_free(dep) is
 * the allocation-cleanup error path. */
1418 afs_wq_node_dep_add(struct afs_work_queue_node * child,
1419     struct afs_work_queue_node * parent)
1422     struct afs_work_queue_dep_node * dep = NULL;
1423     struct afs_work_queue_node_multilock ml;
1426     /* self references are bad, mmkay? */
1427     if (parent == child) {
1432     ret = _afs_wq_dep_alloc(&dep);
1437     memset(&ml, 0, sizeof(ml));
1438     ml.nodes[0].node = parent;
1439     ml.nodes[1].node = child;
1440     ret = _afs_wq_node_multilock(&ml);
1446     /* only allow dep modification while in initial state
1447      * or running state (e.g. do a dep add while inside callback) */
1448     if ((child->state != AFS_WQ_NODE_STATE_INIT) &&
1449 (child->state != AFS_WQ_NODE_STATE_RUNNING)) {
1454     /* link dep node with child and parent work queue node */
1455     ret = _afs_wq_dep_link_r(dep, parent, child);
1460     /* handle blocking counts */
1461     switch (parent->state) {
1462     case AFS_WQ_NODE_STATE_INIT:
1463     case AFS_WQ_NODE_STATE_SCHEDULED:
1464     case AFS_WQ_NODE_STATE_RUNNING:
1465     case AFS_WQ_NODE_STATE_BLOCKED:
/* parent not finished yet: the child is blocked on it. */
1466 child->block_count++;
1469     case AFS_WQ_NODE_STATE_ERROR:
/* parent already failed: poison the child immediately. */
1470 child->error_count++;
1479     MUTEX_EXIT(&child->lock);
1480     MUTEX_EXIT(&parent->lock);
1486 _afs_wq_dep_free(dep);
1492 * remove a dependency from a work node.
1494 * @param[in] child  node which was dependent upon completion of parent
1495 * @param[in] parent node whose completion gated child's execution
1497 * @return operation status
/* review: locks both nodes via multilock, scans the parent's dep list for
 * the (parent,child) dep, unlinks and frees it. The not-found handling,
 * count adjustments and returns are elided from this listing. */
1501 afs_wq_node_dep_del(struct afs_work_queue_node * child,
1502     struct afs_work_queue_node * parent)
1505     struct afs_work_queue_dep_node * dep, * ndep;
1506     struct afs_work_queue_node_multilock ml;
1509     memset(&ml, 0, sizeof(ml));
1510     ml.nodes[0].node = parent;
1511     ml.nodes[1].node = child;
1512     code = _afs_wq_node_multilock(&ml);
1518     /* only permit changes while child is in init state
1519      * or running state (e.g. do a dep del when in callback func) */
1520     if ((child->state != AFS_WQ_NODE_STATE_INIT) &&
1521 (child->state != AFS_WQ_NODE_STATE_RUNNING)) {
1526     /* locate node linking parent and child */
1527     for (queue_Scan(&parent->dep_children,
1530     afs_work_queue_dep_node)) {
1531 if ((dep->child == child) &&
1532     (dep->parent == parent)) {
1534     /* no need to grab an extra ref on dep->child here; the caller
1535      * should already have a ref on dep->child */
1536     code = _afs_wq_dep_unlink_r(dep);
1542 code = _afs_wq_dep_free(dep);
1553     MUTEX_EXIT(&child->lock);
1554     MUTEX_EXIT(&parent->lock);
1560 * block a work node from execution.
1562 * this can be used to allow external events to influence work queue flow.
1564 * @param[in] node  work queue node to be blocked
1566 * @return operation status
1569 * @post external block count incremented
/* review: only the 0 -> 1 block_count transition moves an already-READY
 * node to the blocked list (the 'start == 0 &&' part of the condition is
 * elided from this listing, as are the error-path gotos). */
1572 afs_wq_node_block(struct afs_work_queue_node * node)
1577     MUTEX_ENTER(&node->lock);
1578     ret = _afs_wq_node_state_wait_busy(node);
1583     start = node->block_count++;
1586 (node->qidx == AFS_WQ_NODE_LIST_READY)) {
1587 /* unblocked->blocked transition, and we're already scheduled */
1588 ret = _afs_wq_node_list_remove(node,
1589        AFS_WQ_NODE_STATE_BUSY);
1594 ret = _afs_wq_node_list_enqueue(&node->queue->blocked_list,
1596 AFS_WQ_NODE_STATE_BLOCKED);
1600     MUTEX_EXIT(&node->lock);
1606 * unblock a work node for execution.
1608 * this can be used to allow external events to influence work queue flow.
1610 * @param[in] node  work queue node to be blocked
1612 * @return operation status
1615 * @post external block count decremented
/* review: mirror of afs_wq_node_block — only the 1 -> 0 transition
 * ('end == 0 &&', elided from this listing) moves the node from the
 * blocked list to the ready list. Error-path gotos are also elided. */
1618 afs_wq_node_unblock(struct afs_work_queue_node * node)
1623     MUTEX_ENTER(&node->lock);
1624     ret = _afs_wq_node_state_wait_busy(node);
1629     end = --node->block_count;
1632 (node->qidx == AFS_WQ_NODE_LIST_BLOCKED)) {
1633 /* blocked->unblock transition, and we're ready to be scheduled */
1634 ret = _afs_wq_node_list_remove(node,
1635        AFS_WQ_NODE_STATE_BUSY);
1640 ret = _afs_wq_node_list_enqueue(&node->queue->ready_list,
1642 AFS_WQ_NODE_STATE_SCHEDULED);
1646     MUTEX_EXIT(&node->lock);
1652 * initialize a afs_wq_add_opts struct with the default options.
1654 * @param[out] opts  options structure to initialize
/* review: the body (default assignments for donate/block/force) is
 * elided from this listing; defaults are read in afs_wq_add below. */
1657 afs_wq_add_opts_init(struct afs_work_queue_add_opts *opts)
1665 * schedule a work node for execution.
1667 * @param[in] queue  work queue
1668 * @param[in] node   work node
1669 * @param[in] opts   options for adding, or NULL for defaults
1671 * @return operation status
1673 *    @retval EWOULDBLOCK queue is full and opts specified not to block
1674 *    @retval EINTR queue was full, we blocked to add, and the queue was
1675 *                  shutdown while we were blocking
/* review: defaults are taken from afs_wq_add_opts_init when opts is NULL
 * (the NULL test is elided from this listing); when the pending count is
 * at/above the high threshold and 'force' is not set, the caller sleeps
 * on pend_cv until _afs_wq_do drains below the low threshold. Many
 * branch/goto lines are elided. */
1678 afs_wq_add(struct afs_work_queue *queue,
1679    struct afs_work_queue_node *node,
1680    struct afs_work_queue_add_opts *opts)
1683     int donate, block, force, hithresh;
1684     struct afs_work_queue_node_list * list;
1685     struct afs_work_queue_add_opts l_opts;
1686     int waited_for_drain = 0;
1687     afs_wq_work_state_t state;
1690     afs_wq_add_opts_init(&l_opts);
1694     donate = opts->donate;
1695     block = opts->block;
1696     force = opts->force;
1699     MUTEX_ENTER(&node->lock);
1701     ret = _afs_wq_node_state_wait_busy(node);
/* choose destination list from the node's current counters: errors go
 * straight to done, blocks to blocked, otherwise ready. */
1706     if (!node->block_count && !node->error_count) {
1707 list = &queue->ready_list;
1708 state = AFS_WQ_NODE_STATE_SCHEDULED;
1709     } else if (node->error_count) {
1710 list = &queue->done_list;
1711 state = AFS_WQ_NODE_STATE_ERROR;
1713 list = &queue->blocked_list;
1714 state = AFS_WQ_NODE_STATE_BLOCKED;
1719     MUTEX_ENTER(&queue->lock);
1721     if (queue->shutdown) {
1723 MUTEX_EXIT(&queue->lock);
1724 MUTEX_EXIT(&node->lock);
1728     hithresh = queue->opts.pend_hithresh;
1729     if (hithresh > 0 && queue->pend_count >= hithresh) {
1733 if (!force && (state == AFS_WQ_NODE_STATE_SCHEDULED
1734        || state == AFS_WQ_NODE_STATE_BLOCKED)) {
/* drop node->lock before sleeping so workers can make progress. */
1738     MUTEX_EXIT(&node->lock);
1739     CV_WAIT(&queue->pend_cv, &queue->lock);
1741     if (queue->shutdown) {
1744 MUTEX_EXIT(&queue->lock);
1746     waited_for_drain = 1;
1757     queue->pend_count++;
1759     if (waited_for_drain) {
1760 /* signal another thread that may have been waiting for drain */
1761 CV_SIGNAL(&queue->pend_cv);
1764     MUTEX_EXIT(&queue->lock);
/* review: the ref handling for 'donate' between here and the enqueue is
 * elided; 'donate' presumably transfers the caller's reference to the
 * queue — confirm against the full source. */
1772     node->queue = queue;
1774     ret = _afs_wq_node_list_enqueue(list,
1782 * de-schedule a work node.
1784 * @param[in] node  work node
1786 * @return operation status
/* review: the entire body is elided from this listing; presumably it
 * removes the node from its current list — confirm against the full
 * source. */
1790 afs_wq_del(struct afs_work_queue_node * node)
1797 * execute a node on the queue.
1799 * @param[in] queue  work queue
1800 * @param[in] rock   opaque pointer (passed as third arg to callback func)
1802 * @return operation status
1803 *    @retval 0 completed a work unit
/* blocking wrapper: waits in dequeue (block=1) until work is available. */
1806 afs_wq_do(struct afs_work_queue * queue,
1809     return _afs_wq_do(queue, rock, 1);
1813 * execute a node on the queue, if there is any work to do.
1815 * @param[in] queue  work queue
1816 * @param[in] rock   opaque pointer (passed as third arg to callback func)
1818 * @return operation status
1819 *    @retval 0 completed a work unit
1820 *    @retval EWOULDBLOCK there was nothing to do
/* non-blocking wrapper: block=0 makes the dequeue return EWOULDBLOCK
 * when the ready list is empty. */
1823 afs_wq_do_nowait(struct afs_work_queue * queue,
1826     return _afs_wq_do(queue, rock, 0);
1830 * wait for all pending nodes to finish.
1832 * @param[in] queue  work queue
1834 * @return operation status
1837 * @post the specified queue was empty at some point; it may not be empty by
1838 * the time this function returns, but at some point after the function was
1839 * called, there were no nodes in the ready queue or blocked queue.
1842 afs_wq_wait_all(struct afs_work_queue *queue)
1846     MUTEX_ENTER(&queue->lock);
/* empty_cv is broadcast by _afs_wq_do when pend_count is about to reach
 * zero, and by afs_wq_shutdown; hence the !shutdown loop guard. */
1848     while (queue->pend_count > 0 && !queue->shutdown) {
1849 CV_WAIT(&queue->empty_cv, &queue->lock);
1852     if (queue->shutdown) {
1853 /* queue has been shut down, but there may still be some threads
1854  * running e.g. in the middle of their callback. ensure they have
1855  * stopped before we return. */
1856 while (queue->running_count > 0) {
1857     CV_WAIT(&queue->running_cv, &queue->lock);
1864     MUTEX_EXIT(&queue->lock);
1866     /* technically this doesn't really guarantee that the work queue is empty
1867      * after we return, but we do guarantee that it was empty at some point */
1873 * wait for a node to complete; dequeue from done list.
1875 * @param[in]  node     work queue node
1876 * @param[out] retcode  return code from work unit
1878 * @return operation status
1881 * @pre ref held on node
/* review: waits on state_cv until the node reaches DONE or ERROR, copies
 * the callback's return code out, then removes the node from the done
 * list and resets it to INIT so it can be reused. Error returns and the
 * retcode NULL-guard (if any) are elided from this listing. */
1884 afs_wq_node_wait(struct afs_work_queue_node * node,
1889     MUTEX_ENTER(&node->lock);
1890     if (node->state == AFS_WQ_NODE_STATE_INIT) {
1891 /* not sure what to do in this case */
1895     while ((node->state != AFS_WQ_NODE_STATE_DONE) &&
1896    (node->state != AFS_WQ_NODE_STATE_ERROR)) {
1897 CV_WAIT(&node->state_cv, &node->lock);
1900     *retcode = node->retcode;
1903     if (node->queue == NULL) {
1904 /* nothing we can do */
1908     ret = _afs_wq_node_list_remove(node,
1909    AFS_WQ_NODE_STATE_INIT);
1912     MUTEX_EXIT(&node->lock);