e2f1553a283eacc670a38b5cd38b3556fec46279
[openafs.git] / src / util / work_queue.c
1 /*
2  * Copyright 2008-2010, Sine Nomine Associates and others.
3  * All Rights Reserved.
4  *
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afsconfig.h>
11 #include <afs/param.h>
12
13 #include <roken.h>
14 #include <sys/file.h>
15
16 #include <lock.h>
17
18 #define __AFS_WORK_QUEUE_IMPL 1
19 #include "work_queue.h"
20 #include "work_queue_impl.h"
21
22 /**
23  * public interfaces for work_queue.
24  */
25
26 static int
27 _afs_wq_node_put_r(struct afs_work_queue_node * node,
28                    int drop);
29
30 /**
31  * allocate a work queue object.
32  *
33  * @param[out] queue_out address in which to store queue pointer
34  *
35  * @return operation status
36  *    @retval 0 success
37  *    @retval ENOMEM out of memory
38  *
39  * @internal
40  */
41 static int
42 _afs_wq_alloc(struct afs_work_queue ** queue_out)
43 {
44     int ret = 0;
45     struct afs_work_queue * queue;
46
47     *queue_out = queue = malloc(sizeof(*queue));
48     if (queue == NULL) {
49         ret = ENOMEM;
50         goto error;
51     }
52
53  error:
54     return ret;
55 }
56
57 /**
58  * free a work queue object.
59  *
60  * @param[in] queue  work queue object
61  *
62  * @return operation status
63  *    @retval 0 success
64  *
65  * @internal
66  */
67 static int
68 _afs_wq_free(struct afs_work_queue * queue)
69 {
70     int ret = 0;
71
72     free(queue);
73
74     return ret;
75 }
76
77 /**
78  * change a node's state.
79  *
80  * @param[in] node       node object
81  * @param[in] new_state  new object state
82  *
83  * @return old state
84  *
85  * @pre node->lock held
86  *
87  * @internal
88  */
89 static afs_wq_work_state_t
90 _afs_wq_node_state_change(struct afs_work_queue_node * node,
91                           afs_wq_work_state_t new_state)
92 {
93     afs_wq_work_state_t old_state;
94
95     old_state = node->state;
96     node->state = new_state;
97
98     CV_BROADCAST(&node->state_cv);
99
100     return old_state;
101 }
102
103 /**
104  * wait for a node's state to change from busy to something else.
105  *
106  * @param[in] node  node object
107  *
108  * @return operation status
109  *    @retval 0 success
110  *
111  * @pre node->lock held
112  *
113  * @internal
114  */
115 static int
116 _afs_wq_node_state_wait_busy(struct afs_work_queue_node * node)
117 {
118     while (node->state == AFS_WQ_NODE_STATE_BUSY) {
119         CV_WAIT(&node->state_cv, &node->lock);
120     }
121
122     return 0;
123 }
124
125 /**
126  * check whether a work queue node is busy.
127  *
128  * @param[in] node  node object pointer
129  *
130  * @return whether node is busy
131  *    @retval 1 node is busy
132  *    @retval 0 node is not busy
133  *
134  * @pre node->lock held
135  *
136  * @internal
137  */
138 static int
139 _afs_wq_node_state_is_busy(struct afs_work_queue_node * node)
140 {
141     return (node->state == AFS_WQ_NODE_STATE_BUSY);
142 }
143
144 /**
145  * attempt to simultaneously lock two work queue nodes.
146  *
147  * this is a somewhat tricky algorithm because there is no
148  * defined hierarchy within the work queue node population.
149  *
150  * @param[in] ml  multilock control structure
151  *
152  * @return operation status
153  *    @retval 0
154  *
155  * @note in theory, we could easily extend this to
156  *       lock more than two nodes
157  *
158  * @pre
159  *   - caller MUST NOT have set busy state on either node
160  *
161  * @post
162  *   - locks held on both nodes
163  *   - both nodes in quiescent states
164  *
165  * @note node with non-zero lock_held or busy_held fields
166  *       MUST go in array index 0
167  *
168  * @internal
169  */
170 static int
171 _afs_wq_node_multilock(struct afs_work_queue_node_multilock * ml)
172 {
173     int code, ret = 0;
174     struct timespec delay;
175     int first = 1, second = 0, tmp;
176
177     /* first pass processing */
178     if (ml->nodes[0].lock_held) {
179         if (!ml->nodes[0].busy_held) {
180             ret = _afs_wq_node_state_wait_busy(ml->nodes[0].node);
181             if (ret) {
182                 goto error;
183             }
184         }
185
186         code = MUTEX_TRYENTER(&ml->nodes[1].node->lock);
187         if (code) {
188             /* success */
189             goto done;
190         }
191
192         /* setup for main loop */
193         MUTEX_EXIT(&ml->nodes[0].node->lock);
194     }
195
196     /*
197      * setup random exponential backoff
198      *
199      * set initial delay to random value in the range [500,1000) ns
200      */
201     delay.tv_sec = 0;
202     delay.tv_nsec = 500 + rand() % 500;
203
204     while (1) {
205         MUTEX_ENTER(&ml->nodes[first].node->lock);
206         if ((first != 0) || !ml->nodes[0].busy_held) {
207             ret = _afs_wq_node_state_wait_busy(ml->nodes[first].node);
208             if (ret) {
209                 /* cleanup */
210                 if (!ml->nodes[0].lock_held || first) {
211                     MUTEX_EXIT(&ml->nodes[first].node->lock);
212                     if (ml->nodes[0].lock_held) {
213                         /* on error, return with locks in same state as before call */
214                         MUTEX_ENTER(&ml->nodes[0].node->lock);
215                     }
216                 }
217                 goto error;
218             }
219         }
220
221         /*
222          * in order to avoid deadlock, we must use trylock and
223          * a non-blocking state check.  if we meet any contention,
224          * we must drop back and start again.
225          */
226         code = MUTEX_TRYENTER(&ml->nodes[second].node->lock);
227         if (code) {
228             if (((second == 0) && (ml->nodes[0].busy_held)) ||
229                 !_afs_wq_node_state_is_busy(ml->nodes[second].node)) {
230                 /* success */
231                 break;
232             } else {
233                 MUTEX_EXIT(&ml->nodes[second].node->lock);
234             }
235         }
236
237         /*
238          * contended.
239          *
240          * drop locks, use exponential backoff,
241          * try acquiring in the opposite order
242          */
243         MUTEX_EXIT(&ml->nodes[first].node->lock);
244         nanosleep(&delay, NULL);
245         if (delay.tv_nsec <= 65536000) { /* max backoff delay of ~131ms */
246             delay.tv_nsec <<= 1;
247         }
248         tmp = second;
249         second = first;
250         first = tmp;
251     }
252
253  done:
254  error:
255     return ret;
256 }
257
258 /**
259  * initialize a node list object.
260  *
261  * @param[in] list list object
262  * @param[in] id   list identifier
263  *
264  * @return operation status
265  *    @retval 0 success
266  *
267  * @internal
268  */
269 static int
270 _afs_wq_node_list_init(struct afs_work_queue_node_list * list,
271                        afs_wq_node_list_id_t id)
272 {
273     queue_Init(&list->list);
274     MUTEX_INIT(&list->lock, "list", MUTEX_DEFAULT, 0);
275     CV_INIT(&list->cv, "list", CV_DEFAULT, 0);
276     list->qidx = id;
277     list->shutdown = 0;
278
279     return 0;
280 }
281
282 /**
283  * destroy a node list object.
284  *
285  * @param[in] list list object
286  *
287  * @return operation status
288  *    @retval 0 success
289  *    @retval AFS_WQ_ERROR list not empty
290  *
291  * @internal
292  */
293 static int
294 _afs_wq_node_list_destroy(struct afs_work_queue_node_list * list)
295 {
296     int ret = 0;
297
298     if (queue_IsNotEmpty(&list->list)) {
299         ret = AFS_WQ_ERROR;
300         goto error;
301     }
302
303     MUTEX_DESTROY(&list->lock);
304     CV_DESTROY(&list->cv);
305
306  error:
307     return ret;
308 }
309
310 /**
311  * wakeup all threads waiting in dequeue.
312  *
313  * @param[in] list list object
314  *
315  * @return operation status
316  *    @retval 0 success
317  *
318  * @internal
319  */
320 static int
321 _afs_wq_node_list_shutdown(struct afs_work_queue_node_list * list)
322 {
323     int ret = 0;
324     struct afs_work_queue_node *node, *nnode;
325
326     MUTEX_ENTER(&list->lock);
327     list->shutdown = 1;
328
329     for (queue_Scan(&list->list, node, nnode, afs_work_queue_node)) {
330         _afs_wq_node_state_change(node, AFS_WQ_NODE_STATE_ERROR);
331         queue_Remove(node);
332         node->qidx = AFS_WQ_NODE_LIST_NONE;
333         node->queue = NULL;
334
335         if (node->detached) {
336             /* if we are detached, we hold the reference on the node;
337              * otherwise, it is some other caller that holds the reference.
338              * So don't put the node if we are not detached; the node will
339              * get freed when someone else calls afs_wq_node_put */
340             afs_wq_node_put(node);
341         }
342     }
343
344     CV_BROADCAST(&list->cv);
345     MUTEX_EXIT(&list->lock);
346
347     return ret;
348 }
349
350 /**
351  * append to a node list object.
352  *
353  * @param[in] list  list object
354  * @param[in] node  node object
355  * @param[in] state new node state
356  *
357  * @return operation status
358  *    @retval 0 success
359  *    @retval AFS_WQ_ERROR raced to enqueue node
360  *
361  * @pre
362  *   - node lock held
363  *   - node is not on a list
364  *   - node is either not busy, or it is marked as busy by the calling thread
365  *
366  * @post
367  *   - enqueued on list
368  *   - node lock dropped
369  *
370  * @internal
371  */
372 static int
373 _afs_wq_node_list_enqueue(struct afs_work_queue_node_list * list,
374                           struct afs_work_queue_node * node,
375                           afs_wq_work_state_t state)
376 {
377     int code, ret = 0;
378
379     if (node->qidx != AFS_WQ_NODE_LIST_NONE) {
380         /* raced */
381         ret = AFS_WQ_ERROR;
382         goto error;
383     }
384
385     /* deal with lock inversion */
386     code = MUTEX_TRYENTER(&list->lock);
387     if (!code) {
388         /* contended */
389         _afs_wq_node_state_change(node, AFS_WQ_NODE_STATE_BUSY);
390         MUTEX_EXIT(&node->lock);
391         MUTEX_ENTER(&list->lock);
392         MUTEX_ENTER(&node->lock);
393
394         /* assert state of the world (we set busy, so this should never happen) */
395         osi_Assert(queue_IsNotOnQueue(node));
396     }
397
398     if (list->shutdown) {
399         ret = AFS_WQ_ERROR;
400         goto error_unlock;
401     }
402
403     osi_Assert(node->qidx == AFS_WQ_NODE_LIST_NONE);
404     if (queue_IsEmpty(&list->list)) {
405         /* wakeup a dequeue thread */
406         CV_SIGNAL(&list->cv);
407     }
408     queue_Append(&list->list, node);
409     node->qidx = list->qidx;
410     _afs_wq_node_state_change(node, state);
411
412  error_unlock:
413     MUTEX_EXIT(&node->lock);
414     MUTEX_EXIT(&list->lock);
415
416  error:
417     return ret;
418 }
419
420 /**
421  * dequeue a node from a list object.
422  *
423  * @param[in]    list      list object
424  * @param[out]   node_out  address in which to store node object pointer
425  * @param[in]    state     new node state
426  * @param[in]    block     permit blocking on cv if asserted
427  *
428  * @return operation status
429  *    @retval 0 success
430  *    @retval EWOULDBLOCK block not asserted and nothing to dequeue
431  *    @retval EINTR blocking wait interrupted by list shutdown
432  *
433  * @post node object returned with node lock held and new state set
434  *
435  * @internal
436  */
437 static int
438 _afs_wq_node_list_dequeue(struct afs_work_queue_node_list * list,
439                           struct afs_work_queue_node ** node_out,
440                           afs_wq_work_state_t state,
441                           int block)
442 {
443     int ret = 0;
444     struct afs_work_queue_node * node;
445
446     MUTEX_ENTER(&list->lock);
447
448     if (list->shutdown) {
449         *node_out = NULL;
450         ret = EINTR;
451         goto done_sync;
452     }
453
454     if (!block && queue_IsEmpty(&list->list)) {
455         *node_out = NULL;
456         ret = EWOULDBLOCK;
457         goto done_sync;
458     }
459
460     while (queue_IsEmpty(&list->list)) {
461         if (list->shutdown) {
462             *node_out = NULL;
463             ret = EINTR;
464             goto done_sync;
465         }
466         CV_WAIT(&list->cv, &list->lock);
467     }
468
469     *node_out = node = queue_First(&list->list, afs_work_queue_node);
470
471     MUTEX_ENTER(&node->lock);
472     queue_Remove(node);
473     node->qidx = AFS_WQ_NODE_LIST_NONE;
474     _afs_wq_node_state_change(node, state);
475
476  done_sync:
477     MUTEX_EXIT(&list->lock);
478
479     return ret;
480 }
481
482 /**
483  * remove a node from a list.
484  *
485  * @param[in] node        node object
486  * @param[in] next_state  node state following successful dequeue
487  *
488  * @return operation status
489  *    @retval 0 success
490  *    @retval AFS_WQ_ERROR in any of the following conditions:
491  *              - node not associated with a work queue
492  *              - node was not on a linked list (e.g. RUNNING state)
493  *              - we raced another thread
494  *
495  * @pre node->lock held
496  *
497  * @post node removed from node list
498  *
499  * @note node->lock may be dropped internally
500  *
501  * @internal
502  */
503 static int
504 _afs_wq_node_list_remove(struct afs_work_queue_node * node,
505                          afs_wq_work_state_t next_state)
506 {
507     int code, ret = 0;
508     struct afs_work_queue_node_list * list = NULL;
509
510     _afs_wq_node_state_wait_busy(node);
511
512     if (!node->queue) {
513         ret = AFS_WQ_ERROR;
514         goto error;
515     }
516     switch (node->qidx) {
517     case AFS_WQ_NODE_LIST_READY:
518         list = &node->queue->ready_list;
519         break;
520
521     case AFS_WQ_NODE_LIST_BLOCKED:
522         list = &node->queue->blocked_list;
523         break;
524
525     case AFS_WQ_NODE_LIST_DONE:
526         list = &node->queue->done_list;
527         break;
528
529     default:
530         ret = AFS_WQ_ERROR;
531     }
532
533     if (list) {
534         code = MUTEX_TRYENTER(&list->lock);
535         if (!code) {
536             /* contended */
537             _afs_wq_node_state_change(node,
538                                            AFS_WQ_NODE_STATE_BUSY);
539             MUTEX_EXIT(&node->lock);
540             MUTEX_ENTER(&list->lock);
541             MUTEX_ENTER(&node->lock);
542
543             if (node->qidx == AFS_WQ_NODE_LIST_NONE) {
544                 /* raced */
545                 ret= AFS_WQ_ERROR;
546                 goto done_sync;
547             }
548         }
549
550         queue_Remove(node);
551         node->qidx = AFS_WQ_NODE_LIST_NONE;
552         _afs_wq_node_state_change(node, next_state);
553
554     done_sync:
555         MUTEX_EXIT(&list->lock);
556     }
557
558  error:
559     return ret;
560 }
561
562 /**
563  * allocate a dependency node.
564  *
565  * @param[out] node_out address in which to store dep node pointer
566  *
567  * @return operation status
568  *    @retval 0 success
569  *    @retval ENOMEM   out of memory
570  *
571  * @internal
572  */
573 static int
574 _afs_wq_dep_alloc(struct afs_work_queue_dep_node ** node_out)
575 {
576     int ret = 0;
577     struct afs_work_queue_dep_node * node;
578
579     node = malloc(sizeof(*node));
580     if (node == NULL) {
581         ret = ENOMEM;
582         goto error;
583     }
584
585     queue_NodeInit(&node->parent_list);
586     node->parent = node->child = NULL;
587
588     *node_out = node;
589
590  error:
591     return ret;
592 }
593
594 /**
595  * free a dependency node.
596  *
597  * @param[in] node dep node pointer
598  *
599  * @return operation status
600  *    @retval 0 success
601  *    @retval AFS_WQ_ERROR still attached to a work node
602  *
603  * @internal
604  */
605 static int
606 _afs_wq_dep_free(struct afs_work_queue_dep_node * node)
607 {
608     int ret = 0;
609
610     if (queue_IsOnQueue(&node->parent_list) ||
611         node->parent ||
612         node->child) {
613         ret = AFS_WQ_ERROR;
614         goto error;
615     }
616
617     free(node);
618
619  error:
620     return ret;
621 }
622
623 /**
624  * unlink work nodes from a dependency node.
625  *
626  * @param[in]  dep      dependency node
627  *
628  * @return operation status
629  *    @retval 0 success
630  *
631  * @pre
632  *   - dep->parent and dep->child are either locked, or are not referenced
633  *     by anything else
634  *   - caller holds ref on dep->child
635  *   - dep->child and dep->parent in quiescent state
636  *
637  * @internal
638  */
639 static int
640 _afs_wq_dep_unlink_r(struct afs_work_queue_dep_node *dep)
641 {
642     struct afs_work_queue_node *child = dep->child;
643     queue_Remove(&dep->parent_list);
644     dep->child = NULL;
645     dep->parent = NULL;
646
647     return _afs_wq_node_put_r(child, 0);
648 }
649
650 /**
651  * get a reference to a work node.
652  *
653  * @param[in] node  work queue node
654  *
655  * @return operation status
656  *    @retval 0 success
657  *
658  * @pre node->lock held
659  *
660  * @internal
661  */
662 static int
663 _afs_wq_node_get_r(struct afs_work_queue_node * node)
664 {
665     node->refcount++;
666
667     return 0;
668 }
669
670 /**
671  * unlink and free all of the dependency nodes from a node.
672  *
673  * @param[in] parent  work node that is the parent node of all deps to be freed
674  *
675  * @return operation status
676  *  @retval 0 success
677  *
678  * @pre parent->refcount == 0
679  */
680 static int
681 _afs_wq_node_free_deps(struct afs_work_queue_node *parent)
682 {
683     int ret = 0, code;
684     struct afs_work_queue_node *node_unlock = NULL, *node_put = NULL;
685     struct afs_work_queue_dep_node * dep, * nd;
686
687     /* unlink and free all of the dep structs attached to 'parent' */
688     for (queue_Scan(&parent->dep_children,
689                     dep,
690                     nd,
691                     afs_work_queue_dep_node)) {
692
693         MUTEX_ENTER(&dep->child->lock);
694         node_unlock = dep->child;
695
696         /* We need to get a ref on child here, since _afs_wq_dep_unlink_r may
697          * put the last ref on the child, and we need the child to still exist
698          * so we can unlock it */
699         code = _afs_wq_node_get_r(dep->child);
700         if (code) {
701             goto loop_error;
702         }
703         node_put = dep->child;
704
705         /* remember, no need to lock dep->parent, since its refcount is 0 */
706         code = _afs_wq_dep_unlink_r(dep);
707
708      loop_error:
709         if (node_put) {
710             _afs_wq_node_put_r(node_put, 1);
711         } else if (node_unlock) {
712             MUTEX_EXIT(&node_unlock->lock);
713         }
714         node_put = node_unlock = NULL;
715
716         if (code == 0) {
717             /* Only do this if everything is okay; if code is nonzero,
718              * something will still be pointing at dep, so don't free it.
719              * We will leak memory, but that's better than memory corruption;
720              * we've done all we can do to try and free the dep memory */
721             code = _afs_wq_dep_free(dep);
722         }
723
724         if (!ret) {
725             ret = code;
726         }
727     }
728     return ret;
729 }
730
731 /**
732  * propagate state down through dep nodes.
733  *
734  * @param[in] parent      parent node object
735  * @param[in] next_state  next state parent will assume
736  *
737  * @return operation status
738  *    @retval 0 success
739  *
740  * @pre
741  *   - parent->lock held
742  *
743  * @internal
744  */
745 static int
746 _afs_wq_dep_propagate(struct afs_work_queue_node * parent,
747                       afs_wq_work_state_t next_state)
748 {
749     int ret = 0;
750     struct afs_work_queue_dep_node * dep, * nd;
751     struct afs_work_queue_node_multilock ml;
752     afs_wq_work_state_t old_state;
753     afs_wq_node_list_id_t qidx;
754     struct afs_work_queue_node_list * ql;
755     afs_wq_work_state_t cns;
756
757     old_state = _afs_wq_node_state_change(parent,
758                                                AFS_WQ_NODE_STATE_BUSY);
759     ml.nodes[0].node = parent;
760     ml.nodes[0].lock_held = 1;
761     ml.nodes[0].busy_held = 1;
762
763     /* scan through our children updating scheduling state */
764     for (queue_Scan(&parent->dep_children,
765                     dep,
766                     nd,
767                     afs_work_queue_dep_node)) {
768         /* skip half-registered nodes */
769         if (dep->child == NULL) {
770             continue;
771         }
772
773         ml.nodes[1].node = dep->child;
774         ml.nodes[1].lock_held = 0;
775         ml.nodes[1].busy_held = 0;
776         ret = _afs_wq_node_multilock(&ml);
777         if (ret) {
778             goto error;
779         }
780
781         switch (next_state) {
782         case AFS_WQ_NODE_STATE_DONE:
783             dep->child->block_count--;
784             break;
785
786         case AFS_WQ_NODE_STATE_ERROR:
787             dep->child->error_count++;
788             break;
789
790         default:
791             (void)0; /* nop */
792         }
793
794         /* skip unscheduled nodes */
795         if (dep->child->queue == NULL) {
796             MUTEX_EXIT(&dep->child->lock);
797             continue;
798         }
799
800         /*
801          * when blocked dep and error'd dep counts reach zero, the
802          * node can be scheduled for execution
803          */
804         if (dep->child->error_count) {
805             ql = &dep->child->queue->done_list;
806             qidx = AFS_WQ_NODE_LIST_DONE;
807             cns = AFS_WQ_NODE_STATE_ERROR;
808         } else if (dep->child->block_count) {
809             ql = &dep->child->queue->blocked_list;
810             qidx = AFS_WQ_NODE_LIST_BLOCKED;
811             cns = AFS_WQ_NODE_STATE_BLOCKED;
812         } else {
813             ql = &dep->child->queue->ready_list;
814             qidx = AFS_WQ_NODE_LIST_READY;
815             cns = AFS_WQ_NODE_STATE_SCHEDULED;
816         }
817
818         if (qidx != dep->child->qidx) {
819             /* we're transitioning to a different queue */
820             ret = _afs_wq_node_list_remove(dep->child,
821                                                 AFS_WQ_NODE_STATE_BUSY);
822             if (ret) {
823                 MUTEX_EXIT(&dep->child->lock);
824                 goto error;
825             }
826
827             ret = _afs_wq_node_list_enqueue(ql,
828                                                  dep->child,
829                                                  cns);
830             if (ret) {
831                 MUTEX_EXIT(&dep->child->lock);
832                 goto error;
833             }
834         }
835         MUTEX_EXIT(&dep->child->lock);
836     }
837
838  error:
839     _afs_wq_node_state_change(parent,
840                                    old_state);
841     return ret;
842 }
843
844 /**
845  * decrements queue->running_count, and signals waiters if appropriate.
846  *
847  * @param[in] queue  queue to dec the running count of
848  */
849 static void
850 _afs_wq_dec_running_count(struct afs_work_queue *queue)
851 {
852     MUTEX_ENTER(&queue->lock);
853     queue->running_count--;
854     if (queue->shutdown && queue->running_count == 0) {
855         /* if we've shut down, someone may be waiting for the running count
856          * to drop to 0 */
857         CV_BROADCAST(&queue->running_cv);
858     }
859     MUTEX_EXIT(&queue->lock);
860 }
861
862 /**
863  * execute a node on the queue.
864  *
865  * @param[in] queue  work queue
866  * @param[in] rock   opaque pointer (passed as third arg to callback func)
867  * @param[in] block  allow blocking in dequeue
868  *
869  * @return operation status
870  *    @retval 0 completed a work unit
871  *
872  * @internal
873  */
874 static int
875 _afs_wq_do(struct afs_work_queue * queue,
876            void * rock,
877            int block)
878 {
879     int code, ret = 0;
880     struct afs_work_queue_node * node;
881     afs_wq_callback_func_t * cbf;
882     afs_wq_work_state_t next_state;
883     struct afs_work_queue_node_list * ql;
884     void * node_rock;
885     int detached = 0;
886
887     /* We can inc queue->running_count before actually pulling the node off
888      * of the ready_list, since running_count only really matters when we are
889      * shut down. If we get shut down before we pull the node off of
890      * ready_list, but after we inc'd running_count,
891      * _afs_wq_node_list_dequeue should return immediately with EINTR,
892      * in which case we'll dec running_count, so it's as if we never inc'd it
893      * in the first place. */
894     MUTEX_ENTER(&queue->lock);
895     if (queue->shutdown) {
896         MUTEX_EXIT(&queue->lock);
897         return EINTR;
898     }
899     queue->running_count++;
900     MUTEX_EXIT(&queue->lock);
901
902     ret = _afs_wq_node_list_dequeue(&queue->ready_list,
903                                          &node,
904                                          AFS_WQ_NODE_STATE_RUNNING,
905                                          block);
906     if (ret) {
907         _afs_wq_dec_running_count(queue);
908         goto error;
909     }
910
911     cbf = node->cbf;
912     node_rock = node->rock;
913     detached = node->detached;
914
915     if (cbf != NULL) {
916         MUTEX_EXIT(&node->lock);
917         code = (*cbf)(queue, node, queue->rock, node_rock, rock);
918         MUTEX_ENTER(&node->lock);
919         if (code == 0) {
920             next_state = AFS_WQ_NODE_STATE_DONE;
921             ql = &queue->done_list;
922         } else if (code == AFS_WQ_ERROR_RESCHEDULE) {
923             if (node->error_count) {
924                 next_state = AFS_WQ_NODE_STATE_ERROR;
925                 ql = &queue->done_list;
926             } else if (node->block_count) {
927                 next_state = AFS_WQ_NODE_STATE_BLOCKED;
928                 ql = &queue->blocked_list;
929             } else {
930                 next_state = AFS_WQ_NODE_STATE_SCHEDULED;
931                 ql = &queue->ready_list;
932             }
933         } else {
934             next_state = AFS_WQ_NODE_STATE_ERROR;
935             ql = &queue->done_list;
936         }
937     } else {
938         next_state = AFS_WQ_NODE_STATE_DONE;
939         code = 0;
940         ql = &queue->done_list;
941     }
942
943     _afs_wq_dec_running_count(queue);
944
945     node->retcode = code;
946
947     if ((next_state == AFS_WQ_NODE_STATE_DONE) ||
948         (next_state == AFS_WQ_NODE_STATE_ERROR)) {
949
950         MUTEX_ENTER(&queue->lock);
951
952         if (queue->drain && queue->pend_count == queue->opts.pend_lothresh) {
953             /* signal other threads if we're about to below the low
954              * pending-tasks threshold */
955             queue->drain = 0;
956             CV_SIGNAL(&queue->pend_cv);
957         }
958
959         if (queue->pend_count == 1) {
960             /* signal other threads if we're about to become 'empty' */
961             CV_BROADCAST(&queue->empty_cv);
962         }
963
964         queue->pend_count--;
965
966         MUTEX_EXIT(&queue->lock);
967     }
968
969     ret = _afs_wq_node_state_wait_busy(node);
970     if (ret) {
971         goto error;
972     }
973
974     /* propagate scheduling changes down through dependencies */
975     ret = _afs_wq_dep_propagate(node, next_state);
976     if (ret) {
977         goto error;
978     }
979
980     ret = _afs_wq_node_state_wait_busy(node);
981     if (ret) {
982         goto error;
983     }
984
985     if (detached &&
986         ((next_state == AFS_WQ_NODE_STATE_DONE) ||
987          (next_state == AFS_WQ_NODE_STATE_ERROR))) {
988         _afs_wq_node_state_change(node, next_state);
989         _afs_wq_node_put_r(node, 1);
990     } else {
991         ret = _afs_wq_node_list_enqueue(ql,
992                                              node,
993                                              next_state);
994     }
995
996  error:
997     return ret;
998 }
999
1000 /**
1001  * initialize a struct afs_work_queue_opts to the default values
1002  *
1003  * @param[out] opts  opts struct to initialize
1004  */
1005 void
1006 afs_wq_opts_init(struct afs_work_queue_opts *opts)
1007 {
1008     opts->pend_lothresh = 0;
1009     opts->pend_hithresh = 0;
1010 }
1011
1012 /**
1013  * set the options for a struct afs_work_queue_opts appropriate for a certain
1014  * number of threads.
1015  *
1016  * @param[out] opts   opts struct in which to set the values
1017  * @param[in] threads number of threads
1018  */
1019 void
1020 afs_wq_opts_calc_thresh(struct afs_work_queue_opts *opts, int threads)
1021 {
1022     opts->pend_lothresh = threads * 2;
1023     opts->pend_hithresh = threads * 16;
1024
1025     /* safety */
1026     if (opts->pend_lothresh < 1) {
1027         opts->pend_lothresh = 1;
1028     }
1029     if (opts->pend_hithresh < 2) {
1030         opts->pend_hithresh = 2;
1031     }
1032 }
1033
1034 /**
1035  * allocate and initialize a work queue object.
1036  *
1037  * @param[out]   queue_out  address in which to store newly allocated work queue object
1038  * @param[in]    rock       work queue opaque pointer (passed as first arg to all fired callbacks)
1039  * @param[in]    opts       options for the new created queue
1040  *
1041  * @return operation status
1042  *    @retval 0 success
1043  *    @retval ENOMEM         out of memory
1044  */
1045 int
1046 afs_wq_create(struct afs_work_queue ** queue_out,
1047               void * rock,
1048               struct afs_work_queue_opts *opts)
1049 {
1050     int ret = 0;
1051     struct afs_work_queue * queue;
1052
1053     ret = _afs_wq_alloc(queue_out);
1054     if (ret) {
1055         goto error;
1056     }
1057     queue = *queue_out;
1058
1059     if (opts) {
1060         memcpy(&queue->opts, opts, sizeof(queue->opts));
1061     } else {
1062         afs_wq_opts_init(&queue->opts);
1063     }
1064
1065     _afs_wq_node_list_init(&queue->ready_list,
1066                                 AFS_WQ_NODE_LIST_READY);
1067     _afs_wq_node_list_init(&queue->blocked_list,
1068                                 AFS_WQ_NODE_LIST_BLOCKED);
1069     _afs_wq_node_list_init(&queue->done_list,
1070                                 AFS_WQ_NODE_LIST_DONE);
1071     queue->rock = rock;
1072     queue->drain = 0;
1073     queue->shutdown = 0;
1074     queue->pend_count = 0;
1075     queue->running_count = 0;
1076
1077     MUTEX_INIT(&queue->lock, "queue", MUTEX_DEFAULT, 0);
1078     CV_INIT(&queue->pend_cv, "queue pending", CV_DEFAULT, 0);
1079     CV_INIT(&queue->empty_cv, "queue empty", CV_DEFAULT, 0);
1080     CV_INIT(&queue->running_cv, "queue running", CV_DEFAULT, 0);
1081
1082  error:
1083     return ret;
1084 }
1085
1086 /**
1087  * deallocate and free a work queue object.
1088  *
1089  * @param[in] queue  work queue to be destroyed
1090  *
1091  * @return operation status
1092  *    @retval 0  success
1093  *    @retval AFS_WQ_ERROR unspecified error
1094  */
1095 int
1096 afs_wq_destroy(struct afs_work_queue * queue)
1097 {
1098     int ret = 0;
1099
1100     ret = _afs_wq_node_list_destroy(&queue->ready_list);
1101     if (ret) {
1102         goto error;
1103     }
1104
1105     ret = _afs_wq_node_list_destroy(&queue->blocked_list);
1106     if (ret) {
1107         goto error;
1108     }
1109
1110     ret = _afs_wq_node_list_destroy(&queue->done_list);
1111     if (ret) {
1112         goto error;
1113     }
1114
1115     ret = _afs_wq_free(queue);
1116
1117  error:
1118     return ret;
1119 }
1120
1121 /**
1122  * shutdown a work queue.
1123  *
1124  * @param[in] queue work queue object pointer
1125  *
1126  * @return operation status
1127  *    @retval 0 success
1128  */
1129 int
1130 afs_wq_shutdown(struct afs_work_queue * queue)
1131 {
1132     int ret = 0;
1133
1134     MUTEX_ENTER(&queue->lock);
1135     if (queue->shutdown) {
1136         /* already shutdown, do nothing */
1137         MUTEX_EXIT(&queue->lock);
1138         goto error;
1139     }
1140     queue->shutdown = 1;
1141
1142     ret = _afs_wq_node_list_shutdown(&queue->ready_list);
1143     if (ret) {
1144         goto error;
1145     }
1146
1147     ret = _afs_wq_node_list_shutdown(&queue->blocked_list);
1148     if (ret) {
1149         goto error;
1150     }
1151
1152     ret = _afs_wq_node_list_shutdown(&queue->done_list);
1153     if (ret) {
1154         goto error;
1155     }
1156
1157     /* signal everyone that could be waiting, since these conditions will
1158      * generally fail to signal on their own if we're shutdown, since no
1159      * progress is being made */
1160     CV_BROADCAST(&queue->pend_cv);
1161     CV_BROADCAST(&queue->empty_cv);
1162     MUTEX_EXIT(&queue->lock);
1163
1164  error:
1165     return ret;
1166 }
1167
1168 /**
1169  * allocate a work node.
1170  *
1171  * @param[out] node_out  address in which to store new work node
1172  *
1173  * @return operation status
1174  *    @retval 0 success
1175  *    @retval ENOMEM         out of memory
1176  */
1177 int
1178 afs_wq_node_alloc(struct afs_work_queue_node ** node_out)
1179 {
1180     int ret = 0;
1181     struct afs_work_queue_node * node;
1182
1183     *node_out = node = (struct afs_work_queue_node *) malloc(sizeof(*node));
1184     if (node == NULL) {
1185         ret = ENOMEM;
1186         goto error;
1187     }
1188
1189     queue_NodeInit(&node->node_list);
1190     node->qidx = AFS_WQ_NODE_LIST_NONE;
1191     node->cbf = NULL;
1192     node->rock = node->queue = NULL;
1193     node->refcount = 1;
1194     node->block_count = 0;
1195     node->error_count = 0;
1196     MUTEX_INIT(&node->lock, "node", MUTEX_DEFAULT, 0);
1197     CV_INIT(&node->state_cv, "node state", CV_DEFAULT, 0);
1198     node->state = AFS_WQ_NODE_STATE_INIT;
1199     queue_Init(&node->dep_children);
1200
1201  error:
1202     return ret;
1203 }
1204
1205 /**
1206  * free a work node.
1207  *
1208  * @param[in] node  work node object
1209  *
1210  * @return operation status
1211  *    @retval 0 success
1212  *
1213  * @internal
1214  */
1215 static int
1216 _afs_wq_node_free(struct afs_work_queue_node * node)
1217 {
1218     int ret = 0;
1219
1220     if (queue_IsOnQueue(node) ||
1221         (node->state == AFS_WQ_NODE_STATE_SCHEDULED) ||
1222         (node->state == AFS_WQ_NODE_STATE_RUNNING) ||
1223         (node->state == AFS_WQ_NODE_STATE_BLOCKED)) {
1224         ret = AFS_WQ_ERROR;
1225         goto error;
1226     }
1227
1228     ret = _afs_wq_node_free_deps(node);
1229     if (ret) {
1230         goto error;
1231     }
1232
1233     MUTEX_DESTROY(&node->lock);
1234     CV_DESTROY(&node->state_cv);
1235
1236     if (node->rock_dtor) {
1237         (*node->rock_dtor) (node->rock);
1238     }
1239
1240     free(node);
1241
1242  error:
1243     return ret;
1244 }
1245
1246 /**
1247  * get a reference to a work node.
1248  *
1249  * @param[in] node  work queue node
1250  *
1251  * @return operation status
1252  *    @retval 0 success
1253  */
1254 int
1255 afs_wq_node_get(struct afs_work_queue_node * node)
1256 {
1257     MUTEX_ENTER(&node->lock);
1258     node->refcount++;
1259     MUTEX_EXIT(&node->lock);
1260
1261     return 0;
1262 }
1263
1264 /**
1265  * put back a reference to a work node.
1266  *
1267  * @param[in] node  work queue node
1268  * @param[in] drop  drop node->lock
1269  *
1270  * @post if refcount reaches zero, node is deallocated.
1271  *
1272  * @return operation status
1273  *    @retval 0 success
1274  *
1275  * @pre node->lock held
1276  *
1277  * @internal
1278  */
1279 static int
1280 _afs_wq_node_put_r(struct afs_work_queue_node * node,
1281                    int drop)
1282 {
1283     afs_uint32 refc;
1284
1285     osi_Assert(node->refcount > 0);
1286     refc = --node->refcount;
1287     if (drop) {
1288         MUTEX_EXIT(&node->lock);
1289     }
1290     if (!refc) {
1291         osi_Assert(node->qidx == AFS_WQ_NODE_LIST_NONE);
1292         _afs_wq_node_free(node);
1293     }
1294
1295     return 0;
1296 }
1297
1298 /**
1299  * put back a reference to a work node.
1300  *
1301  * @param[in] node  work queue node
1302  *
1303  * @post if refcount reaches zero, node is deallocated.
1304  *
1305  * @return operation status
1306  *    @retval 0 success
1307  */
1308 int
1309 afs_wq_node_put(struct afs_work_queue_node * node)
1310 {
1311     MUTEX_ENTER(&node->lock);
1312     return _afs_wq_node_put_r(node, 1);
1313 }
1314
1315 /**
1316  * set the callback function on a work node.
1317  *
1318  * @param[in] node  work queue node
1319  * @param[in] cbf   callback function
1320  * @param[in] rock  opaque pointer passed to callback
1321  * @param[in] rock_dtor  destructor function for 'rock', or NULL
1322  *
1323  * @return operation status
1324  *    @retval 0 success
1325  */
1326 int
1327 afs_wq_node_set_callback(struct afs_work_queue_node * node,
1328                          afs_wq_callback_func_t * cbf,
1329                          void * rock, afs_wq_callback_dtor_t *dtor)
1330 {
1331     MUTEX_ENTER(&node->lock);
1332     node->cbf = cbf;
1333     node->rock = rock;
1334     node->rock_dtor = dtor;
1335     MUTEX_EXIT(&node->lock);
1336
1337     return 0;
1338 }
1339
1340 /**
1341  * detach work node.
1342  *
1343  * @param[in] node  work queue node
1344  *
1345  * @return operation status
1346  *    @retval 0 success
1347  */
1348 int
1349 afs_wq_node_set_detached(struct afs_work_queue_node * node)
1350 {
1351     MUTEX_ENTER(&node->lock);
1352     node->detached = 1;
1353     MUTEX_EXIT(&node->lock);
1354
1355     return 0;
1356 }
1357
1358 /**
1359  * link a dependency node to a parent and child work node.
1360  *
1361  * This links a dependency node such that when the 'parent' work node is
1362  * done, the 'child' work node can proceed.
1363  *
1364  * @param[in]  dep      dependency node
1365  * @param[in]  parent   parent node in this dependency
1366  * @param[in]  child    child node in this dependency
1367  *
1368  * @return operation status
1369  *    @retval 0 success
1370  *
1371  * @pre
1372  *   - parent->lock held
1373  *   - child->lock held
1374  *   - parent and child in quiescent state
1375  *
1376  * @internal
1377  */
1378 static int
1379 _afs_wq_dep_link_r(struct afs_work_queue_dep_node *dep,
1380                    struct afs_work_queue_node *parent,
1381                    struct afs_work_queue_node *child)
1382 {
1383     int ret = 0;
1384
1385     /* Each dep node adds a ref to the child node of that dep. We do not
1386      * do the same for the parent node, since if the only refs remaining
1387      * for a node are deps in node->dep_children, then the node should be
1388      * destroyed, and we will destroy the dep nodes when we free the
1389      * work node. */
1390     ret = _afs_wq_node_get_r(child);
1391     if (ret) {
1392         goto error;
1393     }
1394
1395     /* add this dep node to the parent node's list of deps */
1396     queue_Append(&parent->dep_children, &dep->parent_list);
1397
1398     dep->child = child;
1399     dep->parent = parent;
1400
1401  error:
1402     return ret;
1403 }
1404
1405 /**
1406  * add a dependency to a work node.
1407  *
1408  * @param[in] child  node which will be dependent upon completion of parent
1409  * @param[in] parent node whose completion gates child's execution
1410  *
1411  * @pre
1412  *   - child is in initial state (last op was afs_wq_node_alloc or afs_wq_node_wait)
1413  *
1414  * @return operation status
1415  *    @retval 0 success
1416  */
1417 int
1418 afs_wq_node_dep_add(struct afs_work_queue_node * child,
1419                     struct afs_work_queue_node * parent)
1420 {
1421     int ret = 0;
1422     struct afs_work_queue_dep_node * dep = NULL;
1423     struct afs_work_queue_node_multilock ml;
1424     int held = 0;
1425
1426     /* self references are bad, mmkay? */
1427     if (parent == child) {
1428         ret = AFS_WQ_ERROR;
1429         goto error;
1430     }
1431
1432     ret = _afs_wq_dep_alloc(&dep);
1433     if (ret) {
1434         goto error;
1435     }
1436
1437     memset(&ml, 0, sizeof(ml));
1438     ml.nodes[0].node = parent;
1439     ml.nodes[1].node = child;
1440     ret = _afs_wq_node_multilock(&ml);
1441     if (ret) {
1442         goto error;
1443     }
1444     held = 1;
1445
1446     /* only allow dep modification while in initial state
1447      * or running state (e.g. do a dep add while inside callback) */
1448     if ((child->state != AFS_WQ_NODE_STATE_INIT) &&
1449         (child->state != AFS_WQ_NODE_STATE_RUNNING)) {
1450         ret = AFS_WQ_ERROR;
1451         goto error;
1452     }
1453
1454     /* link dep node with child and parent work queue node */
1455     ret = _afs_wq_dep_link_r(dep, parent, child);
1456     if (ret) {
1457         goto error;
1458     }
1459
1460     /* handle blocking counts */
1461     switch (parent->state) {
1462     case AFS_WQ_NODE_STATE_INIT:
1463     case AFS_WQ_NODE_STATE_SCHEDULED:
1464     case AFS_WQ_NODE_STATE_RUNNING:
1465     case AFS_WQ_NODE_STATE_BLOCKED:
1466         child->block_count++;
1467         break;
1468
1469     case AFS_WQ_NODE_STATE_ERROR:
1470         child->error_count++;
1471         break;
1472
1473     default:
1474         (void)0; /* nop */
1475     }
1476
1477  done:
1478     if (held) {
1479         MUTEX_EXIT(&child->lock);
1480         MUTEX_EXIT(&parent->lock);
1481     }
1482     return ret;
1483
1484  error:
1485     if (dep) {
1486         _afs_wq_dep_free(dep);
1487     }
1488     goto done;
1489 }
1490
1491 /**
1492  * remove a dependency from a work node.
1493  *
1494  * @param[in] child  node which was dependent upon completion of parent
1495  * @param[in] parent node whose completion gated child's execution
1496  *
1497  * @return operation status
1498  *    @retval 0 success
1499  */
1500 int
1501 afs_wq_node_dep_del(struct afs_work_queue_node * child,
1502                     struct afs_work_queue_node * parent)
1503 {
1504     int code, ret = 0;
1505     struct afs_work_queue_dep_node * dep, * ndep;
1506     struct afs_work_queue_node_multilock ml;
1507     int held = 0;
1508
1509     memset(&ml, 0, sizeof(ml));
1510     ml.nodes[0].node = parent;
1511     ml.nodes[1].node = child;
1512     code = _afs_wq_node_multilock(&ml);
1513     if (code) {
1514         goto error;
1515     }
1516     held = 1;
1517
1518     /* only permit changes while child is in init state
1519      * or running state (e.g. do a dep del when in callback func) */
1520     if ((child->state != AFS_WQ_NODE_STATE_INIT) &&
1521         (child->state != AFS_WQ_NODE_STATE_RUNNING)) {
1522         ret = AFS_WQ_ERROR;
1523         goto error;
1524     }
1525
1526     /* locate node linking parent and child */
1527     for (queue_Scan(&parent->dep_children,
1528                     dep,
1529                     ndep,
1530                     afs_work_queue_dep_node)) {
1531         if ((dep->child == child) &&
1532             (dep->parent == parent)) {
1533
1534             /* no need to grab an extra ref on dep->child here; the caller
1535              * should already have a ref on dep->child */
1536             code = _afs_wq_dep_unlink_r(dep);
1537             if (code) {
1538                 ret = code;
1539                 goto error;
1540             }
1541
1542             code = _afs_wq_dep_free(dep);
1543             if (code) {
1544                 ret = code;
1545                 goto error;
1546             }
1547             break;
1548         }
1549     }
1550
1551  error:
1552     if (held) {
1553         MUTEX_EXIT(&child->lock);
1554         MUTEX_EXIT(&parent->lock);
1555     }
1556     return ret;
1557 }
1558
1559 /**
1560  * block a work node from execution.
1561  *
1562  * this can be used to allow external events to influence work queue flow.
1563  *
1564  * @param[in] node  work queue node to be blocked
1565  *
1566  * @return operation status
1567  *    @retval 0 success
1568  *
1569  * @post external block count incremented
1570  */
1571 int
1572 afs_wq_node_block(struct afs_work_queue_node * node)
1573 {
1574     int ret = 0;
1575     int start;
1576
1577     MUTEX_ENTER(&node->lock);
1578     ret = _afs_wq_node_state_wait_busy(node);
1579     if (ret) {
1580         goto error_sync;
1581     }
1582
1583     start = node->block_count++;
1584
1585     if (!start &&
1586         (node->qidx == AFS_WQ_NODE_LIST_READY)) {
1587         /* unblocked->blocked transition, and we're already scheduled */
1588         ret = _afs_wq_node_list_remove(node,
1589                                             AFS_WQ_NODE_STATE_BUSY);
1590         if (ret) {
1591             goto error_sync;
1592         }
1593
1594         ret = _afs_wq_node_list_enqueue(&node->queue->blocked_list,
1595                                              node,
1596                                              AFS_WQ_NODE_STATE_BLOCKED);
1597     }
1598
1599  error_sync:
1600     MUTEX_EXIT(&node->lock);
1601
1602     return ret;
1603 }
1604
1605 /**
1606  * unblock a work node for execution.
1607  *
1608  * this can be used to allow external events to influence work queue flow.
1609  *
1610  * @param[in] node  work queue node to be blocked
1611  *
1612  * @return operation status
1613  *    @retval 0 success
1614  *
1615  * @post external block count decremented
1616  */
1617 int
1618 afs_wq_node_unblock(struct afs_work_queue_node * node)
1619 {
1620     int ret = 0;
1621     int end;
1622
1623     MUTEX_ENTER(&node->lock);
1624     ret = _afs_wq_node_state_wait_busy(node);
1625     if (ret) {
1626         goto error_sync;
1627     }
1628
1629     end = --node->block_count;
1630
1631     if (!end &&
1632         (node->qidx == AFS_WQ_NODE_LIST_BLOCKED)) {
1633         /* blocked->unblock transition, and we're ready to be scheduled */
1634         ret = _afs_wq_node_list_remove(node,
1635                                             AFS_WQ_NODE_STATE_BUSY);
1636         if (ret) {
1637             goto error_sync;
1638         }
1639
1640         ret = _afs_wq_node_list_enqueue(&node->queue->ready_list,
1641                                              node,
1642                                              AFS_WQ_NODE_STATE_SCHEDULED);
1643     }
1644
1645  error_sync:
1646     MUTEX_EXIT(&node->lock);
1647
1648     return ret;
1649 }
1650
1651 /**
1652  * initialize a afs_wq_add_opts struct with the default options.
1653  *
1654  * @param[out] opts  options structure to initialize
1655  */
1656 void
1657 afs_wq_add_opts_init(struct afs_work_queue_add_opts *opts)
1658 {
1659     opts->donate = 0;
1660     opts->block = 1;
1661     opts->force = 0;
1662 }
1663
1664 /**
1665  * schedule a work node for execution.
1666  *
1667  * @param[in] queue  work queue
1668  * @param[in] node   work node
1669  * @param[in] opts   options for adding, or NULL for defaults
1670  *
1671  * @return operation status
1672  *    @retval 0 success
1673  *    @retval EWOULDBLOCK queue is full and opts specified not to block
1674  *    @retval EINTR queue was full, we blocked to add, and the queue was
1675  *                  shutdown while we were blocking
1676  */
1677 int
1678 afs_wq_add(struct afs_work_queue *queue,
1679            struct afs_work_queue_node *node,
1680            struct afs_work_queue_add_opts *opts)
1681 {
1682     int ret = 0;
1683     int donate, block, force, hithresh;
1684     struct afs_work_queue_node_list * list;
1685     struct afs_work_queue_add_opts l_opts;
1686     int waited_for_drain = 0;
1687     afs_wq_work_state_t state;
1688
1689     if (!opts) {
1690         afs_wq_add_opts_init(&l_opts);
1691         opts = &l_opts;
1692     }
1693
1694     donate = opts->donate;
1695     block = opts->block;
1696     force = opts->force;
1697
1698  retry:
1699     MUTEX_ENTER(&node->lock);
1700
1701     ret = _afs_wq_node_state_wait_busy(node);
1702     if (ret) {
1703         goto error;
1704     }
1705
1706     if (!node->block_count && !node->error_count) {
1707         list = &queue->ready_list;
1708         state = AFS_WQ_NODE_STATE_SCHEDULED;
1709     } else if (node->error_count) {
1710         list = &queue->done_list;
1711         state = AFS_WQ_NODE_STATE_ERROR;
1712     } else {
1713         list = &queue->blocked_list;
1714         state = AFS_WQ_NODE_STATE_BLOCKED;
1715     }
1716
1717     ret = 0;
1718
1719     MUTEX_ENTER(&queue->lock);
1720
1721     if (queue->shutdown) {
1722         ret = EINTR;
1723         MUTEX_EXIT(&queue->lock);
1724         MUTEX_EXIT(&node->lock);
1725         goto error;
1726     }
1727
1728     hithresh = queue->opts.pend_hithresh;
1729     if (hithresh > 0 && queue->pend_count >= hithresh) {
1730         queue->drain = 1;
1731     }
1732
1733     if (!force && (state == AFS_WQ_NODE_STATE_SCHEDULED
1734                    || state == AFS_WQ_NODE_STATE_BLOCKED)) {
1735
1736         if (queue->drain) {
1737             if (block) {
1738                 MUTEX_EXIT(&node->lock);
1739                 CV_WAIT(&queue->pend_cv, &queue->lock);
1740
1741                 if (queue->shutdown) {
1742                     ret = EINTR;
1743                 } else {
1744                     MUTEX_EXIT(&queue->lock);
1745
1746                     waited_for_drain = 1;
1747
1748                     goto retry;
1749                 }
1750             } else {
1751                 ret = EWOULDBLOCK;
1752             }
1753         }
1754     }
1755
1756     if (ret == 0) {
1757         queue->pend_count++;
1758     }
1759     if (waited_for_drain) {
1760         /* signal another thread that may have been waiting for drain */
1761         CV_SIGNAL(&queue->pend_cv);
1762     }
1763
1764     MUTEX_EXIT(&queue->lock);
1765
1766     if (ret) {
1767         goto error;
1768     }
1769
1770     if (!donate)
1771         node->refcount++;
1772     node->queue = queue;
1773
1774     ret = _afs_wq_node_list_enqueue(list,
1775                                          node,
1776                                          state);
1777  error:
1778     return ret;
1779 }
1780
1781 /**
1782  * de-schedule a work node.
1783  *
1784  * @param[in] node  work node
1785  *
1786  * @return operation status
1787  *    @retval 0 success
1788  */
1789 int
1790 afs_wq_del(struct afs_work_queue_node * node)
1791 {
1792     /* XXX todo */
1793     return ENOTSUP;
1794 }
1795
1796 /**
1797  * execute a node on the queue.
1798  *
1799  * @param[in] queue  work queue
1800  * @param[in] rock   opaque pointer (passed as third arg to callback func)
1801  *
1802  * @return operation status
1803  *    @retval 0 completed a work unit
1804  */
1805 int
1806 afs_wq_do(struct afs_work_queue * queue,
1807           void * rock)
1808 {
1809     return _afs_wq_do(queue, rock, 1);
1810 }
1811
1812 /**
1813  * execute a node on the queue, if there is any work to do.
1814  *
1815  * @param[in] queue  work queue
1816  * @param[in] rock   opaque pointer (passed as third arg to callback func)
1817  *
1818  * @return operation status
1819  *    @retval 0 completed a work unit
1820  *    @retval EWOULDBLOCK there was nothing to do
1821  */
1822 int
1823 afs_wq_do_nowait(struct afs_work_queue * queue,
1824                  void * rock)
1825 {
1826     return _afs_wq_do(queue, rock, 0);
1827 }
1828
1829 /**
1830  * wait for all pending nodes to finish.
1831  *
1832  * @param[in] queue  work queue
1833  *
1834  * @return operation status
1835  *   @retval 0 success
1836  *
1837  * @post the specified queue was empty at some point; it may not be empty by
1838  * the time this function returns, but at some point after the function was
1839  * called, there were no nodes in the ready queue or blocked queue.
1840  */
1841 int
1842 afs_wq_wait_all(struct afs_work_queue *queue)
1843 {
1844     int ret = 0;
1845
1846     MUTEX_ENTER(&queue->lock);
1847
1848     while (queue->pend_count > 0 && !queue->shutdown) {
1849         CV_WAIT(&queue->empty_cv, &queue->lock);
1850     }
1851
1852     if (queue->shutdown) {
1853         /* queue has been shut down, but there may still be some threads
1854          * running e.g. in the middle of their callback. ensure they have
1855          * stopped before we return. */
1856         while (queue->running_count > 0) {
1857             CV_WAIT(&queue->running_cv, &queue->lock);
1858         }
1859         ret = EINTR;
1860         goto done;
1861     }
1862
1863  done:
1864     MUTEX_EXIT(&queue->lock);
1865
1866     /* technically this doesn't really guarantee that the work queue is empty
1867      * after we return, but we do guarantee that it was empty at some point */
1868
1869     return ret;
1870 }
1871
1872 /**
1873  * wait for a node to complete; dequeue from done list.
1874  *
1875  * @param[in]  node     work queue node
1876  * @param[out] retcode  return code from work unit
1877  *
1878  * @return operation status
1879  *    @retval 0 sucess
1880  *
1881  * @pre ref held on node
1882  */
1883 int
1884 afs_wq_node_wait(struct afs_work_queue_node * node,
1885                  int * retcode)
1886 {
1887     int ret = 0;
1888
1889     MUTEX_ENTER(&node->lock);
1890     if (node->state == AFS_WQ_NODE_STATE_INIT) {
1891         /* not sure what to do in this case */
1892         goto done_sync;
1893     }
1894
1895     while ((node->state != AFS_WQ_NODE_STATE_DONE) &&
1896            (node->state != AFS_WQ_NODE_STATE_ERROR)) {
1897         CV_WAIT(&node->state_cv, &node->lock);
1898     }
1899     if (retcode) {
1900         *retcode = node->retcode;
1901     }
1902
1903     if (node->queue == NULL) {
1904         /* nothing we can do */
1905         goto done_sync;
1906     }
1907
1908     ret = _afs_wq_node_list_remove(node,
1909                                         AFS_WQ_NODE_STATE_INIT);
1910
1911  done_sync:
1912     MUTEX_EXIT(&node->lock);
1913
1914     return ret;
1915 }