2 * Copyright 2008-2010, Sine Nomine Associates and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
10 #include <afsconfig.h>
11 #include <afs/param.h>
20 #define __AFS_WORK_QUEUE_IMPL 1
21 #include "work_queue.h"
22 #include "work_queue_impl.h"
25 * public interfaces for work_queue.
29 _afs_wq_node_put_r(struct afs_work_queue_node * node,
33 * allocate a work queue object.
35 * @param[out] queue_out address in which to store queue pointer
37 * @return operation status
39 * @retval ENOMEM out of memory
44 _afs_wq_alloc(struct afs_work_queue ** queue_out)
47 struct afs_work_queue * queue;
49 *queue_out = queue = malloc(sizeof(*queue));
60 * free a work queue object.
62 * @param[in] queue work queue object
64 * @return operation status
70 _afs_wq_free(struct afs_work_queue * queue)
80 * change a node's state.
82 * @param[in] node node object
83 * @param[in] new_state new object state
87 * @pre node->lock held
/* Transition a node to new_state and wake every state waiter.
 * NOTE(review): this listing is line-sampled; the function's opening brace
 * and trailing "return old_state;" are not visible here — confirm against
 * the full source.  Caller must hold node->lock (per the doc block above). */
91 static afs_wq_work_state_t
92 _afs_wq_node_state_change(struct afs_work_queue_node * node,
93 afs_wq_work_state_t new_state)
95 afs_wq_work_state_t old_state;
97 old_state = node->state;
98 node->state = new_state;
/* broadcast (not signal): every thread blocked in
 * _afs_wq_node_state_wait_busy must re-check node->state */
100 CV_BROADCAST(&node->state_cv);
106 * wait for a node's state to change from busy to something else.
108 * @param[in] node node object
110 * @return operation status
113 * @pre node->lock held
/* Block until node leaves AFS_WQ_NODE_STATE_BUSY.
 * Pre: node->lock held; CV_WAIT atomically drops and reacquires it.
 * NOTE(review): sampled listing — the return statement and closing brace
 * are not visible here. */
118 _afs_wq_node_state_wait_busy(struct afs_work_queue_node * node)
120 while (node->state == AFS_WQ_NODE_STATE_BUSY) {
/* loop guards against spurious wakeups and broadcasts for other transitions */
121 CV_WAIT(&node->state_cv, &node->lock);
128 * check whether a work queue node is busy.
130 * @param[in] node node object pointer
132 * @return whether node is busy
133 * @retval 1 node is busy
134 * @retval 0 node is not busy
136 * @pre node->lock held
/* Non-blocking predicate: 1 if node is in BUSY state, 0 otherwise.
 * Pre: node->lock held (state read is only stable under the lock). */
141 _afs_wq_node_state_is_busy(struct afs_work_queue_node * node)
143 return (node->state == AFS_WQ_NODE_STATE_BUSY);
147 * attempt to simultaneously lock two work queue nodes.
149 * this is a somewhat tricky algorithm because there is no
150 * defined hierarchy within the work queue node population.
152 * @param[in] ml multilock control structure
154 * @return operation status
157 * @note in theory, we could easily extend this to
158 * lock more than two nodes
161 * - caller MUST NOT have set busy state on either node
164 * - locks held on both nodes
165 * - both nodes in quiescent states
167 * @note node with non-zero lock_held or busy_held fields
168 * MUST go in array index 0
173 _afs_wq_node_multilock(struct afs_work_queue_node_multilock * ml)
176 struct timespec delay;
177 int first = 1, second = 0, tmp;
179 /* first pass processing */
180 if (ml->nodes[0].lock_held) {
181 if (!ml->nodes[0].busy_held) {
182 ret = _afs_wq_node_state_wait_busy(ml->nodes[0].node);
188 code = MUTEX_TRYENTER(&ml->nodes[1].node->lock);
194 /* setup for main loop */
195 MUTEX_EXIT(&ml->nodes[0].node->lock);
199 * setup random exponential backoff
201 * set initial delay to random value in the range [500,1000) ns
204 delay.tv_nsec = 500 + rand() % 500;
207 MUTEX_ENTER(&ml->nodes[first].node->lock);
208 if ((first != 0) || !ml->nodes[0].busy_held) {
209 ret = _afs_wq_node_state_wait_busy(ml->nodes[first].node);
212 if (!ml->nodes[0].lock_held || first) {
213 MUTEX_EXIT(&ml->nodes[first].node->lock);
214 if (ml->nodes[0].lock_held) {
215 /* on error, return with locks in same state as before call */
216 MUTEX_ENTER(&ml->nodes[0].node->lock);
224 * in order to avoid deadlock, we must use trylock and
225 * a non-blocking state check. if we meet any contention,
226 * we must drop back and start again.
228 code = MUTEX_TRYENTER(&ml->nodes[second].node->lock);
230 if (((second == 0) && (ml->nodes[0].busy_held)) ||
231 !_afs_wq_node_state_is_busy(ml->nodes[second].node)) {
235 MUTEX_EXIT(&ml->nodes[second].node->lock);
242 * drop locks, use exponential backoff,
243 * try acquiring in the opposite order
245 MUTEX_EXIT(&ml->nodes[first].node->lock);
246 nanosleep(&delay, NULL);
247 if (delay.tv_nsec <= 65536000) { /* max backoff delay of ~131ms */
261 * initialize a node list object.
263 * @param[in] list list object
264 * @param[in] id list identifier
266 * @return operation status
/* Initialize a node list: empty queue head plus its lock and condvar.
 * NOTE(review): sampled listing — lines that presumably set list->qidx = id
 * and list->shutdown = 0 are not visible here; confirm in the full source. */
272 _afs_wq_node_list_init(struct afs_work_queue_node_list * list,
273 afs_wq_node_list_id_t id)
275 queue_Init(&list->list);
276 MUTEX_INIT(&list->lock, "list", MUTEX_DEFAULT, 0);
277 CV_INIT(&list->cv, "list", CV_DEFAULT, 0);
285 * destroy a node list object.
287 * @param[in] list list object
289 * @return operation status
291 * @retval AFS_WQ_ERROR list not empty
296 _afs_wq_node_list_destroy(struct afs_work_queue_node_list * list)
300 if (queue_IsNotEmpty(&list->list)) {
305 MUTEX_DESTROY(&list->lock);
306 CV_DESTROY(&list->cv);
313 * wakeup all threads waiting in dequeue.
315 * @param[in] list list object
317 * @return operation status
/* Shut down a node list: mark every queued node ERROR, detach it from the
 * list, drop the list's ref on detached nodes, then wake all dequeue waiters.
 * NOTE(review): sampled listing — the line setting list->shutdown and the
 * queue_Remove() of each node are not visible here. */
323 _afs_wq_node_list_shutdown(struct afs_work_queue_node_list * list)
326 struct afs_work_queue_node *node, *nnode;
328 MUTEX_ENTER(&list->lock);
/* queue_Scan with nnode lookahead, so the current node may be unlinked/freed */
331 for (queue_Scan(&list->list, node, nnode, afs_work_queue_node)) {
332 _afs_wq_node_state_change(node, AFS_WQ_NODE_STATE_ERROR);
334 node->qidx = AFS_WQ_NODE_LIST_NONE;
337 if (node->detached) {
338 /* if we are detached, we hold the reference on the node;
339 * otherwise, it is some other caller that holds the reference.
340 * So don't put the node if we are not detached; the node will
341 * get freed when someone else calls afs_wq_node_put */
342 afs_wq_node_put(node);
/* wake everyone blocked in _afs_wq_node_list_dequeue so they see shutdown */
346 CV_BROADCAST(&list->cv);
347 MUTEX_EXIT(&list->lock);
353 * append to a node list object.
355 * @param[in] list list object
356 * @param[in] node node object
357 * @param[in] state new node state
359 * @return operation status
361 * @retval AFS_WQ_ERROR raced to enqueue node
365 * - node is not on a list
366 * - node is either not busy, or it is marked as busy by the calling thread
370 * - node lock dropped
375 _afs_wq_node_list_enqueue(struct afs_work_queue_node_list * list,
376 struct afs_work_queue_node * node,
377 afs_wq_work_state_t state)
381 if (node->qidx != AFS_WQ_NODE_LIST_NONE) {
387 /* deal with lock inversion */
388 code = MUTEX_TRYENTER(&list->lock);
391 _afs_wq_node_state_change(node, AFS_WQ_NODE_STATE_BUSY);
392 MUTEX_EXIT(&node->lock);
393 MUTEX_ENTER(&list->lock);
394 MUTEX_ENTER(&node->lock);
396 /* assert state of the world (we set busy, so this should never happen) */
397 opr_Assert(queue_IsNotOnQueue(node));
400 if (list->shutdown) {
405 opr_Assert(node->qidx == AFS_WQ_NODE_LIST_NONE);
406 if (queue_IsEmpty(&list->list)) {
407 /* wakeup a dequeue thread */
408 CV_SIGNAL(&list->cv);
410 queue_Append(&list->list, node);
411 node->qidx = list->qidx;
412 _afs_wq_node_state_change(node, state);
415 MUTEX_EXIT(&node->lock);
416 MUTEX_EXIT(&list->lock);
423 * dequeue a node from a list object.
425 * @param[in] list list object
426 * @param[out] node_out address in which to store node object pointer
427 * @param[in] state new node state
428 * @param[in] block permit blocking on cv if asserted
430 * @return operation status
432 * @retval EWOULDBLOCK block not asserted and nothing to dequeue
433 * @retval EINTR blocking wait interrupted by list shutdown
435 * @post node object returned with node lock held and new state set
/* Pop the head node off a list, set its new state, and return it with
 * node->lock held.  Fails with EINTR on shutdown; with EWOULDBLOCK when
 * the list is empty and blocking was not requested (per doc block above).
 * NOTE(review): sampled listing — error-path bodies, the queue_Remove()
 * of the dequeued node, and the returns are not visible here. */
440 _afs_wq_node_list_dequeue(struct afs_work_queue_node_list * list,
441 struct afs_work_queue_node ** node_out,
442 afs_wq_work_state_t state,
446 struct afs_work_queue_node * node;
448 MUTEX_ENTER(&list->lock);
450 if (list->shutdown) {
/* non-blocking caller and nothing queued: bail out (EWOULDBLOCK path) */
456 if (!block && queue_IsEmpty(&list->list)) {
462 while (queue_IsEmpty(&list->list)) {
/* re-check shutdown on every wakeup; shutdown broadcasts list->cv */
463 if (list->shutdown) {
468 CV_WAIT(&list->cv, &list->lock);
471 *node_out = node = queue_First(&list->list, afs_work_queue_node);
/* lock order here is list then node; enqueue side handles the inversion */
473 MUTEX_ENTER(&node->lock);
475 node->qidx = AFS_WQ_NODE_LIST_NONE;
476 _afs_wq_node_state_change(node, state);
479 MUTEX_EXIT(&list->lock);
485 * remove a node from a list.
487 * @param[in] node node object
488 * @param[in] next_state node state following successful dequeue
490 * @return operation status
492 * @retval AFS_WQ_ERROR in any of the following conditions:
493 * - node not associated with a work queue
494 * - node was not on a linked list (e.g. RUNNING state)
495 * - we raced another thread
497 * @pre node->lock held
499 * @post node removed from node list
501 * @note node->lock may be dropped internally
506 _afs_wq_node_list_remove(struct afs_work_queue_node * node,
507 afs_wq_work_state_t next_state)
510 struct afs_work_queue_node_list * list = NULL;
512 _afs_wq_node_state_wait_busy(node);
518 switch (node->qidx) {
519 case AFS_WQ_NODE_LIST_READY:
520 list = &node->queue->ready_list;
523 case AFS_WQ_NODE_LIST_BLOCKED:
524 list = &node->queue->blocked_list;
527 case AFS_WQ_NODE_LIST_DONE:
528 list = &node->queue->done_list;
536 code = MUTEX_TRYENTER(&list->lock);
539 _afs_wq_node_state_change(node,
540 AFS_WQ_NODE_STATE_BUSY);
541 MUTEX_EXIT(&node->lock);
542 MUTEX_ENTER(&list->lock);
543 MUTEX_ENTER(&node->lock);
545 if (node->qidx == AFS_WQ_NODE_LIST_NONE) {
553 node->qidx = AFS_WQ_NODE_LIST_NONE;
554 _afs_wq_node_state_change(node, next_state);
557 MUTEX_EXIT(&list->lock);
565 * allocate a dependency node.
567 * @param[out] node_out address in which to store dep node pointer
569 * @return operation status
571 * @retval ENOMEM out of memory
/* Allocate a dependency node and put it in a quiescent state (unlinked,
 * no parent/child).  ENOMEM on allocation failure (per doc block above).
 * NOTE(review): sampled listing — the malloc NULL-check between lines 581
 * and 587 is not visible here; confirm it exists in the full source. */
576 _afs_wq_dep_alloc(struct afs_work_queue_dep_node ** node_out)
579 struct afs_work_queue_dep_node * node;
581 node = malloc(sizeof(*node));
587 queue_NodeInit(&node->parent_list);
588 node->parent = node->child = NULL;
597 * free a dependency node.
599 * @param[in] node dep node pointer
601 * @return operation status
603 * @retval AFS_WQ_ERROR still attached to a work node
608 _afs_wq_dep_free(struct afs_work_queue_dep_node * node)
612 if (queue_IsOnQueue(&node->parent_list) ||
626 * unlink work nodes from a dependency node.
628 * @param[in] dep dependency node
630 * @return operation status
634 * - dep->parent and dep->child are either locked, or are not referenced
636 * - caller holds ref on dep->child
637 * - dep->child and dep->parent in quiescent state
642 _afs_wq_dep_unlink_r(struct afs_work_queue_dep_node *dep)
644 struct afs_work_queue_node *child = dep->child;
645 queue_Remove(&dep->parent_list);
649 return _afs_wq_node_put_r(child, 0);
653 * get a reference to a work node.
655 * @param[in] node work queue node
657 * @return operation status
660 * @pre node->lock held
665 _afs_wq_node_get_r(struct afs_work_queue_node * node)
673 * unlink and free all of the dependency nodes from a node.
675 * @param[in] parent work node that is the parent node of all deps to be freed
677 * @return operation status
680 * @pre parent->refcount == 0
683 _afs_wq_node_free_deps(struct afs_work_queue_node *parent)
686 struct afs_work_queue_node *node_unlock = NULL, *node_put = NULL;
687 struct afs_work_queue_dep_node * dep, * nd;
689 /* unlink and free all of the dep structs attached to 'parent' */
690 for (queue_Scan(&parent->dep_children,
693 afs_work_queue_dep_node)) {
695 MUTEX_ENTER(&dep->child->lock);
696 node_unlock = dep->child;
698 /* We need to get a ref on child here, since _afs_wq_dep_unlink_r may
699 * put the last ref on the child, and we need the child to still exist
700 * so we can unlock it */
701 code = _afs_wq_node_get_r(dep->child);
705 node_put = dep->child;
707 /* remember, no need to lock dep->parent, since its refcount is 0 */
708 code = _afs_wq_dep_unlink_r(dep);
712 _afs_wq_node_put_r(node_put, 1);
713 } else if (node_unlock) {
714 MUTEX_EXIT(&node_unlock->lock);
716 node_put = node_unlock = NULL;
719 /* Only do this if everything is okay; if code is nonzero,
720 * something will still be pointing at dep, so don't free it.
721 * We will leak memory, but that's better than memory corruption;
722 * we've done all we can do to try and free the dep memory */
723 code = _afs_wq_dep_free(dep);
734 * propagate state down through dep nodes.
736 * @param[in] parent parent node object
737 * @param[in] next_state next state parent will assume
739 * @return operation status
743 * - parent->lock held
748 _afs_wq_dep_propagate(struct afs_work_queue_node * parent,
749 afs_wq_work_state_t next_state)
752 struct afs_work_queue_dep_node * dep, * nd;
753 struct afs_work_queue_node_multilock ml;
754 afs_wq_work_state_t old_state;
755 afs_wq_node_list_id_t qidx;
756 struct afs_work_queue_node_list * ql;
757 afs_wq_work_state_t cns;
759 old_state = _afs_wq_node_state_change(parent,
760 AFS_WQ_NODE_STATE_BUSY);
761 ml.nodes[0].node = parent;
762 ml.nodes[0].lock_held = 1;
763 ml.nodes[0].busy_held = 1;
765 /* scan through our children updating scheduling state */
766 for (queue_Scan(&parent->dep_children,
769 afs_work_queue_dep_node)) {
770 /* skip half-registered nodes */
771 if (dep->child == NULL) {
775 ml.nodes[1].node = dep->child;
776 ml.nodes[1].lock_held = 0;
777 ml.nodes[1].busy_held = 0;
778 ret = _afs_wq_node_multilock(&ml);
783 switch (next_state) {
784 case AFS_WQ_NODE_STATE_DONE:
785 dep->child->block_count--;
788 case AFS_WQ_NODE_STATE_ERROR:
789 dep->child->error_count++;
796 /* skip unscheduled nodes */
797 if (dep->child->queue == NULL) {
798 MUTEX_EXIT(&dep->child->lock);
803 * when blocked dep and error'd dep counts reach zero, the
804 * node can be scheduled for execution
806 if (dep->child->error_count) {
807 ql = &dep->child->queue->done_list;
808 qidx = AFS_WQ_NODE_LIST_DONE;
809 cns = AFS_WQ_NODE_STATE_ERROR;
810 } else if (dep->child->block_count) {
811 ql = &dep->child->queue->blocked_list;
812 qidx = AFS_WQ_NODE_LIST_BLOCKED;
813 cns = AFS_WQ_NODE_STATE_BLOCKED;
815 ql = &dep->child->queue->ready_list;
816 qidx = AFS_WQ_NODE_LIST_READY;
817 cns = AFS_WQ_NODE_STATE_SCHEDULED;
820 if (qidx != dep->child->qidx) {
821 /* we're transitioning to a different queue */
822 ret = _afs_wq_node_list_remove(dep->child,
823 AFS_WQ_NODE_STATE_BUSY);
825 MUTEX_EXIT(&dep->child->lock);
829 ret = _afs_wq_node_list_enqueue(ql,
833 MUTEX_EXIT(&dep->child->lock);
837 MUTEX_EXIT(&dep->child->lock);
841 _afs_wq_node_state_change(parent,
847 * decrements queue->running_count, and signals waiters if appropriate.
849 * @param[in] queue queue to dec the running count of
/* Decrement queue->running_count under queue->lock; when the queue is shut
 * down and the count hits zero, wake threads waiting in afs_wq_wait_all
 * on running_cv. */
852 _afs_wq_dec_running_count(struct afs_work_queue *queue)
854 MUTEX_ENTER(&queue->lock);
855 queue->running_count--;
856 if (queue->shutdown && queue->running_count == 0) {
857 /* if we've shut down, someone may be waiting for the running count
859 CV_BROADCAST(&queue->running_cv);
861 MUTEX_EXIT(&queue->lock);
865 * execute a node on the queue.
867 * @param[in] queue work queue
868 * @param[in] rock opaque pointer (passed as third arg to callback func)
869 * @param[in] block allow blocking in dequeue
871 * @return operation status
872 * @retval 0 completed a work unit
877 _afs_wq_do(struct afs_work_queue * queue,
882 struct afs_work_queue_node * node;
883 afs_wq_callback_func_t * cbf;
884 afs_wq_work_state_t next_state;
885 struct afs_work_queue_node_list * ql;
889 /* We can inc queue->running_count before actually pulling the node off
890 * of the ready_list, since running_count only really matters when we are
891 * shut down. If we get shut down before we pull the node off of
892 * ready_list, but after we inc'd running_count,
893 * _afs_wq_node_list_dequeue should return immediately with EINTR,
894 * in which case we'll dec running_count, so it's as if we never inc'd it
895 * in the first place. */
896 MUTEX_ENTER(&queue->lock);
897 if (queue->shutdown) {
898 MUTEX_EXIT(&queue->lock);
901 queue->running_count++;
902 MUTEX_EXIT(&queue->lock);
904 ret = _afs_wq_node_list_dequeue(&queue->ready_list,
906 AFS_WQ_NODE_STATE_RUNNING,
909 _afs_wq_dec_running_count(queue);
914 node_rock = node->rock;
915 detached = node->detached;
918 MUTEX_EXIT(&node->lock);
919 code = (*cbf)(queue, node, queue->rock, node_rock, rock);
920 MUTEX_ENTER(&node->lock);
922 next_state = AFS_WQ_NODE_STATE_DONE;
923 ql = &queue->done_list;
924 } else if (code == AFS_WQ_ERROR_RESCHEDULE) {
925 if (node->error_count) {
926 next_state = AFS_WQ_NODE_STATE_ERROR;
927 ql = &queue->done_list;
928 } else if (node->block_count) {
929 next_state = AFS_WQ_NODE_STATE_BLOCKED;
930 ql = &queue->blocked_list;
932 next_state = AFS_WQ_NODE_STATE_SCHEDULED;
933 ql = &queue->ready_list;
936 next_state = AFS_WQ_NODE_STATE_ERROR;
937 ql = &queue->done_list;
940 next_state = AFS_WQ_NODE_STATE_DONE;
942 ql = &queue->done_list;
945 _afs_wq_dec_running_count(queue);
947 node->retcode = code;
949 if ((next_state == AFS_WQ_NODE_STATE_DONE) ||
950 (next_state == AFS_WQ_NODE_STATE_ERROR)) {
952 MUTEX_ENTER(&queue->lock);
954 if (queue->drain && queue->pend_count == queue->opts.pend_lothresh) {
955 /* signal other threads if we're about to below the low
956 * pending-tasks threshold */
958 CV_SIGNAL(&queue->pend_cv);
961 if (queue->pend_count == 1) {
962 /* signal other threads if we're about to become 'empty' */
963 CV_BROADCAST(&queue->empty_cv);
968 MUTEX_EXIT(&queue->lock);
971 ret = _afs_wq_node_state_wait_busy(node);
976 /* propagate scheduling changes down through dependencies */
977 ret = _afs_wq_dep_propagate(node, next_state);
982 ret = _afs_wq_node_state_wait_busy(node);
988 ((next_state == AFS_WQ_NODE_STATE_DONE) ||
989 (next_state == AFS_WQ_NODE_STATE_ERROR))) {
990 _afs_wq_node_state_change(node, next_state);
991 _afs_wq_node_put_r(node, 1);
993 ret = _afs_wq_node_list_enqueue(ql,
1003 * initialize a struct afs_work_queue_opts to the default values
1005 * @param[out] opts opts struct to initialize
/* Default queue options: zero thresholds, i.e. pending-count thresholds
 * disabled (afs_wq_add only applies hithresh when it is > 0). */
1008 afs_wq_opts_init(struct afs_work_queue_opts *opts)
1010 opts->pend_lothresh = 0;
1011 opts->pend_hithresh = 0;
1015 * set the options for a struct afs_work_queue_opts appropriate for a certain
1016 * number of threads.
1018 * @param[out] opts opts struct in which to set the values
1019 * @param[in] threads number of threads
/* Derive pending-task thresholds from a thread count: low watermark at
 * 2x threads, high watermark at 16x, clamped to minimums of 1 and 2 so
 * the thresholds stay enabled (non-zero) even for threads == 0. */
1022 afs_wq_opts_calc_thresh(struct afs_work_queue_opts *opts, int threads)
1024 opts->pend_lothresh = threads * 2;
1025 opts->pend_hithresh = threads * 16;
1028 if (opts->pend_lothresh < 1) {
1029 opts->pend_lothresh = 1;
1031 if (opts->pend_hithresh < 2) {
1032 opts->pend_hithresh = 2;
1037 * allocate and initialize a work queue object.
1039 * @param[out] queue_out address in which to store newly allocated work queue object
1040 * @param[in] rock work queue opaque pointer (passed as first arg to all fired callbacks)
1041 * @param[in] opts options for the new created queue
1043 * @return operation status
1045 * @retval ENOMEM out of memory
1048 afs_wq_create(struct afs_work_queue ** queue_out,
1050 struct afs_work_queue_opts *opts)
1053 struct afs_work_queue * queue;
1055 ret = _afs_wq_alloc(queue_out);
1062 memcpy(&queue->opts, opts, sizeof(queue->opts));
1064 afs_wq_opts_init(&queue->opts);
1067 _afs_wq_node_list_init(&queue->ready_list,
1068 AFS_WQ_NODE_LIST_READY);
1069 _afs_wq_node_list_init(&queue->blocked_list,
1070 AFS_WQ_NODE_LIST_BLOCKED);
1071 _afs_wq_node_list_init(&queue->done_list,
1072 AFS_WQ_NODE_LIST_DONE);
1075 queue->shutdown = 0;
1076 queue->pend_count = 0;
1077 queue->running_count = 0;
1079 MUTEX_INIT(&queue->lock, "queue", MUTEX_DEFAULT, 0);
1080 CV_INIT(&queue->pend_cv, "queue pending", CV_DEFAULT, 0);
1081 CV_INIT(&queue->empty_cv, "queue empty", CV_DEFAULT, 0);
1082 CV_INIT(&queue->running_cv, "queue running", CV_DEFAULT, 0);
1089 * deallocate and free a work queue object.
1091 * @param[in] queue work queue to be destroyed
1093 * @return operation status
1095 * @retval AFS_WQ_ERROR unspecified error
1098 afs_wq_destroy(struct afs_work_queue * queue)
1102 ret = _afs_wq_node_list_destroy(&queue->ready_list);
1107 ret = _afs_wq_node_list_destroy(&queue->blocked_list);
1112 ret = _afs_wq_node_list_destroy(&queue->done_list);
1117 ret = _afs_wq_free(queue);
1124 * shutdown a work queue.
1126 * @param[in] queue work queue object pointer
1128 * @return operation status
1132 afs_wq_shutdown(struct afs_work_queue * queue)
1136 MUTEX_ENTER(&queue->lock);
1137 if (queue->shutdown) {
1138 /* already shutdown, do nothing */
1139 MUTEX_EXIT(&queue->lock);
1142 queue->shutdown = 1;
1144 ret = _afs_wq_node_list_shutdown(&queue->ready_list);
1149 ret = _afs_wq_node_list_shutdown(&queue->blocked_list);
1154 ret = _afs_wq_node_list_shutdown(&queue->done_list);
1159 /* signal everyone that could be waiting, since these conditions will
1160 * generally fail to signal on their own if we're shutdown, since no
1161 * progress is being made */
1162 CV_BROADCAST(&queue->pend_cv);
1163 CV_BROADCAST(&queue->empty_cv);
1164 MUTEX_EXIT(&queue->lock);
1171 * allocate a work node.
1173 * @param[out] node_out address in which to store new work node
1175 * @return operation status
1177 * @retval ENOMEM out of memory
1180 afs_wq_node_alloc(struct afs_work_queue_node ** node_out)
1183 struct afs_work_queue_node * node;
1185 *node_out = node = malloc(sizeof(*node));
1191 queue_NodeInit(&node->node_list);
1192 node->qidx = AFS_WQ_NODE_LIST_NONE;
1194 node->rock = node->queue = NULL;
1196 node->block_count = 0;
1197 node->error_count = 0;
1198 MUTEX_INIT(&node->lock, "node", MUTEX_DEFAULT, 0);
1199 CV_INIT(&node->state_cv, "node state", CV_DEFAULT, 0);
1200 node->state = AFS_WQ_NODE_STATE_INIT;
1201 queue_Init(&node->dep_children);
1210 * @param[in] node work node object
1212 * @return operation status
1218 _afs_wq_node_free(struct afs_work_queue_node * node)
1222 if (queue_IsOnQueue(node) ||
1223 (node->state == AFS_WQ_NODE_STATE_SCHEDULED) ||
1224 (node->state == AFS_WQ_NODE_STATE_RUNNING) ||
1225 (node->state == AFS_WQ_NODE_STATE_BLOCKED)) {
1230 ret = _afs_wq_node_free_deps(node);
1235 MUTEX_DESTROY(&node->lock);
1236 CV_DESTROY(&node->state_cv);
1238 if (node->rock_dtor) {
1239 (*node->rock_dtor) (node->rock);
1249 * get a reference to a work node.
1251 * @param[in] node work queue node
1253 * @return operation status
1257 afs_wq_node_get(struct afs_work_queue_node * node)
1259 MUTEX_ENTER(&node->lock);
1261 MUTEX_EXIT(&node->lock);
1267 * put back a reference to a work node.
1269 * @param[in] node work queue node
1270 * @param[in] drop drop node->lock
1272 * @post if refcount reaches zero, node is deallocated.
1274 * @return operation status
1277 * @pre node->lock held
/* Drop one reference from node (lock held on entry).  On the last ref the
 * node must already be off every list, and it is freed.
 * NOTE(review): sampled listing — the 'drop' parameter, the refc == 0 test
 * guarding the free, and the return are not visible here; the visible
 * MUTEX_EXIT at line 1290 is presumably conditional on 'drop'. */
1282 _afs_wq_node_put_r(struct afs_work_queue_node * node,
1287 opr_Assert(node->refcount > 0);
1288 refc = --node->refcount;
1290 MUTEX_EXIT(&node->lock);
/* last ref gone: node may not still be linked on any work-queue list */
1293 opr_Assert(node->qidx == AFS_WQ_NODE_LIST_NONE);
1294 _afs_wq_node_free(node);
1301 * put back a reference to a work node.
1303 * @param[in] node work queue node
1305 * @post if refcount reaches zero, node is deallocated.
1307 * @return operation status
/* Public ref-drop wrapper: take node->lock, then delegate to
 * _afs_wq_node_put_r with drop=1 so the lock is released (or the node
 * freed) before returning. */
1311 afs_wq_node_put(struct afs_work_queue_node * node)
1313 MUTEX_ENTER(&node->lock);
1314 return _afs_wq_node_put_r(node, 1);
1318 * set the callback function on a work node.
1320 * @param[in] node work queue node
1321 * @param[in] cbf callback function
1322 * @param[in] rock opaque pointer passed to callback
1323 * @param[in] rock_dtor destructor function for 'rock', or NULL
1325 * @return operation status
1329 afs_wq_node_set_callback(struct afs_work_queue_node * node,
1330 afs_wq_callback_func_t * cbf,
1331 void * rock, afs_wq_callback_dtor_t *dtor)
1333 MUTEX_ENTER(&node->lock);
1336 node->rock_dtor = dtor;
1337 MUTEX_EXIT(&node->lock);
1345 * @param[in] node work queue node
1347 * @return operation status
1351 afs_wq_node_set_detached(struct afs_work_queue_node * node)
1353 MUTEX_ENTER(&node->lock);
1355 MUTEX_EXIT(&node->lock);
1361 * link a dependency node to a parent and child work node.
1363 * This links a dependency node such that when the 'parent' work node is
1364 * done, the 'child' work node can proceed.
1366 * @param[in] dep dependency node
1367 * @param[in] parent parent node in this dependency
1368 * @param[in] child child node in this dependency
1370 * @return operation status
1374 * - parent->lock held
1375 * - child->lock held
1376 * - parent and child in quiescent state
1381 _afs_wq_dep_link_r(struct afs_work_queue_dep_node *dep,
1382 struct afs_work_queue_node *parent,
1383 struct afs_work_queue_node *child)
1387 /* Each dep node adds a ref to the child node of that dep. We do not
1388 * do the same for the parent node, since if the only refs remaining
1389 * for a node are deps in node->dep_children, then the node should be
1390 * destroyed, and we will destroy the dep nodes when we free the
1392 ret = _afs_wq_node_get_r(child);
1397 /* add this dep node to the parent node's list of deps */
1398 queue_Append(&parent->dep_children, &dep->parent_list);
1401 dep->parent = parent;
1408 * add a dependency to a work node.
1410 * @param[in] child node which will be dependent upon completion of parent
1411 * @param[in] parent node whose completion gates child's execution
1414 * - child is in initial state (last op was afs_wq_node_alloc or afs_wq_node_wait)
1416 * @return operation status
1420 afs_wq_node_dep_add(struct afs_work_queue_node * child,
1421 struct afs_work_queue_node * parent)
1424 struct afs_work_queue_dep_node * dep = NULL;
1425 struct afs_work_queue_node_multilock ml;
1428 /* self references are bad, mmkay? */
1429 if (parent == child) {
1434 ret = _afs_wq_dep_alloc(&dep);
1439 memset(&ml, 0, sizeof(ml));
1440 ml.nodes[0].node = parent;
1441 ml.nodes[1].node = child;
1442 ret = _afs_wq_node_multilock(&ml);
1448 /* only allow dep modification while in initial state
1449 * or running state (e.g. do a dep add while inside callback) */
1450 if ((child->state != AFS_WQ_NODE_STATE_INIT) &&
1451 (child->state != AFS_WQ_NODE_STATE_RUNNING)) {
1456 /* link dep node with child and parent work queue node */
1457 ret = _afs_wq_dep_link_r(dep, parent, child);
1462 /* handle blocking counts */
1463 switch (parent->state) {
1464 case AFS_WQ_NODE_STATE_INIT:
1465 case AFS_WQ_NODE_STATE_SCHEDULED:
1466 case AFS_WQ_NODE_STATE_RUNNING:
1467 case AFS_WQ_NODE_STATE_BLOCKED:
1468 child->block_count++;
1471 case AFS_WQ_NODE_STATE_ERROR:
1472 child->error_count++;
1481 MUTEX_EXIT(&child->lock);
1482 MUTEX_EXIT(&parent->lock);
1488 _afs_wq_dep_free(dep);
1494 * remove a dependency from a work node.
1496 * @param[in] child node which was dependent upon completion of parent
1497 * @param[in] parent node whose completion gated child's execution
1499 * @return operation status
1503 afs_wq_node_dep_del(struct afs_work_queue_node * child,
1504 struct afs_work_queue_node * parent)
1507 struct afs_work_queue_dep_node * dep, * ndep;
1508 struct afs_work_queue_node_multilock ml;
1511 memset(&ml, 0, sizeof(ml));
1512 ml.nodes[0].node = parent;
1513 ml.nodes[1].node = child;
1514 code = _afs_wq_node_multilock(&ml);
1520 /* only permit changes while child is in init state
1521 * or running state (e.g. do a dep del when in callback func) */
1522 if ((child->state != AFS_WQ_NODE_STATE_INIT) &&
1523 (child->state != AFS_WQ_NODE_STATE_RUNNING)) {
1528 /* locate node linking parent and child */
1529 for (queue_Scan(&parent->dep_children,
1532 afs_work_queue_dep_node)) {
1533 if ((dep->child == child) &&
1534 (dep->parent == parent)) {
1536 /* no need to grab an extra ref on dep->child here; the caller
1537 * should already have a ref on dep->child */
1538 code = _afs_wq_dep_unlink_r(dep);
1544 code = _afs_wq_dep_free(dep);
1555 MUTEX_EXIT(&child->lock);
1556 MUTEX_EXIT(&parent->lock);
1562 * block a work node from execution.
1564 * this can be used to allow external events to influence work queue flow.
1566 * @param[in] node work queue node to be blocked
1568 * @return operation status
1571 * @post external block count incremented
1574 afs_wq_node_block(struct afs_work_queue_node * node)
1579 MUTEX_ENTER(&node->lock);
1580 ret = _afs_wq_node_state_wait_busy(node);
1585 start = node->block_count++;
1588 (node->qidx == AFS_WQ_NODE_LIST_READY)) {
1589 /* unblocked->blocked transition, and we're already scheduled */
1590 ret = _afs_wq_node_list_remove(node,
1591 AFS_WQ_NODE_STATE_BUSY);
1596 ret = _afs_wq_node_list_enqueue(&node->queue->blocked_list,
1598 AFS_WQ_NODE_STATE_BLOCKED);
1602 MUTEX_EXIT(&node->lock);
1608 * unblock a work node for execution.
1610 * this can be used to allow external events to influence work queue flow.
1612 * @param[in] node work queue node to be blocked
1614 * @return operation status
1617 * @post external block count decremented
1620 afs_wq_node_unblock(struct afs_work_queue_node * node)
1625 MUTEX_ENTER(&node->lock);
1626 ret = _afs_wq_node_state_wait_busy(node);
1631 end = --node->block_count;
1634 (node->qidx == AFS_WQ_NODE_LIST_BLOCKED)) {
1635 /* blocked->unblock transition, and we're ready to be scheduled */
1636 ret = _afs_wq_node_list_remove(node,
1637 AFS_WQ_NODE_STATE_BUSY);
1642 ret = _afs_wq_node_list_enqueue(&node->queue->ready_list,
1644 AFS_WQ_NODE_STATE_SCHEDULED);
1648 MUTEX_EXIT(&node->lock);
1654 * initialize a afs_wq_add_opts struct with the default options.
1656 * @param[out] opts options structure to initialize
1659 afs_wq_add_opts_init(struct afs_work_queue_add_opts *opts)
1667 * schedule a work node for execution.
1669 * @param[in] queue work queue
1670 * @param[in] node work node
1671 * @param[in] opts options for adding, or NULL for defaults
1673 * @return operation status
1675 * @retval EWOULDBLOCK queue is full and opts specified not to block
1676 * @retval EINTR queue was full, we blocked to add, and the queue was
1677 * shutdown while we were blocking
1680 afs_wq_add(struct afs_work_queue *queue,
1681 struct afs_work_queue_node *node,
1682 struct afs_work_queue_add_opts *opts)
1685 int donate, block, force, hithresh;
1686 struct afs_work_queue_node_list * list;
1687 struct afs_work_queue_add_opts l_opts;
1688 int waited_for_drain = 0;
1689 afs_wq_work_state_t state;
1692 afs_wq_add_opts_init(&l_opts);
1696 donate = opts->donate;
1697 block = opts->block;
1698 force = opts->force;
1701 MUTEX_ENTER(&node->lock);
1703 ret = _afs_wq_node_state_wait_busy(node);
1708 if (!node->block_count && !node->error_count) {
1709 list = &queue->ready_list;
1710 state = AFS_WQ_NODE_STATE_SCHEDULED;
1711 } else if (node->error_count) {
1712 list = &queue->done_list;
1713 state = AFS_WQ_NODE_STATE_ERROR;
1715 list = &queue->blocked_list;
1716 state = AFS_WQ_NODE_STATE_BLOCKED;
1721 MUTEX_ENTER(&queue->lock);
1723 if (queue->shutdown) {
1725 MUTEX_EXIT(&queue->lock);
1726 MUTEX_EXIT(&node->lock);
1730 hithresh = queue->opts.pend_hithresh;
1731 if (hithresh > 0 && queue->pend_count >= hithresh) {
1735 if (!force && (state == AFS_WQ_NODE_STATE_SCHEDULED
1736 || state == AFS_WQ_NODE_STATE_BLOCKED)) {
1740 MUTEX_EXIT(&node->lock);
1741 CV_WAIT(&queue->pend_cv, &queue->lock);
1743 if (queue->shutdown) {
1746 MUTEX_EXIT(&queue->lock);
1748 waited_for_drain = 1;
1759 queue->pend_count++;
1761 if (waited_for_drain) {
1762 /* signal another thread that may have been waiting for drain */
1763 CV_SIGNAL(&queue->pend_cv);
1766 MUTEX_EXIT(&queue->lock);
1774 node->queue = queue;
1776 ret = _afs_wq_node_list_enqueue(list,
1784 * de-schedule a work node.
1786 * @param[in] node work node
1788 * @return operation status
1792 afs_wq_del(struct afs_work_queue_node * node)
1799 * execute a node on the queue.
1801 * @param[in] queue work queue
1802 * @param[in] rock opaque pointer (passed as third arg to callback func)
1804 * @return operation status
1805 * @retval 0 completed a work unit
/* Blocking execute: run one work unit, waiting in dequeue if the ready
 * list is empty (block=1 variant of _afs_wq_do). */
1808 afs_wq_do(struct afs_work_queue * queue,
1811 return _afs_wq_do(queue, rock, 1);
1815 * execute a node on the queue, if there is any work to do.
1817 * @param[in] queue work queue
1818 * @param[in] rock opaque pointer (passed as third arg to callback func)
1820 * @return operation status
1821 * @retval 0 completed a work unit
1822 * @retval EWOULDBLOCK there was nothing to do
/* Non-blocking execute: run one work unit if available, otherwise return
 * EWOULDBLOCK (block=0 variant of _afs_wq_do). */
1825 afs_wq_do_nowait(struct afs_work_queue * queue,
1828 return _afs_wq_do(queue, rock, 0);
1832 * wait for all pending nodes to finish.
1834 * @param[in] queue work queue
1836 * @return operation status
1839 * @post the specified queue was empty at some point; it may not be empty by
1840 * the time this function returns, but at some point after the function was
1841 * called, there were no nodes in the ready queue or blocked queue.
/* Wait until the queue has (at some point) drained: pend_count reaches 0,
 * or the queue is shut down — in which case also wait for in-flight
 * callbacks (running_count) to finish.
 * NOTE(review): sampled listing — the return statement(s) and any ret
 * value set on the shutdown path are not visible here. */
1844 afs_wq_wait_all(struct afs_work_queue *queue)
1848 MUTEX_ENTER(&queue->lock);
/* empty_cv is broadcast when pend_count is about to hit zero */
1850 while (queue->pend_count > 0 && !queue->shutdown) {
1851 CV_WAIT(&queue->empty_cv, &queue->lock);
1854 if (queue->shutdown) {
1855 /* queue has been shut down, but there may still be some threads
1856 * running e.g. in the middle of their callback. ensure they have
1857 * stopped before we return. */
1858 while (queue->running_count > 0) {
1859 CV_WAIT(&queue->running_cv, &queue->lock);
1866 MUTEX_EXIT(&queue->lock);
1868 /* technically this doesn't really guarantee that the work queue is empty
1869 * after we return, but we do guarantee that it was empty at some point */
1875 * wait for a node to complete; dequeue from done list.
1877 * @param[in] node work queue node
1878 * @param[out] retcode return code from work unit
1880 * @return operation status
1883 * @pre ref held on node
/* Wait for a node to reach DONE or ERROR, report its retcode, and pull it
 * off the done list (resetting it to INIT so it can be reused).
 * Pre: caller holds a ref on node (per doc block above).
 * NOTE(review): sampled listing — error-path bodies, the retcode NULL
 * guard (if any), and the return are not visible here. */
1886 afs_wq_node_wait(struct afs_work_queue_node * node,
1891 MUTEX_ENTER(&node->lock);
1892 if (node->state == AFS_WQ_NODE_STATE_INIT) {
1893 /* not sure what to do in this case */
/* loop until a terminal state; state_cv is broadcast on every transition */
1897 while ((node->state != AFS_WQ_NODE_STATE_DONE) &&
1898 (node->state != AFS_WQ_NODE_STATE_ERROR)) {
1899 CV_WAIT(&node->state_cv, &node->lock);
1902 *retcode = node->retcode;
1905 if (node->queue == NULL) {
1906 /* nothing we can do */
/* dequeue from the done list and reset to INIT for possible reuse */
1910 ret = _afs_wq_node_list_remove(node,
1911 AFS_WQ_NODE_STATE_INIT);
1914 MUTEX_EXIT(&node->lock);