util: Tidy header includes
[openafs.git] / src / util / work_queue.c
1 /*
2  * Copyright 2008-2010, Sine Nomine Associates and others.
3  * All Rights Reserved.
4  *
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afsconfig.h>
11 #include <afs/param.h>
12
13 #include <roken.h>
14 #include <sys/file.h>
15
16 #include <afs/afs_assert.h>
17 #include <lock.h>
18
19 #define __AFS_WORK_QUEUE_IMPL 1
20 #include "work_queue.h"
21 #include "work_queue_impl.h"
22
23 /**
24  * public interfaces for work_queue.
25  */
26
27 static int
28 _afs_wq_node_put_r(struct afs_work_queue_node * node,
29                    int drop);
30
31 /**
32  * allocate a work queue object.
33  *
34  * @param[out] queue_out address in which to store queue pointer
35  *
36  * @return operation status
37  *    @retval 0 success
38  *    @retval ENOMEM out of memory
39  *
40  * @internal
41  */
42 static int
43 _afs_wq_alloc(struct afs_work_queue ** queue_out)
44 {
45     int ret = 0;
46     struct afs_work_queue * queue;
47
48     *queue_out = queue = malloc(sizeof(*queue));
49     if (queue == NULL) {
50         ret = ENOMEM;
51         goto error;
52     }
53
54  error:
55     return ret;
56 }
57
58 /**
59  * free a work queue object.
60  *
61  * @param[in] queue  work queue object
62  *
63  * @return operation status
64  *    @retval 0 success
65  *
66  * @internal
67  */
68 static int
69 _afs_wq_free(struct afs_work_queue * queue)
70 {
71     int ret = 0;
72
73     free(queue);
74
75     return ret;
76 }
77
78 /**
79  * change a node's state.
80  *
81  * @param[in] node       node object
82  * @param[in] new_state  new object state
83  *
84  * @return old state
85  *
86  * @pre node->lock held
87  *
88  * @internal
89  */
90 static afs_wq_work_state_t
91 _afs_wq_node_state_change(struct afs_work_queue_node * node,
92                           afs_wq_work_state_t new_state)
93 {
94     afs_wq_work_state_t old_state;
95
96     old_state = node->state;
97     node->state = new_state;
98
99     CV_BROADCAST(&node->state_cv);
100
101     return old_state;
102 }
103
104 /**
105  * wait for a node's state to change from busy to something else.
106  *
107  * @param[in] node  node object
108  *
109  * @return operation status
110  *    @retval 0 success
111  *
112  * @pre node->lock held
113  *
114  * @internal
115  */
116 static int
117 _afs_wq_node_state_wait_busy(struct afs_work_queue_node * node)
118 {
119     while (node->state == AFS_WQ_NODE_STATE_BUSY) {
120         CV_WAIT(&node->state_cv, &node->lock);
121     }
122
123     return 0;
124 }
125
126 /**
127  * check whether a work queue node is busy.
128  *
129  * @param[in] node  node object pointer
130  *
131  * @return whether node is busy
132  *    @retval 1 node is busy
133  *    @retval 0 node is not busy
134  *
135  * @pre node->lock held
136  *
137  * @internal
138  */
139 static int
140 _afs_wq_node_state_is_busy(struct afs_work_queue_node * node)
141 {
142     return (node->state == AFS_WQ_NODE_STATE_BUSY);
143 }
144
145 /**
146  * attempt to simultaneously lock two work queue nodes.
147  *
148  * this is a somewhat tricky algorithm because there is no
149  * defined hierarchy within the work queue node population.
150  *
151  * @param[in] ml  multilock control structure
152  *
153  * @return operation status
154  *    @retval 0
155  *
156  * @note in theory, we could easily extend this to
157  *       lock more than two nodes
158  *
159  * @pre
160  *   - caller MUST NOT have set busy state on either node
161  *
162  * @post
163  *   - locks held on both nodes
164  *   - both nodes in quiescent states
165  *
166  * @note node with non-zero lock_held or busy_held fields
167  *       MUST go in array index 0
168  *
169  * @internal
170  */
171 static int
172 _afs_wq_node_multilock(struct afs_work_queue_node_multilock * ml)
173 {
174     int code, ret = 0;
175     struct timespec delay;
176     int first = 1, second = 0, tmp;
177
178     /* first pass processing */
179     if (ml->nodes[0].lock_held) {
180         if (!ml->nodes[0].busy_held) {
181             ret = _afs_wq_node_state_wait_busy(ml->nodes[0].node);
182             if (ret) {
183                 goto error;
184             }
185         }
186
187         code = MUTEX_TRYENTER(&ml->nodes[1].node->lock);
188         if (code) {
189             /* success */
190             goto done;
191         }
192
193         /* setup for main loop */
194         MUTEX_EXIT(&ml->nodes[0].node->lock);
195     }
196
197     /*
198      * setup random exponential backoff
199      *
200      * set initial delay to random value in the range [500,1000) ns
201      */
202     delay.tv_sec = 0;
203     delay.tv_nsec = 500 + rand() % 500;
204
205     while (1) {
206         MUTEX_ENTER(&ml->nodes[first].node->lock);
207         if ((first != 0) || !ml->nodes[0].busy_held) {
208             ret = _afs_wq_node_state_wait_busy(ml->nodes[first].node);
209             if (ret) {
210                 /* cleanup */
211                 if (!ml->nodes[0].lock_held || first) {
212                     MUTEX_EXIT(&ml->nodes[first].node->lock);
213                     if (ml->nodes[0].lock_held) {
214                         /* on error, return with locks in same state as before call */
215                         MUTEX_ENTER(&ml->nodes[0].node->lock);
216                     }
217                 }
218                 goto error;
219             }
220         }
221
222         /*
223          * in order to avoid deadlock, we must use trylock and
224          * a non-blocking state check.  if we meet any contention,
225          * we must drop back and start again.
226          */
227         code = MUTEX_TRYENTER(&ml->nodes[second].node->lock);
228         if (code) {
229             if (((second == 0) && (ml->nodes[0].busy_held)) ||
230                 !_afs_wq_node_state_is_busy(ml->nodes[second].node)) {
231                 /* success */
232                 break;
233             } else {
234                 MUTEX_EXIT(&ml->nodes[second].node->lock);
235             }
236         }
237
238         /*
239          * contended.
240          *
241          * drop locks, use exponential backoff,
242          * try acquiring in the opposite order
243          */
244         MUTEX_EXIT(&ml->nodes[first].node->lock);
245         nanosleep(&delay, NULL);
246         if (delay.tv_nsec <= 65536000) { /* max backoff delay of ~131ms */
247             delay.tv_nsec <<= 1;
248         }
249         tmp = second;
250         second = first;
251         first = tmp;
252     }
253
254  done:
255  error:
256     return ret;
257 }
258
259 /**
260  * initialize a node list object.
261  *
262  * @param[in] list list object
263  * @param[in] id   list identifier
264  *
265  * @return operation status
266  *    @retval 0 success
267  *
268  * @internal
269  */
270 static int
271 _afs_wq_node_list_init(struct afs_work_queue_node_list * list,
272                        afs_wq_node_list_id_t id)
273 {
274     queue_Init(&list->list);
275     MUTEX_INIT(&list->lock, "list", MUTEX_DEFAULT, 0);
276     CV_INIT(&list->cv, "list", CV_DEFAULT, 0);
277     list->qidx = id;
278     list->shutdown = 0;
279
280     return 0;
281 }
282
283 /**
284  * destroy a node list object.
285  *
286  * @param[in] list list object
287  *
288  * @return operation status
289  *    @retval 0 success
290  *    @retval AFS_WQ_ERROR list not empty
291  *
292  * @internal
293  */
294 static int
295 _afs_wq_node_list_destroy(struct afs_work_queue_node_list * list)
296 {
297     int ret = 0;
298
299     if (queue_IsNotEmpty(&list->list)) {
300         ret = AFS_WQ_ERROR;
301         goto error;
302     }
303
304     MUTEX_DESTROY(&list->lock);
305     CV_DESTROY(&list->cv);
306
307  error:
308     return ret;
309 }
310
311 /**
312  * wakeup all threads waiting in dequeue.
313  *
314  * @param[in] list list object
315  *
316  * @return operation status
317  *    @retval 0 success
318  *
319  * @internal
320  */
321 static int
322 _afs_wq_node_list_shutdown(struct afs_work_queue_node_list * list)
323 {
324     int ret = 0;
325     struct afs_work_queue_node *node, *nnode;
326
327     MUTEX_ENTER(&list->lock);
328     list->shutdown = 1;
329
330     for (queue_Scan(&list->list, node, nnode, afs_work_queue_node)) {
331         _afs_wq_node_state_change(node, AFS_WQ_NODE_STATE_ERROR);
332         queue_Remove(node);
333         node->qidx = AFS_WQ_NODE_LIST_NONE;
334         node->queue = NULL;
335
336         if (node->detached) {
337             /* if we are detached, we hold the reference on the node;
338              * otherwise, it is some other caller that holds the reference.
339              * So don't put the node if we are not detached; the node will
340              * get freed when someone else calls afs_wq_node_put */
341             afs_wq_node_put(node);
342         }
343     }
344
345     CV_BROADCAST(&list->cv);
346     MUTEX_EXIT(&list->lock);
347
348     return ret;
349 }
350
351 /**
352  * append to a node list object.
353  *
354  * @param[in] list  list object
355  * @param[in] node  node object
356  * @param[in] state new node state
357  *
358  * @return operation status
359  *    @retval 0 success
360  *    @retval AFS_WQ_ERROR raced to enqueue node
361  *
362  * @pre
363  *   - node lock held
364  *   - node is not on a list
365  *   - node is either not busy, or it is marked as busy by the calling thread
366  *
367  * @post
368  *   - enqueued on list
369  *   - node lock dropped
370  *
371  * @internal
372  */
373 static int
374 _afs_wq_node_list_enqueue(struct afs_work_queue_node_list * list,
375                           struct afs_work_queue_node * node,
376                           afs_wq_work_state_t state)
377 {
378     int code, ret = 0;
379
380     if (node->qidx != AFS_WQ_NODE_LIST_NONE) {
381         /* raced */
382         ret = AFS_WQ_ERROR;
383         goto error;
384     }
385
386     /* deal with lock inversion */
387     code = MUTEX_TRYENTER(&list->lock);
388     if (!code) {
389         /* contended */
390         _afs_wq_node_state_change(node, AFS_WQ_NODE_STATE_BUSY);
391         MUTEX_EXIT(&node->lock);
392         MUTEX_ENTER(&list->lock);
393         MUTEX_ENTER(&node->lock);
394
395         /* assert state of the world (we set busy, so this should never happen) */
396         osi_Assert(queue_IsNotOnQueue(node));
397     }
398
399     if (list->shutdown) {
400         ret = AFS_WQ_ERROR;
401         goto error_unlock;
402     }
403
404     osi_Assert(node->qidx == AFS_WQ_NODE_LIST_NONE);
405     if (queue_IsEmpty(&list->list)) {
406         /* wakeup a dequeue thread */
407         CV_SIGNAL(&list->cv);
408     }
409     queue_Append(&list->list, node);
410     node->qidx = list->qidx;
411     _afs_wq_node_state_change(node, state);
412
413  error_unlock:
414     MUTEX_EXIT(&node->lock);
415     MUTEX_EXIT(&list->lock);
416
417  error:
418     return ret;
419 }
420
421 /**
422  * dequeue a node from a list object.
423  *
424  * @param[in]    list      list object
425  * @param[out]   node_out  address in which to store node object pointer
426  * @param[in]    state     new node state
427  * @param[in]    block     permit blocking on cv if asserted
428  *
429  * @return operation status
430  *    @retval 0 success
431  *    @retval EWOULDBLOCK block not asserted and nothing to dequeue
432  *    @retval EINTR blocking wait interrupted by list shutdown
433  *
434  * @post node object returned with node lock held and new state set
435  *
436  * @internal
437  */
438 static int
439 _afs_wq_node_list_dequeue(struct afs_work_queue_node_list * list,
440                           struct afs_work_queue_node ** node_out,
441                           afs_wq_work_state_t state,
442                           int block)
443 {
444     int ret = 0;
445     struct afs_work_queue_node * node;
446
447     MUTEX_ENTER(&list->lock);
448
449     if (list->shutdown) {
450         *node_out = NULL;
451         ret = EINTR;
452         goto done_sync;
453     }
454
455     if (!block && queue_IsEmpty(&list->list)) {
456         *node_out = NULL;
457         ret = EWOULDBLOCK;
458         goto done_sync;
459     }
460
461     while (queue_IsEmpty(&list->list)) {
462         if (list->shutdown) {
463             *node_out = NULL;
464             ret = EINTR;
465             goto done_sync;
466         }
467         CV_WAIT(&list->cv, &list->lock);
468     }
469
470     *node_out = node = queue_First(&list->list, afs_work_queue_node);
471
472     MUTEX_ENTER(&node->lock);
473     queue_Remove(node);
474     node->qidx = AFS_WQ_NODE_LIST_NONE;
475     _afs_wq_node_state_change(node, state);
476
477  done_sync:
478     MUTEX_EXIT(&list->lock);
479
480     return ret;
481 }
482
483 /**
484  * remove a node from a list.
485  *
486  * @param[in] node        node object
487  * @param[in] next_state  node state following successful dequeue
488  *
489  * @return operation status
490  *    @retval 0 success
491  *    @retval AFS_WQ_ERROR in any of the following conditions:
492  *              - node not associated with a work queue
493  *              - node was not on a linked list (e.g. RUNNING state)
494  *              - we raced another thread
495  *
496  * @pre node->lock held
497  *
498  * @post node removed from node list
499  *
500  * @note node->lock may be dropped internally
501  *
502  * @internal
503  */
504 static int
505 _afs_wq_node_list_remove(struct afs_work_queue_node * node,
506                          afs_wq_work_state_t next_state)
507 {
508     int code, ret = 0;
509     struct afs_work_queue_node_list * list = NULL;
510
511     _afs_wq_node_state_wait_busy(node);
512
513     if (!node->queue) {
514         ret = AFS_WQ_ERROR;
515         goto error;
516     }
517     switch (node->qidx) {
518     case AFS_WQ_NODE_LIST_READY:
519         list = &node->queue->ready_list;
520         break;
521
522     case AFS_WQ_NODE_LIST_BLOCKED:
523         list = &node->queue->blocked_list;
524         break;
525
526     case AFS_WQ_NODE_LIST_DONE:
527         list = &node->queue->done_list;
528         break;
529
530     default:
531         ret = AFS_WQ_ERROR;
532     }
533
534     if (list) {
535         code = MUTEX_TRYENTER(&list->lock);
536         if (!code) {
537             /* contended */
538             _afs_wq_node_state_change(node,
539                                            AFS_WQ_NODE_STATE_BUSY);
540             MUTEX_EXIT(&node->lock);
541             MUTEX_ENTER(&list->lock);
542             MUTEX_ENTER(&node->lock);
543
544             if (node->qidx == AFS_WQ_NODE_LIST_NONE) {
545                 /* raced */
546                 ret= AFS_WQ_ERROR;
547                 goto done_sync;
548             }
549         }
550
551         queue_Remove(node);
552         node->qidx = AFS_WQ_NODE_LIST_NONE;
553         _afs_wq_node_state_change(node, next_state);
554
555     done_sync:
556         MUTEX_EXIT(&list->lock);
557     }
558
559  error:
560     return ret;
561 }
562
563 /**
564  * allocate a dependency node.
565  *
566  * @param[out] node_out address in which to store dep node pointer
567  *
568  * @return operation status
569  *    @retval 0 success
570  *    @retval ENOMEM   out of memory
571  *
572  * @internal
573  */
574 static int
575 _afs_wq_dep_alloc(struct afs_work_queue_dep_node ** node_out)
576 {
577     int ret = 0;
578     struct afs_work_queue_dep_node * node;
579
580     node = malloc(sizeof(*node));
581     if (node == NULL) {
582         ret = ENOMEM;
583         goto error;
584     }
585
586     queue_NodeInit(&node->parent_list);
587     node->parent = node->child = NULL;
588
589     *node_out = node;
590
591  error:
592     return ret;
593 }
594
595 /**
596  * free a dependency node.
597  *
598  * @param[in] node dep node pointer
599  *
600  * @return operation status
601  *    @retval 0 success
602  *    @retval AFS_WQ_ERROR still attached to a work node
603  *
604  * @internal
605  */
606 static int
607 _afs_wq_dep_free(struct afs_work_queue_dep_node * node)
608 {
609     int ret = 0;
610
611     if (queue_IsOnQueue(&node->parent_list) ||
612         node->parent ||
613         node->child) {
614         ret = AFS_WQ_ERROR;
615         goto error;
616     }
617
618     free(node);
619
620  error:
621     return ret;
622 }
623
624 /**
625  * unlink work nodes from a dependency node.
626  *
627  * @param[in]  dep      dependency node
628  *
629  * @return operation status
630  *    @retval 0 success
631  *
632  * @pre
633  *   - dep->parent and dep->child are either locked, or are not referenced
634  *     by anything else
635  *   - caller holds ref on dep->child
636  *   - dep->child and dep->parent in quiescent state
637  *
638  * @internal
639  */
640 static int
641 _afs_wq_dep_unlink_r(struct afs_work_queue_dep_node *dep)
642 {
643     struct afs_work_queue_node *child = dep->child;
644     queue_Remove(&dep->parent_list);
645     dep->child = NULL;
646     dep->parent = NULL;
647
648     return _afs_wq_node_put_r(child, 0);
649 }
650
651 /**
652  * get a reference to a work node.
653  *
654  * @param[in] node  work queue node
655  *
656  * @return operation status
657  *    @retval 0 success
658  *
659  * @pre node->lock held
660  *
661  * @internal
662  */
663 static int
664 _afs_wq_node_get_r(struct afs_work_queue_node * node)
665 {
666     node->refcount++;
667
668     return 0;
669 }
670
671 /**
672  * unlink and free all of the dependency nodes from a node.
673  *
674  * @param[in] parent  work node that is the parent node of all deps to be freed
675  *
676  * @return operation status
677  *  @retval 0 success
678  *
679  * @pre parent->refcount == 0
680  */
681 static int
682 _afs_wq_node_free_deps(struct afs_work_queue_node *parent)
683 {
684     int ret = 0, code;
685     struct afs_work_queue_node *node_unlock = NULL, *node_put = NULL;
686     struct afs_work_queue_dep_node * dep, * nd;
687
688     /* unlink and free all of the dep structs attached to 'parent' */
689     for (queue_Scan(&parent->dep_children,
690                     dep,
691                     nd,
692                     afs_work_queue_dep_node)) {
693
694         MUTEX_ENTER(&dep->child->lock);
695         node_unlock = dep->child;
696
697         /* We need to get a ref on child here, since _afs_wq_dep_unlink_r may
698          * put the last ref on the child, and we need the child to still exist
699          * so we can unlock it */
700         code = _afs_wq_node_get_r(dep->child);
701         if (code) {
702             goto loop_error;
703         }
704         node_put = dep->child;
705
706         /* remember, no need to lock dep->parent, since its refcount is 0 */
707         code = _afs_wq_dep_unlink_r(dep);
708
709      loop_error:
710         if (node_put) {
711             _afs_wq_node_put_r(node_put, 1);
712         } else if (node_unlock) {
713             MUTEX_EXIT(&node_unlock->lock);
714         }
715         node_put = node_unlock = NULL;
716
717         if (code == 0) {
718             /* Only do this if everything is okay; if code is nonzero,
719              * something will still be pointing at dep, so don't free it.
720              * We will leak memory, but that's better than memory corruption;
721              * we've done all we can do to try and free the dep memory */
722             code = _afs_wq_dep_free(dep);
723         }
724
725         if (!ret) {
726             ret = code;
727         }
728     }
729     return ret;
730 }
731
732 /**
733  * propagate state down through dep nodes.
734  *
735  * @param[in] parent      parent node object
736  * @param[in] next_state  next state parent will assume
737  *
738  * @return operation status
739  *    @retval 0 success
740  *
741  * @pre
742  *   - parent->lock held
743  *
744  * @internal
745  */
746 static int
747 _afs_wq_dep_propagate(struct afs_work_queue_node * parent,
748                       afs_wq_work_state_t next_state)
749 {
750     int ret = 0;
751     struct afs_work_queue_dep_node * dep, * nd;
752     struct afs_work_queue_node_multilock ml;
753     afs_wq_work_state_t old_state;
754     afs_wq_node_list_id_t qidx;
755     struct afs_work_queue_node_list * ql;
756     afs_wq_work_state_t cns;
757
758     old_state = _afs_wq_node_state_change(parent,
759                                                AFS_WQ_NODE_STATE_BUSY);
760     ml.nodes[0].node = parent;
761     ml.nodes[0].lock_held = 1;
762     ml.nodes[0].busy_held = 1;
763
764     /* scan through our children updating scheduling state */
765     for (queue_Scan(&parent->dep_children,
766                     dep,
767                     nd,
768                     afs_work_queue_dep_node)) {
769         /* skip half-registered nodes */
770         if (dep->child == NULL) {
771             continue;
772         }
773
774         ml.nodes[1].node = dep->child;
775         ml.nodes[1].lock_held = 0;
776         ml.nodes[1].busy_held = 0;
777         ret = _afs_wq_node_multilock(&ml);
778         if (ret) {
779             goto error;
780         }
781
782         switch (next_state) {
783         case AFS_WQ_NODE_STATE_DONE:
784             dep->child->block_count--;
785             break;
786
787         case AFS_WQ_NODE_STATE_ERROR:
788             dep->child->error_count++;
789             break;
790
791         default:
792             (void)0; /* nop */
793         }
794
795         /* skip unscheduled nodes */
796         if (dep->child->queue == NULL) {
797             MUTEX_EXIT(&dep->child->lock);
798             continue;
799         }
800
801         /*
802          * when blocked dep and error'd dep counts reach zero, the
803          * node can be scheduled for execution
804          */
805         if (dep->child->error_count) {
806             ql = &dep->child->queue->done_list;
807             qidx = AFS_WQ_NODE_LIST_DONE;
808             cns = AFS_WQ_NODE_STATE_ERROR;
809         } else if (dep->child->block_count) {
810             ql = &dep->child->queue->blocked_list;
811             qidx = AFS_WQ_NODE_LIST_BLOCKED;
812             cns = AFS_WQ_NODE_STATE_BLOCKED;
813         } else {
814             ql = &dep->child->queue->ready_list;
815             qidx = AFS_WQ_NODE_LIST_READY;
816             cns = AFS_WQ_NODE_STATE_SCHEDULED;
817         }
818
819         if (qidx != dep->child->qidx) {
820             /* we're transitioning to a different queue */
821             ret = _afs_wq_node_list_remove(dep->child,
822                                                 AFS_WQ_NODE_STATE_BUSY);
823             if (ret) {
824                 MUTEX_EXIT(&dep->child->lock);
825                 goto error;
826             }
827
828             ret = _afs_wq_node_list_enqueue(ql,
829                                                  dep->child,
830                                                  cns);
831             if (ret) {
832                 MUTEX_EXIT(&dep->child->lock);
833                 goto error;
834             }
835         }
836         MUTEX_EXIT(&dep->child->lock);
837     }
838
839  error:
840     _afs_wq_node_state_change(parent,
841                                    old_state);
842     return ret;
843 }
844
845 /**
846  * decrements queue->running_count, and signals waiters if appropriate.
847  *
848  * @param[in] queue  queue to dec the running count of
849  */
850 static void
851 _afs_wq_dec_running_count(struct afs_work_queue *queue)
852 {
853     MUTEX_ENTER(&queue->lock);
854     queue->running_count--;
855     if (queue->shutdown && queue->running_count == 0) {
856         /* if we've shut down, someone may be waiting for the running count
857          * to drop to 0 */
858         CV_BROADCAST(&queue->running_cv);
859     }
860     MUTEX_EXIT(&queue->lock);
861 }
862
863 /**
864  * execute a node on the queue.
865  *
866  * @param[in] queue  work queue
867  * @param[in] rock   opaque pointer (passed as third arg to callback func)
868  * @param[in] block  allow blocking in dequeue
869  *
870  * @return operation status
871  *    @retval 0 completed a work unit
872  *
873  * @internal
874  */
875 static int
876 _afs_wq_do(struct afs_work_queue * queue,
877            void * rock,
878            int block)
879 {
880     int code, ret = 0;
881     struct afs_work_queue_node * node;
882     afs_wq_callback_func_t * cbf;
883     afs_wq_work_state_t next_state;
884     struct afs_work_queue_node_list * ql;
885     void * node_rock;
886     int detached = 0;
887
888     /* We can inc queue->running_count before actually pulling the node off
889      * of the ready_list, since running_count only really matters when we are
890      * shut down. If we get shut down before we pull the node off of
891      * ready_list, but after we inc'd running_count,
892      * _afs_wq_node_list_dequeue should return immediately with EINTR,
893      * in which case we'll dec running_count, so it's as if we never inc'd it
894      * in the first place. */
895     MUTEX_ENTER(&queue->lock);
896     if (queue->shutdown) {
897         MUTEX_EXIT(&queue->lock);
898         return EINTR;
899     }
900     queue->running_count++;
901     MUTEX_EXIT(&queue->lock);
902
903     ret = _afs_wq_node_list_dequeue(&queue->ready_list,
904                                          &node,
905                                          AFS_WQ_NODE_STATE_RUNNING,
906                                          block);
907     if (ret) {
908         _afs_wq_dec_running_count(queue);
909         goto error;
910     }
911
912     cbf = node->cbf;
913     node_rock = node->rock;
914     detached = node->detached;
915
916     if (cbf != NULL) {
917         MUTEX_EXIT(&node->lock);
918         code = (*cbf)(queue, node, queue->rock, node_rock, rock);
919         MUTEX_ENTER(&node->lock);
920         if (code == 0) {
921             next_state = AFS_WQ_NODE_STATE_DONE;
922             ql = &queue->done_list;
923         } else if (code == AFS_WQ_ERROR_RESCHEDULE) {
924             if (node->error_count) {
925                 next_state = AFS_WQ_NODE_STATE_ERROR;
926                 ql = &queue->done_list;
927             } else if (node->block_count) {
928                 next_state = AFS_WQ_NODE_STATE_BLOCKED;
929                 ql = &queue->blocked_list;
930             } else {
931                 next_state = AFS_WQ_NODE_STATE_SCHEDULED;
932                 ql = &queue->ready_list;
933             }
934         } else {
935             next_state = AFS_WQ_NODE_STATE_ERROR;
936             ql = &queue->done_list;
937         }
938     } else {
939         next_state = AFS_WQ_NODE_STATE_DONE;
940         code = 0;
941         ql = &queue->done_list;
942     }
943
944     _afs_wq_dec_running_count(queue);
945
946     node->retcode = code;
947
948     if ((next_state == AFS_WQ_NODE_STATE_DONE) ||
949         (next_state == AFS_WQ_NODE_STATE_ERROR)) {
950
951         MUTEX_ENTER(&queue->lock);
952
953         if (queue->drain && queue->pend_count == queue->opts.pend_lothresh) {
954             /* signal other threads if we're about to below the low
955              * pending-tasks threshold */
956             queue->drain = 0;
957             CV_SIGNAL(&queue->pend_cv);
958         }
959
960         if (queue->pend_count == 1) {
961             /* signal other threads if we're about to become 'empty' */
962             CV_BROADCAST(&queue->empty_cv);
963         }
964
965         queue->pend_count--;
966
967         MUTEX_EXIT(&queue->lock);
968     }
969
970     ret = _afs_wq_node_state_wait_busy(node);
971     if (ret) {
972         goto error;
973     }
974
975     /* propagate scheduling changes down through dependencies */
976     ret = _afs_wq_dep_propagate(node, next_state);
977     if (ret) {
978         goto error;
979     }
980
981     ret = _afs_wq_node_state_wait_busy(node);
982     if (ret) {
983         goto error;
984     }
985
986     if (detached &&
987         ((next_state == AFS_WQ_NODE_STATE_DONE) ||
988          (next_state == AFS_WQ_NODE_STATE_ERROR))) {
989         _afs_wq_node_state_change(node, next_state);
990         _afs_wq_node_put_r(node, 1);
991     } else {
992         ret = _afs_wq_node_list_enqueue(ql,
993                                              node,
994                                              next_state);
995     }
996
997  error:
998     return ret;
999 }
1000
1001 /**
1002  * initialize a struct afs_work_queue_opts to the default values
1003  *
1004  * @param[out] opts  opts struct to initialize
1005  */
1006 void
1007 afs_wq_opts_init(struct afs_work_queue_opts *opts)
1008 {
1009     opts->pend_lothresh = 0;
1010     opts->pend_hithresh = 0;
1011 }
1012
1013 /**
1014  * set the options for a struct afs_work_queue_opts appropriate for a certain
1015  * number of threads.
1016  *
1017  * @param[out] opts   opts struct in which to set the values
1018  * @param[in] threads number of threads
1019  */
1020 void
1021 afs_wq_opts_calc_thresh(struct afs_work_queue_opts *opts, int threads)
1022 {
1023     opts->pend_lothresh = threads * 2;
1024     opts->pend_hithresh = threads * 16;
1025
1026     /* safety */
1027     if (opts->pend_lothresh < 1) {
1028         opts->pend_lothresh = 1;
1029     }
1030     if (opts->pend_hithresh < 2) {
1031         opts->pend_hithresh = 2;
1032     }
1033 }
1034
1035 /**
1036  * allocate and initialize a work queue object.
1037  *
1038  * @param[out]   queue_out  address in which to store newly allocated work queue object
1039  * @param[in]    rock       work queue opaque pointer (passed as first arg to all fired callbacks)
1040  * @param[in]    opts       options for the new created queue
1041  *
1042  * @return operation status
1043  *    @retval 0 success
1044  *    @retval ENOMEM         out of memory
1045  */
1046 int
1047 afs_wq_create(struct afs_work_queue ** queue_out,
1048               void * rock,
1049               struct afs_work_queue_opts *opts)
1050 {
1051     int ret = 0;
1052     struct afs_work_queue * queue;
1053
1054     ret = _afs_wq_alloc(queue_out);
1055     if (ret) {
1056         goto error;
1057     }
1058     queue = *queue_out;
1059
1060     if (opts) {
1061         memcpy(&queue->opts, opts, sizeof(queue->opts));
1062     } else {
1063         afs_wq_opts_init(&queue->opts);
1064     }
1065
1066     _afs_wq_node_list_init(&queue->ready_list,
1067                                 AFS_WQ_NODE_LIST_READY);
1068     _afs_wq_node_list_init(&queue->blocked_list,
1069                                 AFS_WQ_NODE_LIST_BLOCKED);
1070     _afs_wq_node_list_init(&queue->done_list,
1071                                 AFS_WQ_NODE_LIST_DONE);
1072     queue->rock = rock;
1073     queue->drain = 0;
1074     queue->shutdown = 0;
1075     queue->pend_count = 0;
1076     queue->running_count = 0;
1077
1078     MUTEX_INIT(&queue->lock, "queue", MUTEX_DEFAULT, 0);
1079     CV_INIT(&queue->pend_cv, "queue pending", CV_DEFAULT, 0);
1080     CV_INIT(&queue->empty_cv, "queue empty", CV_DEFAULT, 0);
1081     CV_INIT(&queue->running_cv, "queue running", CV_DEFAULT, 0);
1082
1083  error:
1084     return ret;
1085 }
1086
1087 /**
1088  * deallocate and free a work queue object.
1089  *
1090  * @param[in] queue  work queue to be destroyed
1091  *
1092  * @return operation status
1093  *    @retval 0  success
1094  *    @retval AFS_WQ_ERROR unspecified error
1095  */
1096 int
1097 afs_wq_destroy(struct afs_work_queue * queue)
1098 {
1099     int ret = 0;
1100
1101     ret = _afs_wq_node_list_destroy(&queue->ready_list);
1102     if (ret) {
1103         goto error;
1104     }
1105
1106     ret = _afs_wq_node_list_destroy(&queue->blocked_list);
1107     if (ret) {
1108         goto error;
1109     }
1110
1111     ret = _afs_wq_node_list_destroy(&queue->done_list);
1112     if (ret) {
1113         goto error;
1114     }
1115
1116     ret = _afs_wq_free(queue);
1117
1118  error:
1119     return ret;
1120 }
1121
1122 /**
1123  * shutdown a work queue.
1124  *
1125  * @param[in] queue work queue object pointer
1126  *
1127  * @return operation status
1128  *    @retval 0 success
1129  */
1130 int
1131 afs_wq_shutdown(struct afs_work_queue * queue)
1132 {
1133     int ret = 0;
1134
1135     MUTEX_ENTER(&queue->lock);
1136     if (queue->shutdown) {
1137         /* already shutdown, do nothing */
1138         MUTEX_EXIT(&queue->lock);
1139         goto error;
1140     }
1141     queue->shutdown = 1;
1142
1143     ret = _afs_wq_node_list_shutdown(&queue->ready_list);
1144     if (ret) {
1145         goto error;
1146     }
1147
1148     ret = _afs_wq_node_list_shutdown(&queue->blocked_list);
1149     if (ret) {
1150         goto error;
1151     }
1152
1153     ret = _afs_wq_node_list_shutdown(&queue->done_list);
1154     if (ret) {
1155         goto error;
1156     }
1157
1158     /* signal everyone that could be waiting, since these conditions will
1159      * generally fail to signal on their own if we're shutdown, since no
1160      * progress is being made */
1161     CV_BROADCAST(&queue->pend_cv);
1162     CV_BROADCAST(&queue->empty_cv);
1163     MUTEX_EXIT(&queue->lock);
1164
1165  error:
1166     return ret;
1167 }
1168
1169 /**
1170  * allocate a work node.
1171  *
1172  * @param[out] node_out  address in which to store new work node
1173  *
1174  * @return operation status
1175  *    @retval 0 success
1176  *    @retval ENOMEM         out of memory
1177  */
1178 int
1179 afs_wq_node_alloc(struct afs_work_queue_node ** node_out)
1180 {
1181     int ret = 0;
1182     struct afs_work_queue_node * node;
1183
1184     *node_out = node = (struct afs_work_queue_node *) malloc(sizeof(*node));
1185     if (node == NULL) {
1186         ret = ENOMEM;
1187         goto error;
1188     }
1189
1190     queue_NodeInit(&node->node_list);
1191     node->qidx = AFS_WQ_NODE_LIST_NONE;
1192     node->cbf = NULL;
1193     node->rock = node->queue = NULL;
1194     node->refcount = 1;
1195     node->block_count = 0;
1196     node->error_count = 0;
1197     MUTEX_INIT(&node->lock, "node", MUTEX_DEFAULT, 0);
1198     CV_INIT(&node->state_cv, "node state", CV_DEFAULT, 0);
1199     node->state = AFS_WQ_NODE_STATE_INIT;
1200     queue_Init(&node->dep_children);
1201
1202  error:
1203     return ret;
1204 }
1205
1206 /**
1207  * free a work node.
1208  *
1209  * @param[in] node  work node object
1210  *
1211  * @return operation status
1212  *    @retval 0 success
1213  *
1214  * @internal
1215  */
1216 static int
1217 _afs_wq_node_free(struct afs_work_queue_node * node)
1218 {
1219     int ret = 0;
1220
1221     if (queue_IsOnQueue(node) ||
1222         (node->state == AFS_WQ_NODE_STATE_SCHEDULED) ||
1223         (node->state == AFS_WQ_NODE_STATE_RUNNING) ||
1224         (node->state == AFS_WQ_NODE_STATE_BLOCKED)) {
1225         ret = AFS_WQ_ERROR;
1226         goto error;
1227     }
1228
1229     ret = _afs_wq_node_free_deps(node);
1230     if (ret) {
1231         goto error;
1232     }
1233
1234     MUTEX_DESTROY(&node->lock);
1235     CV_DESTROY(&node->state_cv);
1236
1237     if (node->rock_dtor) {
1238         (*node->rock_dtor) (node->rock);
1239     }
1240
1241     free(node);
1242
1243  error:
1244     return ret;
1245 }
1246
1247 /**
1248  * get a reference to a work node.
1249  *
1250  * @param[in] node  work queue node
1251  *
1252  * @return operation status
1253  *    @retval 0 success
1254  */
1255 int
1256 afs_wq_node_get(struct afs_work_queue_node * node)
1257 {
1258     MUTEX_ENTER(&node->lock);
1259     node->refcount++;
1260     MUTEX_EXIT(&node->lock);
1261
1262     return 0;
1263 }
1264
1265 /**
1266  * put back a reference to a work node.
1267  *
1268  * @param[in] node  work queue node
1269  * @param[in] drop  drop node->lock
1270  *
1271  * @post if refcount reaches zero, node is deallocated.
1272  *
1273  * @return operation status
1274  *    @retval 0 success
1275  *
1276  * @pre node->lock held
1277  *
1278  * @internal
1279  */
1280 static int
1281 _afs_wq_node_put_r(struct afs_work_queue_node * node,
1282                    int drop)
1283 {
1284     afs_uint32 refc;
1285
1286     osi_Assert(node->refcount > 0);
1287     refc = --node->refcount;
1288     if (drop) {
1289         MUTEX_EXIT(&node->lock);
1290     }
1291     if (!refc) {
1292         osi_Assert(node->qidx == AFS_WQ_NODE_LIST_NONE);
1293         _afs_wq_node_free(node);
1294     }
1295
1296     return 0;
1297 }
1298
1299 /**
1300  * put back a reference to a work node.
1301  *
1302  * @param[in] node  work queue node
1303  *
1304  * @post if refcount reaches zero, node is deallocated.
1305  *
1306  * @return operation status
1307  *    @retval 0 success
1308  */
1309 int
1310 afs_wq_node_put(struct afs_work_queue_node * node)
1311 {
1312     MUTEX_ENTER(&node->lock);
1313     return _afs_wq_node_put_r(node, 1);
1314 }
1315
1316 /**
1317  * set the callback function on a work node.
1318  *
1319  * @param[in] node  work queue node
1320  * @param[in] cbf   callback function
1321  * @param[in] rock  opaque pointer passed to callback
1322  * @param[in] rock_dtor  destructor function for 'rock', or NULL
1323  *
1324  * @return operation status
1325  *    @retval 0 success
1326  */
1327 int
1328 afs_wq_node_set_callback(struct afs_work_queue_node * node,
1329                          afs_wq_callback_func_t * cbf,
1330                          void * rock, afs_wq_callback_dtor_t *dtor)
1331 {
1332     MUTEX_ENTER(&node->lock);
1333     node->cbf = cbf;
1334     node->rock = rock;
1335     node->rock_dtor = dtor;
1336     MUTEX_EXIT(&node->lock);
1337
1338     return 0;
1339 }
1340
1341 /**
1342  * detach work node.
1343  *
1344  * @param[in] node  work queue node
1345  *
1346  * @return operation status
1347  *    @retval 0 success
1348  */
1349 int
1350 afs_wq_node_set_detached(struct afs_work_queue_node * node)
1351 {
1352     MUTEX_ENTER(&node->lock);
1353     node->detached = 1;
1354     MUTEX_EXIT(&node->lock);
1355
1356     return 0;
1357 }
1358
1359 /**
1360  * link a dependency node to a parent and child work node.
1361  *
1362  * This links a dependency node such that when the 'parent' work node is
1363  * done, the 'child' work node can proceed.
1364  *
1365  * @param[in]  dep      dependency node
1366  * @param[in]  parent   parent node in this dependency
1367  * @param[in]  child    child node in this dependency
1368  *
1369  * @return operation status
1370  *    @retval 0 success
1371  *
1372  * @pre
1373  *   - parent->lock held
1374  *   - child->lock held
1375  *   - parent and child in quiescent state
1376  *
1377  * @internal
1378  */
1379 static int
1380 _afs_wq_dep_link_r(struct afs_work_queue_dep_node *dep,
1381                    struct afs_work_queue_node *parent,
1382                    struct afs_work_queue_node *child)
1383 {
1384     int ret = 0;
1385
1386     /* Each dep node adds a ref to the child node of that dep. We do not
1387      * do the same for the parent node, since if the only refs remaining
1388      * for a node are deps in node->dep_children, then the node should be
1389      * destroyed, and we will destroy the dep nodes when we free the
1390      * work node. */
1391     ret = _afs_wq_node_get_r(child);
1392     if (ret) {
1393         goto error;
1394     }
1395
1396     /* add this dep node to the parent node's list of deps */
1397     queue_Append(&parent->dep_children, &dep->parent_list);
1398
1399     dep->child = child;
1400     dep->parent = parent;
1401
1402  error:
1403     return ret;
1404 }
1405
1406 /**
1407  * add a dependency to a work node.
1408  *
1409  * @param[in] child  node which will be dependent upon completion of parent
1410  * @param[in] parent node whose completion gates child's execution
1411  *
1412  * @pre
1413  *   - child is in initial state (last op was afs_wq_node_alloc or afs_wq_node_wait)
1414  *
1415  * @return operation status
1416  *    @retval 0 success
1417  */
1418 int
1419 afs_wq_node_dep_add(struct afs_work_queue_node * child,
1420                     struct afs_work_queue_node * parent)
1421 {
1422     int ret = 0;
1423     struct afs_work_queue_dep_node * dep = NULL;
1424     struct afs_work_queue_node_multilock ml;
1425     int held = 0;
1426
1427     /* self references are bad, mmkay? */
1428     if (parent == child) {
1429         ret = AFS_WQ_ERROR;
1430         goto error;
1431     }
1432
1433     ret = _afs_wq_dep_alloc(&dep);
1434     if (ret) {
1435         goto error;
1436     }
1437
1438     memset(&ml, 0, sizeof(ml));
1439     ml.nodes[0].node = parent;
1440     ml.nodes[1].node = child;
1441     ret = _afs_wq_node_multilock(&ml);
1442     if (ret) {
1443         goto error;
1444     }
1445     held = 1;
1446
1447     /* only allow dep modification while in initial state
1448      * or running state (e.g. do a dep add while inside callback) */
1449     if ((child->state != AFS_WQ_NODE_STATE_INIT) &&
1450         (child->state != AFS_WQ_NODE_STATE_RUNNING)) {
1451         ret = AFS_WQ_ERROR;
1452         goto error;
1453     }
1454
1455     /* link dep node with child and parent work queue node */
1456     ret = _afs_wq_dep_link_r(dep, parent, child);
1457     if (ret) {
1458         goto error;
1459     }
1460
1461     /* handle blocking counts */
1462     switch (parent->state) {
1463     case AFS_WQ_NODE_STATE_INIT:
1464     case AFS_WQ_NODE_STATE_SCHEDULED:
1465     case AFS_WQ_NODE_STATE_RUNNING:
1466     case AFS_WQ_NODE_STATE_BLOCKED:
1467         child->block_count++;
1468         break;
1469
1470     case AFS_WQ_NODE_STATE_ERROR:
1471         child->error_count++;
1472         break;
1473
1474     default:
1475         (void)0; /* nop */
1476     }
1477
1478  done:
1479     if (held) {
1480         MUTEX_EXIT(&child->lock);
1481         MUTEX_EXIT(&parent->lock);
1482     }
1483     return ret;
1484
1485  error:
1486     if (dep) {
1487         _afs_wq_dep_free(dep);
1488     }
1489     goto done;
1490 }
1491
1492 /**
1493  * remove a dependency from a work node.
1494  *
1495  * @param[in] child  node which was dependent upon completion of parent
1496  * @param[in] parent node whose completion gated child's execution
1497  *
1498  * @return operation status
1499  *    @retval 0 success
1500  */
1501 int
1502 afs_wq_node_dep_del(struct afs_work_queue_node * child,
1503                     struct afs_work_queue_node * parent)
1504 {
1505     int code, ret = 0;
1506     struct afs_work_queue_dep_node * dep, * ndep;
1507     struct afs_work_queue_node_multilock ml;
1508     int held = 0;
1509
1510     memset(&ml, 0, sizeof(ml));
1511     ml.nodes[0].node = parent;
1512     ml.nodes[1].node = child;
1513     code = _afs_wq_node_multilock(&ml);
1514     if (code) {
1515         goto error;
1516     }
1517     held = 1;
1518
1519     /* only permit changes while child is in init state
1520      * or running state (e.g. do a dep del when in callback func) */
1521     if ((child->state != AFS_WQ_NODE_STATE_INIT) &&
1522         (child->state != AFS_WQ_NODE_STATE_RUNNING)) {
1523         ret = AFS_WQ_ERROR;
1524         goto error;
1525     }
1526
1527     /* locate node linking parent and child */
1528     for (queue_Scan(&parent->dep_children,
1529                     dep,
1530                     ndep,
1531                     afs_work_queue_dep_node)) {
1532         if ((dep->child == child) &&
1533             (dep->parent == parent)) {
1534
1535             /* no need to grab an extra ref on dep->child here; the caller
1536              * should already have a ref on dep->child */
1537             code = _afs_wq_dep_unlink_r(dep);
1538             if (code) {
1539                 ret = code;
1540                 goto error;
1541             }
1542
1543             code = _afs_wq_dep_free(dep);
1544             if (code) {
1545                 ret = code;
1546                 goto error;
1547             }
1548             break;
1549         }
1550     }
1551
1552  error:
1553     if (held) {
1554         MUTEX_EXIT(&child->lock);
1555         MUTEX_EXIT(&parent->lock);
1556     }
1557     return ret;
1558 }
1559
1560 /**
1561  * block a work node from execution.
1562  *
1563  * this can be used to allow external events to influence work queue flow.
1564  *
1565  * @param[in] node  work queue node to be blocked
1566  *
1567  * @return operation status
1568  *    @retval 0 success
1569  *
1570  * @post external block count incremented
1571  */
1572 int
1573 afs_wq_node_block(struct afs_work_queue_node * node)
1574 {
1575     int ret = 0;
1576     int start;
1577
1578     MUTEX_ENTER(&node->lock);
1579     ret = _afs_wq_node_state_wait_busy(node);
1580     if (ret) {
1581         goto error_sync;
1582     }
1583
1584     start = node->block_count++;
1585
1586     if (!start &&
1587         (node->qidx == AFS_WQ_NODE_LIST_READY)) {
1588         /* unblocked->blocked transition, and we're already scheduled */
1589         ret = _afs_wq_node_list_remove(node,
1590                                             AFS_WQ_NODE_STATE_BUSY);
1591         if (ret) {
1592             goto error_sync;
1593         }
1594
1595         ret = _afs_wq_node_list_enqueue(&node->queue->blocked_list,
1596                                              node,
1597                                              AFS_WQ_NODE_STATE_BLOCKED);
1598     }
1599
1600  error_sync:
1601     MUTEX_EXIT(&node->lock);
1602
1603     return ret;
1604 }
1605
1606 /**
1607  * unblock a work node for execution.
1608  *
1609  * this can be used to allow external events to influence work queue flow.
1610  *
1611  * @param[in] node  work queue node to be blocked
1612  *
1613  * @return operation status
1614  *    @retval 0 success
1615  *
1616  * @post external block count decremented
1617  */
1618 int
1619 afs_wq_node_unblock(struct afs_work_queue_node * node)
1620 {
1621     int ret = 0;
1622     int end;
1623
1624     MUTEX_ENTER(&node->lock);
1625     ret = _afs_wq_node_state_wait_busy(node);
1626     if (ret) {
1627         goto error_sync;
1628     }
1629
1630     end = --node->block_count;
1631
1632     if (!end &&
1633         (node->qidx == AFS_WQ_NODE_LIST_BLOCKED)) {
1634         /* blocked->unblock transition, and we're ready to be scheduled */
1635         ret = _afs_wq_node_list_remove(node,
1636                                             AFS_WQ_NODE_STATE_BUSY);
1637         if (ret) {
1638             goto error_sync;
1639         }
1640
1641         ret = _afs_wq_node_list_enqueue(&node->queue->ready_list,
1642                                              node,
1643                                              AFS_WQ_NODE_STATE_SCHEDULED);
1644     }
1645
1646  error_sync:
1647     MUTEX_EXIT(&node->lock);
1648
1649     return ret;
1650 }
1651
1652 /**
1653  * initialize a afs_wq_add_opts struct with the default options.
1654  *
1655  * @param[out] opts  options structure to initialize
1656  */
1657 void
1658 afs_wq_add_opts_init(struct afs_work_queue_add_opts *opts)
1659 {
1660     opts->donate = 0;
1661     opts->block = 1;
1662     opts->force = 0;
1663 }
1664
1665 /**
1666  * schedule a work node for execution.
1667  *
1668  * @param[in] queue  work queue
1669  * @param[in] node   work node
1670  * @param[in] opts   options for adding, or NULL for defaults
1671  *
1672  * @return operation status
1673  *    @retval 0 success
1674  *    @retval EWOULDBLOCK queue is full and opts specified not to block
1675  *    @retval EINTR queue was full, we blocked to add, and the queue was
1676  *                  shutdown while we were blocking
1677  */
1678 int
1679 afs_wq_add(struct afs_work_queue *queue,
1680            struct afs_work_queue_node *node,
1681            struct afs_work_queue_add_opts *opts)
1682 {
1683     int ret = 0;
1684     int donate, block, force, hithresh;
1685     struct afs_work_queue_node_list * list;
1686     struct afs_work_queue_add_opts l_opts;
1687     int waited_for_drain = 0;
1688     afs_wq_work_state_t state;
1689
1690     if (!opts) {
1691         afs_wq_add_opts_init(&l_opts);
1692         opts = &l_opts;
1693     }
1694
1695     donate = opts->donate;
1696     block = opts->block;
1697     force = opts->force;
1698
1699  retry:
1700     MUTEX_ENTER(&node->lock);
1701
1702     ret = _afs_wq_node_state_wait_busy(node);
1703     if (ret) {
1704         goto error;
1705     }
1706
1707     if (!node->block_count && !node->error_count) {
1708         list = &queue->ready_list;
1709         state = AFS_WQ_NODE_STATE_SCHEDULED;
1710     } else if (node->error_count) {
1711         list = &queue->done_list;
1712         state = AFS_WQ_NODE_STATE_ERROR;
1713     } else {
1714         list = &queue->blocked_list;
1715         state = AFS_WQ_NODE_STATE_BLOCKED;
1716     }
1717
1718     ret = 0;
1719
1720     MUTEX_ENTER(&queue->lock);
1721
1722     if (queue->shutdown) {
1723         ret = EINTR;
1724         MUTEX_EXIT(&queue->lock);
1725         MUTEX_EXIT(&node->lock);
1726         goto error;
1727     }
1728
1729     hithresh = queue->opts.pend_hithresh;
1730     if (hithresh > 0 && queue->pend_count >= hithresh) {
1731         queue->drain = 1;
1732     }
1733
1734     if (!force && (state == AFS_WQ_NODE_STATE_SCHEDULED
1735                    || state == AFS_WQ_NODE_STATE_BLOCKED)) {
1736
1737         if (queue->drain) {
1738             if (block) {
1739                 MUTEX_EXIT(&node->lock);
1740                 CV_WAIT(&queue->pend_cv, &queue->lock);
1741
1742                 if (queue->shutdown) {
1743                     ret = EINTR;
1744                 } else {
1745                     MUTEX_EXIT(&queue->lock);
1746
1747                     waited_for_drain = 1;
1748
1749                     goto retry;
1750                 }
1751             } else {
1752                 ret = EWOULDBLOCK;
1753             }
1754         }
1755     }
1756
1757     if (ret == 0) {
1758         queue->pend_count++;
1759     }
1760     if (waited_for_drain) {
1761         /* signal another thread that may have been waiting for drain */
1762         CV_SIGNAL(&queue->pend_cv);
1763     }
1764
1765     MUTEX_EXIT(&queue->lock);
1766
1767     if (ret) {
1768         goto error;
1769     }
1770
1771     if (!donate)
1772         node->refcount++;
1773     node->queue = queue;
1774
1775     ret = _afs_wq_node_list_enqueue(list,
1776                                          node,
1777                                          state);
1778  error:
1779     return ret;
1780 }
1781
1782 /**
1783  * de-schedule a work node.
1784  *
1785  * @param[in] node  work node
1786  *
1787  * @return operation status
1788  *    @retval 0 success
1789  */
1790 int
1791 afs_wq_del(struct afs_work_queue_node * node)
1792 {
1793     /* XXX todo */
1794     return ENOTSUP;
1795 }
1796
1797 /**
1798  * execute a node on the queue.
1799  *
1800  * @param[in] queue  work queue
1801  * @param[in] rock   opaque pointer (passed as third arg to callback func)
1802  *
1803  * @return operation status
1804  *    @retval 0 completed a work unit
1805  */
1806 int
1807 afs_wq_do(struct afs_work_queue * queue,
1808           void * rock)
1809 {
1810     return _afs_wq_do(queue, rock, 1);
1811 }
1812
1813 /**
1814  * execute a node on the queue, if there is any work to do.
1815  *
1816  * @param[in] queue  work queue
1817  * @param[in] rock   opaque pointer (passed as third arg to callback func)
1818  *
1819  * @return operation status
1820  *    @retval 0 completed a work unit
1821  *    @retval EWOULDBLOCK there was nothing to do
1822  */
1823 int
1824 afs_wq_do_nowait(struct afs_work_queue * queue,
1825                  void * rock)
1826 {
1827     return _afs_wq_do(queue, rock, 0);
1828 }
1829
1830 /**
1831  * wait for all pending nodes to finish.
1832  *
1833  * @param[in] queue  work queue
1834  *
1835  * @return operation status
1836  *   @retval 0 success
1837  *
1838  * @post the specified queue was empty at some point; it may not be empty by
1839  * the time this function returns, but at some point after the function was
1840  * called, there were no nodes in the ready queue or blocked queue.
1841  */
1842 int
1843 afs_wq_wait_all(struct afs_work_queue *queue)
1844 {
1845     int ret = 0;
1846
1847     MUTEX_ENTER(&queue->lock);
1848
1849     while (queue->pend_count > 0 && !queue->shutdown) {
1850         CV_WAIT(&queue->empty_cv, &queue->lock);
1851     }
1852
1853     if (queue->shutdown) {
1854         /* queue has been shut down, but there may still be some threads
1855          * running e.g. in the middle of their callback. ensure they have
1856          * stopped before we return. */
1857         while (queue->running_count > 0) {
1858             CV_WAIT(&queue->running_cv, &queue->lock);
1859         }
1860         ret = EINTR;
1861         goto done;
1862     }
1863
1864  done:
1865     MUTEX_EXIT(&queue->lock);
1866
1867     /* technically this doesn't really guarantee that the work queue is empty
1868      * after we return, but we do guarantee that it was empty at some point */
1869
1870     return ret;
1871 }
1872
1873 /**
1874  * wait for a node to complete; dequeue from done list.
1875  *
1876  * @param[in]  node     work queue node
1877  * @param[out] retcode  return code from work unit
1878  *
1879  * @return operation status
1880  *    @retval 0 sucess
1881  *
1882  * @pre ref held on node
1883  */
1884 int
1885 afs_wq_node_wait(struct afs_work_queue_node * node,
1886                  int * retcode)
1887 {
1888     int ret = 0;
1889
1890     MUTEX_ENTER(&node->lock);
1891     if (node->state == AFS_WQ_NODE_STATE_INIT) {
1892         /* not sure what to do in this case */
1893         goto done_sync;
1894     }
1895
1896     while ((node->state != AFS_WQ_NODE_STATE_DONE) &&
1897            (node->state != AFS_WQ_NODE_STATE_ERROR)) {
1898         CV_WAIT(&node->state_cv, &node->lock);
1899     }
1900     if (retcode) {
1901         *retcode = node->retcode;
1902     }
1903
1904     if (node->queue == NULL) {
1905         /* nothing we can do */
1906         goto done_sync;
1907     }
1908
1909     ret = _afs_wq_node_list_remove(node,
1910                                         AFS_WQ_NODE_STATE_INIT);
1911
1912  done_sync:
1913     MUTEX_EXIT(&node->lock);
1914
1915     return ret;
1916 }