2 * Copyright 2008-2010, Sine Nomine Associates and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
10 #include <afsconfig.h>
11 #include <afs/param.h>
16 #include <afs/afs_assert.h>
19 #define __AFS_WORK_QUEUE_IMPL 1
20 #include "work_queue.h"
21 #include "work_queue_impl.h"
24 * public interfaces for work_queue.
/* forward declaration (fragment): drop a reference on a work node;
 * the full prototype (second parameter and return type) is elided in this dump. */
28 _afs_wq_node_put_r(struct afs_work_queue_node * node,
32 * allocate a work queue object.
34 * @param[out] queue_out address in which to store queue pointer
36 * @return operation status
38 * @retval ENOMEM out of memory
/* Allocate a bare work queue object and store it in *queue_out.
 * NOTE(review): lines are elided here — the ENOMEM check on malloc failure
 * documented in the header above is not visible in this dump; confirm upstream. */
43 _afs_wq_alloc(struct afs_work_queue ** queue_out)
46 struct afs_work_queue * queue;
48 *queue_out = queue = malloc(sizeof(*queue));
59 * free a work queue object.
61 * @param[in] queue work queue object
63 * @return operation status
/* Release the memory of a work queue object allocated by _afs_wq_alloc
 * (body elided in this dump). */
69 _afs_wq_free(struct afs_work_queue * queue)
79 * change a node's state.
81 * @param[in] node node object
82 * @param[in] new_state new object state
86 * @pre node->lock held
/* Transition node->state to new_state and wake every thread waiting on the
 * state cv. Returns the previous state (the return statement is elided here).
 * @pre node->lock held */
90 static afs_wq_work_state_t
91 _afs_wq_node_state_change(struct afs_work_queue_node * node,
92 afs_wq_work_state_t new_state)
94 afs_wq_work_state_t old_state;
96 old_state = node->state;
97 node->state = new_state;
/* broadcast, not signal: several threads may be parked in state_wait_busy */
99 CV_BROADCAST(&node->state_cv);
105 * wait for a node's state to change from busy to something else.
107 * @param[in] node node object
109 * @return operation status
112 * @pre node->lock held
/* Sleep until the node leaves the BUSY state.
 * @pre node->lock held; CV_WAIT atomically releases and reacquires it. */
117 _afs_wq_node_state_wait_busy(struct afs_work_queue_node * node)
119 while (node->state == AFS_WQ_NODE_STATE_BUSY) {
120 CV_WAIT(&node->state_cv, &node->lock);
127 * check whether a work queue node is busy.
129 * @param[in] node node object pointer
131 * @return whether node is busy
132 * @retval 1 node is busy
133 * @retval 0 node is not busy
135 * @pre node->lock held
/* Non-blocking predicate: 1 iff the node is currently BUSY.
 * @pre node->lock held */
140 _afs_wq_node_state_is_busy(struct afs_work_queue_node * node)
142 return (node->state == AFS_WQ_NODE_STATE_BUSY);
146 * attempt to simultaneously lock two work queue nodes.
148 * this is a somewhat tricky algorithm because there is no
149 * defined hierarchy within the work queue node population.
151 * @param[in] ml multilock control structure
153 * @return operation status
156 * @note in theory, we could easily extend this to
157 * lock more than two nodes
160 * - caller MUST NOT have set busy state on either node
163 * - locks held on both nodes
164 * - both nodes in quiescent states
166 * @note node with non-zero lock_held or busy_held fields
167 * MUST go in array index 0
/* Lock two work nodes "simultaneously" without a global lock ordering.
 * Strategy visible below: take the first node's lock, trylock the second;
 * on contention drop everything, sleep a randomized exponentially-growing
 * backoff, and retry in the opposite order (first/second swap is elided).
 * NOTE(review): many control-flow lines (braces, swaps, loop header, error
 * paths) are elided in this dump — read the full file before modifying. */
172 _afs_wq_node_multilock(struct afs_work_queue_node_multilock * ml)
175 struct timespec delay;
176 int first = 1, second = 0, tmp;
178 /* first pass processing */
179 if (ml->nodes[0].lock_held) {
180 if (!ml->nodes[0].busy_held) {
181 ret = _afs_wq_node_state_wait_busy(ml->nodes[0].node);
187 code = MUTEX_TRYENTER(&ml->nodes[1].node->lock);
193 /* setup for main loop */
194 MUTEX_EXIT(&ml->nodes[0].node->lock);
198 * setup random exponential backoff
200 * set initial delay to random value in the range [500,1000) ns
203 delay.tv_nsec = 500 + rand() % 500;
206 MUTEX_ENTER(&ml->nodes[first].node->lock);
207 if ((first != 0) || !ml->nodes[0].busy_held) {
208 ret = _afs_wq_node_state_wait_busy(ml->nodes[first].node);
/* wait_busy failed: restore the caller's original lock state before returning */
211 if (!ml->nodes[0].lock_held || first) {
212 MUTEX_EXIT(&ml->nodes[first].node->lock);
213 if (ml->nodes[0].lock_held) {
214 /* on error, return with locks in same state as before call */
215 MUTEX_ENTER(&ml->nodes[0].node->lock);
223 * in order to avoid deadlock, we must use trylock and
224 * a non-blocking state check. if we meet any contention,
225 * we must drop back and start again.
227 code = MUTEX_TRYENTER(&ml->nodes[second].node->lock);
229 if (((second == 0) && (ml->nodes[0].busy_held)) ||
230 !_afs_wq_node_state_is_busy(ml->nodes[second].node)) {
/* second node is quiescent (or we own its busy) — both locks held, success */
234 MUTEX_EXIT(&ml->nodes[second].node->lock);
241 * drop locks, use exponential backoff,
242 * try acquiring in the opposite order
244 MUTEX_EXIT(&ml->nodes[first].node->lock);
245 nanosleep(&delay, NULL);
246 if (delay.tv_nsec <= 65536000) { /* max backoff delay of ~131ms */
260 * initialize a node list object.
262 * @param[in] list list object
263 * @param[in] id list identifier
265 * @return operation status
/* Initialize an empty node list: its queue head, lock, and wakeup cv.
 * NOTE(review): the assignment of 'id' to the list (qidx field) is elided. */
271 _afs_wq_node_list_init(struct afs_work_queue_node_list * list,
272 afs_wq_node_list_id_t id)
274 queue_Init(&list->list);
275 MUTEX_INIT(&list->lock, "list", MUTEX_DEFAULT, 0);
276 CV_INIT(&list->cv, "list", CV_DEFAULT, 0);
284 * destroy a node list object.
286 * @param[in] list list object
288 * @return operation status
290 * @retval AFS_WQ_ERROR list not empty
/* Destroy a node list's lock and cv. Per the header above, returns
 * AFS_WQ_ERROR if the list is non-empty (error branch body elided). */
295 _afs_wq_node_list_destroy(struct afs_work_queue_node_list * list)
299 if (queue_IsNotEmpty(&list->list)) {
304 MUTEX_DESTROY(&list->lock);
305 CV_DESTROY(&list->cv);
312 * wakeup all threads waiting in dequeue.
314 * @param[in] list list object
316 * @return operation status
/* Shut down a node list: mark every queued node ERROR, detach it from the
 * list, drop the list's ref on detached nodes, then wake all dequeue waiters.
 * NOTE(review): the line setting list->shutdown and the queue_Remove call
 * are elided in this dump. */
322 _afs_wq_node_list_shutdown(struct afs_work_queue_node_list * list)
325 struct afs_work_queue_node *node, *nnode;
327 MUTEX_ENTER(&list->lock);
330 for (queue_Scan(&list->list, node, nnode, afs_work_queue_node)) {
331 _afs_wq_node_state_change(node, AFS_WQ_NODE_STATE_ERROR);
333 node->qidx = AFS_WQ_NODE_LIST_NONE;
336 if (node->detached) {
337 /* if we are detached, we hold the reference on the node;
338 * otherwise, it is some other caller that holds the reference.
339 * So don't put the node if we are not detached; the node will
340 * get freed when someone else calls afs_wq_node_put */
341 afs_wq_node_put(node);
/* wake every thread blocked in _afs_wq_node_list_dequeue */
345 CV_BROADCAST(&list->cv);
346 MUTEX_EXIT(&list->lock);
352 * append to a node list object.
354 * @param[in] list list object
355 * @param[in] node node object
356 * @param[in] state new node state
358 * @return operation status
360 * @retval AFS_WQ_ERROR raced to enqueue node
364 * - node is not on a list
365 * - node is either not busy, or it is marked as busy by the calling thread
369 * - node lock dropped
/* Append 'node' to 'list' and set its state to 'state'.
 * Lock order is list->lock before node->lock; since the caller enters with
 * node->lock held, contention on list->lock is resolved by marking the node
 * BUSY, dropping node->lock, and re-taking both in the correct order.
 * @pre node->lock held, node not on any list
 * @post both locks dropped */
374 _afs_wq_node_list_enqueue(struct afs_work_queue_node_list * list,
375 struct afs_work_queue_node * node,
376 afs_wq_work_state_t state)
380 if (node->qidx != AFS_WQ_NODE_LIST_NONE) {
386 /* deal with lock inversion */
387 code = MUTEX_TRYENTER(&list->lock);
390 _afs_wq_node_state_change(node, AFS_WQ_NODE_STATE_BUSY);
391 MUTEX_EXIT(&node->lock);
392 MUTEX_ENTER(&list->lock);
393 MUTEX_ENTER(&node->lock);
395 /* assert state of the world (we set busy, so this should never happen) */
396 osi_Assert(queue_IsNotOnQueue(node));
399 if (list->shutdown) {
404 osi_Assert(node->qidx == AFS_WQ_NODE_LIST_NONE);
405 if (queue_IsEmpty(&list->list)) {
406 /* wakeup a dequeue thread */
407 CV_SIGNAL(&list->cv);
409 queue_Append(&list->list, node);
410 node->qidx = list->qidx;
411 _afs_wq_node_state_change(node, state);
414 MUTEX_EXIT(&node->lock);
415 MUTEX_EXIT(&list->lock);
422 * dequeue a node from a list object.
424 * @param[in] list list object
425 * @param[out] node_out address in which to store node object pointer
426 * @param[in] state new node state
427 * @param[in] block permit blocking on cv if asserted
429 * @return operation status
431 * @retval EWOULDBLOCK block not asserted and nothing to dequeue
432 * @retval EINTR blocking wait interrupted by list shutdown
434 * @post node object returned with node lock held and new state set
/* Pop the head node off 'list', lock it, and transition it to 'state'.
 * Per the header above: returns EWOULDBLOCK when 'block' is clear and the
 * list is empty, EINTR on shutdown while blocked (those return statements
 * are elided in this dump, as is the queue_Remove of the node).
 * @post *node_out set; node->lock held; list->lock dropped */
439 _afs_wq_node_list_dequeue(struct afs_work_queue_node_list * list,
440 struct afs_work_queue_node ** node_out,
441 afs_wq_work_state_t state,
445 struct afs_work_queue_node * node;
447 MUTEX_ENTER(&list->lock);
449 if (list->shutdown) {
455 if (!block && queue_IsEmpty(&list->list)) {
461 while (queue_IsEmpty(&list->list)) {
/* re-check shutdown after every wakeup */
462 if (list->shutdown) {
467 CV_WAIT(&list->cv, &list->lock);
470 *node_out = node = queue_First(&list->list, afs_work_queue_node);
472 MUTEX_ENTER(&node->lock);
474 node->qidx = AFS_WQ_NODE_LIST_NONE;
475 _afs_wq_node_state_change(node, state);
478 MUTEX_EXIT(&list->lock);
484 * remove a node from a list.
486 * @param[in] node node object
487 * @param[in] next_state node state following successful dequeue
489 * @return operation status
491 * @retval AFS_WQ_ERROR in any of the following conditions:
492 * - node not associated with a work queue
493 * - node was not on a linked list (e.g. RUNNING state)
494 * - we raced another thread
496 * @pre node->lock held
498 * @post node removed from node list
500 * @note node->lock may be dropped internally
/* Remove a node from whichever list node->qidx says it is on, then set
 * next_state. Uses the same busy-mark/trylock dance as enqueue to respect
 * the list-before-node lock order.
 * NOTE(review): the queue_Remove call, the default switch case, and the
 * race-detection return paths are elided in this dump.
 * @pre node->lock held (may be dropped and retaken internally) */
505 _afs_wq_node_list_remove(struct afs_work_queue_node * node,
506 afs_wq_work_state_t next_state)
509 struct afs_work_queue_node_list * list = NULL;
511 _afs_wq_node_state_wait_busy(node);
/* map the node's queue index to the owning queue's list object */
517 switch (node->qidx) {
518 case AFS_WQ_NODE_LIST_READY:
519 list = &node->queue->ready_list;
522 case AFS_WQ_NODE_LIST_BLOCKED:
523 list = &node->queue->blocked_list;
526 case AFS_WQ_NODE_LIST_DONE:
527 list = &node->queue->done_list;
535 code = MUTEX_TRYENTER(&list->lock);
538 _afs_wq_node_state_change(node,
539 AFS_WQ_NODE_STATE_BUSY);
540 MUTEX_EXIT(&node->lock);
541 MUTEX_ENTER(&list->lock);
542 MUTEX_ENTER(&node->lock);
/* someone else dequeued the node while our locks were dropped */
544 if (node->qidx == AFS_WQ_NODE_LIST_NONE) {
552 node->qidx = AFS_WQ_NODE_LIST_NONE;
553 _afs_wq_node_state_change(node, next_state);
556 MUTEX_EXIT(&list->lock);
564 * allocate a dependency node.
566 * @param[out] node_out address in which to store dep node pointer
568 * @return operation status
570 * @retval ENOMEM out of memory
/* Allocate and zero-link a dependency node; store it in *node_out.
 * NOTE(review): the malloc-failure (ENOMEM) branch and the *node_out
 * assignment are elided in this dump. */
575 _afs_wq_dep_alloc(struct afs_work_queue_dep_node ** node_out)
578 struct afs_work_queue_dep_node * node;
580 node = malloc(sizeof(*node));
586 queue_NodeInit(&node->parent_list);
587 node->parent = node->child = NULL;
596 * free a dependency node.
598 * @param[in] node dep node pointer
600 * @return operation status
602 * @retval AFS_WQ_ERROR still attached to a work node
/* Free a dependency node; per the header above, fails with AFS_WQ_ERROR if
 * it is still attached to a work node (condition continues on elided lines). */
607 _afs_wq_dep_free(struct afs_work_queue_dep_node * node)
611 if (queue_IsOnQueue(&node->parent_list) ||
625 * unlink work nodes from a dependency node.
627 * @param[in] dep dependency node
629 * @return operation status
633 * - dep->parent and dep->child are either locked, or are not referenced
635 * - caller holds ref on dep->child
636 * - dep->child and dep->parent in quiescent state
/* Detach a dep node from its parent's children list and drop the dep's
 * reference on the child (put with drop=0: child's lock stays held).
 * @pre see header above: parent/child locked or unreferenced, quiescent */
641 _afs_wq_dep_unlink_r(struct afs_work_queue_dep_node *dep)
643 struct afs_work_queue_node *child = dep->child;
644 queue_Remove(&dep->parent_list);
648 return _afs_wq_node_put_r(child, 0);
652 * get a reference to a work node.
654 * @param[in] node work queue node
656 * @return operation status
659 * @pre node->lock held
/* Take a reference on a work node (refcount increment elided in this dump).
 * @pre node->lock held */
664 _afs_wq_node_get_r(struct afs_work_queue_node * node)
672 * unlink and free all of the dependency nodes from a node.
674 * @param[in] parent work node that is the parent node of all deps to be freed
676 * @return operation status
679 * @pre parent->refcount == 0
/* Unlink and free every dep node hanging off 'parent'.
 * For each dep: lock the child, take a temporary ref so the child survives
 * _afs_wq_dep_unlink_r (which may drop the last ref), unlink, then release
 * our temporary ref / unlock. On error the dep struct is intentionally
 * leaked rather than freed while still referenced.
 * @pre parent->refcount == 0, so parent needs no locking */
682 _afs_wq_node_free_deps(struct afs_work_queue_node *parent)
685 struct afs_work_queue_node *node_unlock = NULL, *node_put = NULL;
686 struct afs_work_queue_dep_node * dep, * nd;
688 /* unlink and free all of the dep structs attached to 'parent' */
689 for (queue_Scan(&parent->dep_children,
692 afs_work_queue_dep_node)) {
694 MUTEX_ENTER(&dep->child->lock);
695 node_unlock = dep->child;
697 /* We need to get a ref on child here, since _afs_wq_dep_unlink_r may
698 * put the last ref on the child, and we need the child to still exist
699 * so we can unlock it */
700 code = _afs_wq_node_get_r(dep->child);
704 node_put = dep->child;
706 /* remember, no need to lock dep->parent, since its refcount is 0 */
707 code = _afs_wq_dep_unlink_r(dep);
/* put with drop=1 releases our temp ref and the child's lock together */
711 _afs_wq_node_put_r(node_put, 1);
712 } else if (node_unlock) {
713 MUTEX_EXIT(&node_unlock->lock);
715 node_put = node_unlock = NULL;
718 /* Only do this if everything is okay; if code is nonzero,
719 * something will still be pointing at dep, so don't free it.
720 * We will leak memory, but that's better than memory corruption;
721 * we've done all we can do to try and free the dep memory */
722 code = _afs_wq_dep_free(dep);
733 * propagate state down through dep nodes.
735 * @param[in] parent parent node object
736 * @param[in] next_state next state parent will assume
738 * @return operation status
742 * - parent->lock held
/* Propagate the parent's upcoming state change to all dependent children.
 * Parent is marked BUSY for the duration; each child is co-locked with the
 * parent via multilock, its block/error counters are adjusted for
 * next_state, and if the counters say so the child is migrated between the
 * ready/blocked/done lists.
 * NOTE(review): error-handling branches, the enqueue argument list, and the
 * restore of the parent's old state are elided in this dump.
 * @pre parent->lock held */
747 _afs_wq_dep_propagate(struct afs_work_queue_node * parent,
748 afs_wq_work_state_t next_state)
751 struct afs_work_queue_dep_node * dep, * nd;
752 struct afs_work_queue_node_multilock ml;
753 afs_wq_work_state_t old_state;
754 afs_wq_node_list_id_t qidx;
755 struct afs_work_queue_node_list * ql;
756 afs_wq_work_state_t cns;
758 old_state = _afs_wq_node_state_change(parent,
759 AFS_WQ_NODE_STATE_BUSY);
760 ml.nodes[0].node = parent;
761 ml.nodes[0].lock_held = 1;
762 ml.nodes[0].busy_held = 1;
764 /* scan through our children updating scheduling state */
765 for (queue_Scan(&parent->dep_children,
768 afs_work_queue_dep_node)) {
769 /* skip half-registered nodes */
770 if (dep->child == NULL) {
774 ml.nodes[1].node = dep->child;
775 ml.nodes[1].lock_held = 0;
776 ml.nodes[1].busy_held = 0;
777 ret = _afs_wq_node_multilock(&ml);
/* parent DONE unblocks the child; parent ERROR poisons it */
782 switch (next_state) {
783 case AFS_WQ_NODE_STATE_DONE:
784 dep->child->block_count--;
787 case AFS_WQ_NODE_STATE_ERROR:
788 dep->child->error_count++;
795 /* skip unscheduled nodes */
796 if (dep->child->queue == NULL) {
797 MUTEX_EXIT(&dep->child->lock);
802 * when blocked dep and error'd dep counts reach zero, the
803 * node can be scheduled for execution
805 if (dep->child->error_count) {
806 ql = &dep->child->queue->done_list;
807 qidx = AFS_WQ_NODE_LIST_DONE;
808 cns = AFS_WQ_NODE_STATE_ERROR;
809 } else if (dep->child->block_count) {
810 ql = &dep->child->queue->blocked_list;
811 qidx = AFS_WQ_NODE_LIST_BLOCKED;
812 cns = AFS_WQ_NODE_STATE_BLOCKED;
814 ql = &dep->child->queue->ready_list;
815 qidx = AFS_WQ_NODE_LIST_READY;
816 cns = AFS_WQ_NODE_STATE_SCHEDULED;
819 if (qidx != dep->child->qidx) {
820 /* we're transitioning to a different queue */
821 ret = _afs_wq_node_list_remove(dep->child,
822 AFS_WQ_NODE_STATE_BUSY);
824 MUTEX_EXIT(&dep->child->lock);
828 ret = _afs_wq_node_list_enqueue(ql,
832 MUTEX_EXIT(&dep->child->lock);
836 MUTEX_EXIT(&dep->child->lock);
/* presumably restores old_state (or next-state) on the parent — elided args */
840 _afs_wq_node_state_change(parent,
846 * decrements queue->running_count, and signals waiters if appropriate.
848 * @param[in] queue queue to dec the running count of
/* Decrement queue->running_count under queue->lock; on shutdown, wake
 * threads (e.g. afs_wq_wait_all) waiting for the count to reach zero. */
851 _afs_wq_dec_running_count(struct afs_work_queue *queue)
853 MUTEX_ENTER(&queue->lock);
854 queue->running_count--;
855 if (queue->shutdown && queue->running_count == 0) {
856 /* if we've shut down, someone may be waiting for the running count
858 CV_BROADCAST(&queue->running_cv);
860 MUTEX_EXIT(&queue->lock);
864 * execute a node on the queue.
866 * @param[in] queue work queue
867 * @param[in] rock opaque pointer (passed as third arg to callback func)
868 * @param[in] block allow blocking in dequeue
870 * @return operation status
871 * @retval 0 completed a work unit
/* Worker core: pull one node off the ready list, run its callback, and
 * route the node to done/blocked/ready based on the callback's return code
 * (0 = done, AFS_WQ_ERROR_RESCHEDULE = re-route by counters, other = error).
 * Afterwards, maintain pend_count bookkeeping and propagate the state change
 * to dependents. Detached nodes in a terminal state are put rather than
 * re-enqueued.
 * NOTE(review): numerous lines (declarations of ret/code/detached/node_rock,
 * early returns, enqueue arguments, pend_count decrement) are elided in
 * this dump. */
876 _afs_wq_do(struct afs_work_queue * queue,
881 struct afs_work_queue_node * node;
882 afs_wq_callback_func_t * cbf;
883 afs_wq_work_state_t next_state;
884 struct afs_work_queue_node_list * ql;
888 /* We can inc queue->running_count before actually pulling the node off
889 * of the ready_list, since running_count only really matters when we are
890 * shut down. If we get shut down before we pull the node off of
891 * ready_list, but after we inc'd running_count,
892 * _afs_wq_node_list_dequeue should return immediately with EINTR,
893 * in which case we'll dec running_count, so it's as if we never inc'd it
894 * in the first place. */
895 MUTEX_ENTER(&queue->lock);
896 if (queue->shutdown) {
897 MUTEX_EXIT(&queue->lock);
900 queue->running_count++;
901 MUTEX_EXIT(&queue->lock);
903 ret = _afs_wq_node_list_dequeue(&queue->ready_list,
905 AFS_WQ_NODE_STATE_RUNNING,
/* dequeue failed (EWOULDBLOCK/EINTR): undo the running_count increment */
908 _afs_wq_dec_running_count(queue);
913 node_rock = node->rock;
914 detached = node->detached;
/* run the callback with the node unlocked so it may manipulate the node */
917 MUTEX_EXIT(&node->lock);
918 code = (*cbf)(queue, node, queue->rock, node_rock, rock);
919 MUTEX_ENTER(&node->lock);
921 next_state = AFS_WQ_NODE_STATE_DONE;
922 ql = &queue->done_list;
923 } else if (code == AFS_WQ_ERROR_RESCHEDULE) {
924 if (node->error_count) {
925 next_state = AFS_WQ_NODE_STATE_ERROR;
926 ql = &queue->done_list;
927 } else if (node->block_count) {
928 next_state = AFS_WQ_NODE_STATE_BLOCKED;
929 ql = &queue->blocked_list;
931 next_state = AFS_WQ_NODE_STATE_SCHEDULED;
932 ql = &queue->ready_list;
935 next_state = AFS_WQ_NODE_STATE_ERROR;
936 ql = &queue->done_list;
939 next_state = AFS_WQ_NODE_STATE_DONE;
941 ql = &queue->done_list;
944 _afs_wq_dec_running_count(queue);
946 node->retcode = code;
948 if ((next_state == AFS_WQ_NODE_STATE_DONE) ||
949 (next_state == AFS_WQ_NODE_STATE_ERROR)) {
951 MUTEX_ENTER(&queue->lock);
953 if (queue->drain && queue->pend_count == queue->opts.pend_lothresh) {
954 /* signal other threads if we're about to below the low
955 * pending-tasks threshold */
957 CV_SIGNAL(&queue->pend_cv);
960 if (queue->pend_count == 1) {
961 /* signal other threads if we're about to become 'empty' */
962 CV_BROADCAST(&queue->empty_cv);
967 MUTEX_EXIT(&queue->lock);
970 ret = _afs_wq_node_state_wait_busy(node);
975 /* propagate scheduling changes down through dependencies */
976 ret = _afs_wq_dep_propagate(node, next_state);
981 ret = _afs_wq_node_state_wait_busy(node);
/* detached + terminal: set final state and drop the queue's reference */
987 ((next_state == AFS_WQ_NODE_STATE_DONE) ||
988 (next_state == AFS_WQ_NODE_STATE_ERROR))) {
989 _afs_wq_node_state_change(node, next_state);
990 _afs_wq_node_put_r(node, 1);
992 ret = _afs_wq_node_list_enqueue(ql,
1002 * initialize a struct afs_work_queue_opts to the default values
1004 * @param[out] opts opts struct to initialize
/* Reset a queue options struct to defaults (0 = thresholds disabled). */
1007 afs_wq_opts_init(struct afs_work_queue_opts *opts)
1009 opts->pend_lothresh = 0;
1010 opts->pend_hithresh = 0;
1014 * set the options for a struct afs_work_queue_opts appropriate for a certain
1015 * number of threads.
1017 * @param[out] opts opts struct in which to set the values
1018 * @param[in] threads number of threads
/* Derive pending-task thresholds from a worker-thread count:
 * low = 2x threads, high = 16x threads, clamped to minimums of 1 and 2. */
1021 afs_wq_opts_calc_thresh(struct afs_work_queue_opts *opts, int threads)
1023 opts->pend_lothresh = threads * 2;
1024 opts->pend_hithresh = threads * 16;
1027 if (opts->pend_lothresh < 1) {
1028 opts->pend_lothresh = 1;
1030 if (opts->pend_hithresh < 2) {
1031 opts->pend_hithresh = 2;
1036 * allocate and initialize a work queue object.
1038 * @param[out] queue_out address in which to store newly allocated work queue object
1039 * @param[in] rock work queue opaque pointer (passed as first arg to all fired callbacks)
1040 * @param[in] opts options for the new created queue
1042 * @return operation status
1044 * @retval ENOMEM out of memory
/* Public constructor: allocate a queue, copy caller options (or install
 * defaults when opts is NULL — the if/else around lines 1061/1063 is
 * elided), initialize the three node lists, counters, lock, and cvs.
 * NOTE(review): storage of 'rock' and the error path after _afs_wq_alloc
 * are elided in this dump. */
1047 afs_wq_create(struct afs_work_queue ** queue_out,
1049 struct afs_work_queue_opts *opts)
1052 struct afs_work_queue * queue;
1054 ret = _afs_wq_alloc(queue_out);
1061 memcpy(&queue->opts, opts, sizeof(queue->opts));
1063 afs_wq_opts_init(&queue->opts);
1066 _afs_wq_node_list_init(&queue->ready_list,
1067 AFS_WQ_NODE_LIST_READY);
1068 _afs_wq_node_list_init(&queue->blocked_list,
1069 AFS_WQ_NODE_LIST_BLOCKED);
1070 _afs_wq_node_list_init(&queue->done_list,
1071 AFS_WQ_NODE_LIST_DONE);
1074 queue->shutdown = 0;
1075 queue->pend_count = 0;
1076 queue->running_count = 0;
1078 MUTEX_INIT(&queue->lock, "queue", MUTEX_DEFAULT, 0);
1079 CV_INIT(&queue->pend_cv, "queue pending", CV_DEFAULT, 0);
1080 CV_INIT(&queue->empty_cv, "queue empty", CV_DEFAULT, 0);
1081 CV_INIT(&queue->running_cv, "queue running", CV_DEFAULT, 0);
1088 * deallocate and free a work queue object.
1090 * @param[in] queue work queue to be destroyed
1092 * @return operation status
1094 * @retval AFS_WQ_ERROR unspecified error
/* Public destructor: tear down the three node lists (each fails if
 * non-empty; early-return error checks elided), then free the queue. */
1097 afs_wq_destroy(struct afs_work_queue * queue)
1101 ret = _afs_wq_node_list_destroy(&queue->ready_list);
1106 ret = _afs_wq_node_list_destroy(&queue->blocked_list);
1111 ret = _afs_wq_node_list_destroy(&queue->done_list);
1116 ret = _afs_wq_free(queue);
1123 * shutdown a work queue.
1125 * @param[in] queue work queue object pointer
1127 * @return operation status
/* Shut down a work queue: idempotent under queue->lock; marks the queue,
 * shuts down all three node lists, then broadcasts the pending/empty cvs so
 * stalled waiters notice the shutdown.
 * NOTE(review): error checks after each list shutdown are elided. */
1131 afs_wq_shutdown(struct afs_work_queue * queue)
1135 MUTEX_ENTER(&queue->lock);
1136 if (queue->shutdown) {
1137 /* already shutdown, do nothing */
1138 MUTEX_EXIT(&queue->lock);
1141 queue->shutdown = 1;
1143 ret = _afs_wq_node_list_shutdown(&queue->ready_list);
1148 ret = _afs_wq_node_list_shutdown(&queue->blocked_list);
1153 ret = _afs_wq_node_list_shutdown(&queue->done_list);
1158 /* signal everyone that could be waiting, since these conditions will
1159 * generally fail to signal on their own if we're shutdown, since no
1160 * progress is being made */
1161 CV_BROADCAST(&queue->pend_cv);
1162 CV_BROADCAST(&queue->empty_cv);
1163 MUTEX_EXIT(&queue->lock);
1170 * allocate a work node.
1172 * @param[out] node_out address in which to store new work node
1174 * @return operation status
1176 * @retval ENOMEM out of memory
/* Allocate a work node in INIT state with no queue, no rock, zeroed
 * counters, and an empty dependency-children list.
 * NOTE(review): the malloc-failure (ENOMEM) branch and refcount/detached
 * initialization are elided in this dump. */
1179 afs_wq_node_alloc(struct afs_work_queue_node ** node_out)
1182 struct afs_work_queue_node * node;
1184 *node_out = node = (struct afs_work_queue_node *) malloc(sizeof(*node));
1190 queue_NodeInit(&node->node_list);
1191 node->qidx = AFS_WQ_NODE_LIST_NONE;
1193 node->rock = node->queue = NULL;
1195 node->block_count = 0;
1196 node->error_count = 0;
1197 MUTEX_INIT(&node->lock, "node", MUTEX_DEFAULT, 0);
1198 CV_INIT(&node->state_cv, "node state", CV_DEFAULT, 0);
1199 node->state = AFS_WQ_NODE_STATE_INIT;
1200 queue_Init(&node->dep_children);
1209 * @param[in] node work node object
1211 * @return operation status
/* Free a work node: refuse while queued or in an active state, release its
 * deps, destroy its lock/cv, run the rock destructor if set, then free
 * (final free() elided in this dump). */
1217 _afs_wq_node_free(struct afs_work_queue_node * node)
1221 if (queue_IsOnQueue(node) ||
1222 (node->state == AFS_WQ_NODE_STATE_SCHEDULED) ||
1223 (node->state == AFS_WQ_NODE_STATE_RUNNING) ||
1224 (node->state == AFS_WQ_NODE_STATE_BLOCKED)) {
1229 ret = _afs_wq_node_free_deps(node);
1234 MUTEX_DESTROY(&node->lock);
1235 CV_DESTROY(&node->state_cv);
1237 if (node->rock_dtor) {
1238 (*node->rock_dtor) (node->rock);
1248 * get a reference to a work node.
1250 * @param[in] node work queue node
1252 * @return operation status
/* Public ref-get: take node->lock, bump the refcount (increment line
 * elided), and release the lock. */
1256 afs_wq_node_get(struct afs_work_queue_node * node)
1258 MUTEX_ENTER(&node->lock);
1260 MUTEX_EXIT(&node->lock);
1266 * put back a reference to a work node.
1268 * @param[in] node work queue node
1269 * @param[in] drop drop node->lock
1271 * @post if refcount reaches zero, node is deallocated.
1273 * @return operation status
1276 * @pre node->lock held
/* Drop a node reference; if 'drop' is set the node lock is released before
 * any free. When the count hits zero the node must be off all lists and is
 * deallocated.
 * @pre node->lock held, refcount > 0 */
1281 _afs_wq_node_put_r(struct afs_work_queue_node * node,
1286 osi_Assert(node->refcount > 0);
1287 refc = --node->refcount;
1289 MUTEX_EXIT(&node->lock);
1292 osi_Assert(node->qidx == AFS_WQ_NODE_LIST_NONE);
1293 _afs_wq_node_free(node);
1300 * put back a reference to a work node.
1302 * @param[in] node work queue node
1304 * @post if refcount reaches zero, node is deallocated.
1306 * @return operation status
/* Public ref-put: lock the node and delegate to _afs_wq_node_put_r with
 * drop=1, which releases the lock (and frees the node at refcount zero). */
1310 afs_wq_node_put(struct afs_work_queue_node * node)
1312 MUTEX_ENTER(&node->lock);
1313 return _afs_wq_node_put_r(node, 1);
1317 * set the callback function on a work node.
1319 * @param[in] node work queue node
1320 * @param[in] cbf callback function
1321 * @param[in] rock opaque pointer passed to callback
1322 * @param[in] rock_dtor destructor function for 'rock', or NULL
1324 * @return operation status
/* Install the node's callback, opaque rock, and optional rock destructor
 * under node->lock (cbf/rock assignment lines elided in this dump). */
1328 afs_wq_node_set_callback(struct afs_work_queue_node * node,
1329 afs_wq_callback_func_t * cbf,
1330 void * rock, afs_wq_callback_dtor_t *dtor)
1332 MUTEX_ENTER(&node->lock);
1335 node->rock_dtor = dtor;
1336 MUTEX_EXIT(&node->lock);
1344 * @param[in] node work queue node
1346 * @return operation status
/* Mark a node detached under its lock (the node->detached assignment is
 * elided); detached nodes are put by the queue itself on completion. */
1350 afs_wq_node_set_detached(struct afs_work_queue_node * node)
1352 MUTEX_ENTER(&node->lock);
1354 MUTEX_EXIT(&node->lock);
1360 * link a dependency node to a parent and child work node.
1362 * This links a dependency node such that when the 'parent' work node is
1363 * done, the 'child' work node can proceed.
1365 * @param[in] dep dependency node
1366 * @param[in] parent parent node in this dependency
1367 * @param[in] child child node in this dependency
1369 * @return operation status
1373 * - parent->lock held
1374 * - child->lock held
1375 * - parent and child in quiescent state
/* Wire a dep node between parent and child: the dep takes a ref on the
 * child only (a parent ref would keep the parent alive via its own dep
 * list), then joins the parent's dep_children list.
 * NOTE(review): dep->child assignment and error handling are elided.
 * @pre both node locks held, both quiescent */
1380 _afs_wq_dep_link_r(struct afs_work_queue_dep_node *dep,
1381 struct afs_work_queue_node *parent,
1382 struct afs_work_queue_node *child)
1386 /* Each dep node adds a ref to the child node of that dep. We do not
1387 * do the same for the parent node, since if the only refs remaining
1388 * for a node are deps in node->dep_children, then the node should be
1389 * destroyed, and we will destroy the dep nodes when we free the
1391 ret = _afs_wq_node_get_r(child);
1396 /* add this dep node to the parent node's list of deps */
1397 queue_Append(&parent->dep_children, &dep->parent_list);
1400 dep->parent = parent;
1407 * add a dependency to a work node.
1409 * @param[in] child node which will be dependent upon completion of parent
1410 * @param[in] parent node whose completion gates child's execution
1413 * - child is in initial state (last op was afs_wq_node_alloc or afs_wq_node_wait)
1415 * @return operation status
/* Public API: make 'child' depend on completion of 'parent'.
 * Rejects self-dependency, allocates a dep node, multilocks both nodes,
 * permits the edit only in INIT/RUNNING child states, links the dep, and
 * charges the child's block or error counter based on the parent's current
 * state. On failure the dep node is freed (error-label structure elided). */
1419 afs_wq_node_dep_add(struct afs_work_queue_node * child,
1420 struct afs_work_queue_node * parent)
1423 struct afs_work_queue_dep_node * dep = NULL;
1424 struct afs_work_queue_node_multilock ml;
1427 /* self references are bad, mmkay? */
1428 if (parent == child) {
1433 ret = _afs_wq_dep_alloc(&dep);
1438 memset(&ml, 0, sizeof(ml));
1439 ml.nodes[0].node = parent;
1440 ml.nodes[1].node = child;
1441 ret = _afs_wq_node_multilock(&ml);
1447 /* only allow dep modification while in initial state
1448 * or running state (e.g. do a dep add while inside callback) */
1449 if ((child->state != AFS_WQ_NODE_STATE_INIT) &&
1450 (child->state != AFS_WQ_NODE_STATE_RUNNING)) {
1455 /* link dep node with child and parent work queue node */
1456 ret = _afs_wq_dep_link_r(dep, parent, child);
1461 /* handle blocking counts */
1462 switch (parent->state) {
1463 case AFS_WQ_NODE_STATE_INIT:
1464 case AFS_WQ_NODE_STATE_SCHEDULED:
1465 case AFS_WQ_NODE_STATE_RUNNING:
1466 case AFS_WQ_NODE_STATE_BLOCKED:
1467 child->block_count++;
1470 case AFS_WQ_NODE_STATE_ERROR:
1471 child->error_count++;
1480 MUTEX_EXIT(&child->lock);
1481 MUTEX_EXIT(&parent->lock);
/* error path: discard the unused dep node */
1487 _afs_wq_dep_free(dep);
1493 * remove a dependency from a work node.
1495 * @param[in] child node which was dependent upon completion of parent
1496 * @param[in] parent node whose completion gated child's execution
1498 * @return operation status
/* Public API: remove the dependency of 'child' on 'parent'.
 * Multilocks the pair, permits the edit only in INIT/RUNNING child states,
 * scans the parent's dep list for the matching dep node, unlinks and frees
 * it. Counter adjustments and error paths are elided in this dump. */
1502 afs_wq_node_dep_del(struct afs_work_queue_node * child,
1503 struct afs_work_queue_node * parent)
1506 struct afs_work_queue_dep_node * dep, * ndep;
1507 struct afs_work_queue_node_multilock ml;
1510 memset(&ml, 0, sizeof(ml));
1511 ml.nodes[0].node = parent;
1512 ml.nodes[1].node = child;
1513 code = _afs_wq_node_multilock(&ml);
1519 /* only permit changes while child is in init state
1520 * or running state (e.g. do a dep del when in callback func) */
1521 if ((child->state != AFS_WQ_NODE_STATE_INIT) &&
1522 (child->state != AFS_WQ_NODE_STATE_RUNNING)) {
1527 /* locate node linking parent and child */
1528 for (queue_Scan(&parent->dep_children,
1531 afs_work_queue_dep_node)) {
1532 if ((dep->child == child) &&
1533 (dep->parent == parent)) {
1535 /* no need to grab an extra ref on dep->child here; the caller
1536 * should already have a ref on dep->child */
1537 code = _afs_wq_dep_unlink_r(dep);
1543 code = _afs_wq_dep_free(dep);
1554 MUTEX_EXIT(&child->lock);
1555 MUTEX_EXIT(&parent->lock);
1561 * block a work node from execution.
1563 * this can be used to allow external events to influence work queue flow.
1565 * @param[in] node work queue node to be blocked
1567 * @return operation status
1570 * @post external block count incremented
/* Public API: externally block a node (increment block_count). On the
 * 0 -> 1 transition of a node sitting on the ready list, migrate it to its
 * queue's blocked list. Error checks after each sub-call are elided. */
1573 afs_wq_node_block(struct afs_work_queue_node * node)
1578 MUTEX_ENTER(&node->lock);
1579 ret = _afs_wq_node_state_wait_busy(node);
1584 start = node->block_count++;
1587 (node->qidx == AFS_WQ_NODE_LIST_READY)) {
1588 /* unblocked->blocked transition, and we're already scheduled */
1589 ret = _afs_wq_node_list_remove(node,
1590 AFS_WQ_NODE_STATE_BUSY);
1595 ret = _afs_wq_node_list_enqueue(&node->queue->blocked_list,
1597 AFS_WQ_NODE_STATE_BLOCKED);
1601 MUTEX_EXIT(&node->lock);
1607 * unblock a work node for execution.
1609 * this can be used to allow external events to influence work queue flow.
1611 * @param[in] node work queue node to be blocked
1613 * @return operation status
1616 * @post external block count decremented
/* Public API: remove one external block (decrement block_count). On the
 * 1 -> 0 transition of a node on the blocked list, migrate it back to its
 * queue's ready list. Mirror image of afs_wq_node_block. */
1619 afs_wq_node_unblock(struct afs_work_queue_node * node)
1624 MUTEX_ENTER(&node->lock);
1625 ret = _afs_wq_node_state_wait_busy(node);
1630 end = --node->block_count;
1633 (node->qidx == AFS_WQ_NODE_LIST_BLOCKED)) {
1634 /* blocked->unblock transition, and we're ready to be scheduled */
1635 ret = _afs_wq_node_list_remove(node,
1636 AFS_WQ_NODE_STATE_BUSY);
1641 ret = _afs_wq_node_list_enqueue(&node->queue->ready_list,
1643 AFS_WQ_NODE_STATE_SCHEDULED);
1647 MUTEX_EXIT(&node->lock);
1653 * initialize a afs_wq_add_opts struct with the default options.
1655 * @param[out] opts options structure to initialize
/* Initialize an add-options struct to defaults (field assignments elided). */
1658 afs_wq_add_opts_init(struct afs_work_queue_add_opts *opts)
1666 * schedule a work node for execution.
1668 * @param[in] queue work queue
1669 * @param[in] node work node
1670 * @param[in] opts options for adding, or NULL for defaults
1672 * @return operation status
1674 * @retval EWOULDBLOCK queue is full and opts specified not to block
1675 * @retval EINTR queue was full, we blocked to add, and the queue was
1676 * shutdown while we were blocking
/* Public API: schedule 'node' on 'queue'.
 * Chooses the target list by the node's counters (error -> done list,
 * blocked -> blocked list, else ready), honors the high-pending threshold
 * by waiting on pend_cv unless 'force' is set or the node goes to the done
 * list, then bumps pend_count and enqueues.
 * NOTE(review): handling of 'donate' (refcount transfer), NULL-opts
 * defaulting, the drain flag, and the !block early return are elided. */
1679 afs_wq_add(struct afs_work_queue *queue,
1680 struct afs_work_queue_node *node,
1681 struct afs_work_queue_add_opts *opts)
1684 int donate, block, force, hithresh;
1685 struct afs_work_queue_node_list * list;
1686 struct afs_work_queue_add_opts l_opts;
1687 int waited_for_drain = 0;
1688 afs_wq_work_state_t state;
1691 afs_wq_add_opts_init(&l_opts);
1695 donate = opts->donate;
1696 block = opts->block;
1697 force = opts->force;
1700 MUTEX_ENTER(&node->lock);
1702 ret = _afs_wq_node_state_wait_busy(node);
/* pick destination list by node counters: clean -> ready, error -> done,
 * otherwise blocked */
1707 if (!node->block_count && !node->error_count) {
1708 list = &queue->ready_list;
1709 state = AFS_WQ_NODE_STATE_SCHEDULED;
1710 } else if (node->error_count) {
1711 list = &queue->done_list;
1712 state = AFS_WQ_NODE_STATE_ERROR;
1714 list = &queue->blocked_list;
1715 state = AFS_WQ_NODE_STATE_BLOCKED;
1720 MUTEX_ENTER(&queue->lock);
1722 if (queue->shutdown) {
1724 MUTEX_EXIT(&queue->lock);
1725 MUTEX_EXIT(&node->lock);
1729 hithresh = queue->opts.pend_hithresh;
1730 if (hithresh > 0 && queue->pend_count >= hithresh) {
1734 if (!force && (state == AFS_WQ_NODE_STATE_SCHEDULED
1735 || state == AFS_WQ_NODE_STATE_BLOCKED)) {
/* queue too full: drop node lock and wait for drain below the threshold */
1739 MUTEX_EXIT(&node->lock);
1740 CV_WAIT(&queue->pend_cv, &queue->lock);
1742 if (queue->shutdown) {
1745 MUTEX_EXIT(&queue->lock);
1747 waited_for_drain = 1;
1758 queue->pend_count++;
1760 if (waited_for_drain) {
1761 /* signal another thread that may have been waiting for drain */
1762 CV_SIGNAL(&queue->pend_cv);
1765 MUTEX_EXIT(&queue->lock);
1773 node->queue = queue;
1775 ret = _afs_wq_node_list_enqueue(list,
1783 * de-schedule a work node.
1785 * @param[in] node work node
1787 * @return operation status
/* Public API: de-schedule a work node (body elided in this dump). */
1791 afs_wq_del(struct afs_work_queue_node * node)
1798 * execute a node on the queue.
1800 * @param[in] queue work queue
1801 * @param[in] rock opaque pointer (passed as third arg to callback func)
1803 * @return operation status
1804 * @retval 0 completed a work unit
/* Public API: execute one work unit, blocking until one is available
 * (thin wrapper: _afs_wq_do with block=1). */
1807 afs_wq_do(struct afs_work_queue * queue,
1810 return _afs_wq_do(queue, rock, 1);
1814 * execute a node on the queue, if there is any work to do.
1816 * @param[in] queue work queue
1817 * @param[in] rock opaque pointer (passed as third arg to callback func)
1819 * @return operation status
1820 * @retval 0 completed a work unit
1821 * @retval EWOULDBLOCK there was nothing to do
/* Public API: execute one work unit only if one is ready; EWOULDBLOCK
 * otherwise (thin wrapper: _afs_wq_do with block=0). */
1824 afs_wq_do_nowait(struct afs_work_queue * queue,
1827 return _afs_wq_do(queue, rock, 0);
1831 * wait for all pending nodes to finish.
1833 * @param[in] queue work queue
1835 * @return operation status
1838 * @post the specified queue was empty at some point; it may not be empty by
1839 * the time this function returns, but at some point after the function was
1840 * called, there were no nodes in the ready queue or blocked queue.
/* Public API: block until pend_count reaches zero (or shutdown); after a
 * shutdown, additionally wait for in-flight callbacks to finish via
 * running_cv. Guarantees only that the queue was empty at some point. */
1843 afs_wq_wait_all(struct afs_work_queue *queue)
1847 MUTEX_ENTER(&queue->lock);
1849 while (queue->pend_count > 0 && !queue->shutdown) {
1850 CV_WAIT(&queue->empty_cv, &queue->lock);
1853 if (queue->shutdown) {
1854 /* queue has been shut down, but there may still be some threads
1855 * running e.g. in the middle of their callback. ensure they have
1856 * stopped before we return. */
1857 while (queue->running_count > 0) {
1858 CV_WAIT(&queue->running_cv, &queue->lock);
1865 MUTEX_EXIT(&queue->lock);
1867 /* technically this doesn't really guarantee that the work queue is empty
1868 * after we return, but we do guarantee that it was empty at some point */
1867 /* technically this doesn't really guarantee that the work queue is empty
1868 * after we return, but we do guarantee that it was empty at some point */
1874 * wait for a node to complete; dequeue from done list.
1876 * @param[in] node work queue node
1877 * @param[out] retcode return code from work unit
1879 * @return operation status
1882 * @pre ref held on node
/* Public API: wait until a node reaches DONE or ERROR, report its callback
 * return value via *retcode (if non-NULL — the NULL guard is elided), and
 * pull the node off the done list back to INIT.
 * @pre caller holds a reference on the node */
1885 afs_wq_node_wait(struct afs_work_queue_node * node,
1890 MUTEX_ENTER(&node->lock);
1891 if (node->state == AFS_WQ_NODE_STATE_INIT) {
1892 /* not sure what to do in this case */
1896 while ((node->state != AFS_WQ_NODE_STATE_DONE) &&
1897 (node->state != AFS_WQ_NODE_STATE_ERROR)) {
1898 CV_WAIT(&node->state_cv, &node->lock);
1901 *retcode = node->retcode;
1904 if (node->queue == NULL) {
1905 /* nothing we can do */
1909 ret = _afs_wq_node_list_remove(node,
1910 AFS_WQ_NODE_STATE_INIT);
1913 MUTEX_EXIT(&node->lock);