41343b18887a010a5da607318a186dd7fb85b381
[openafs.git] / src / util / work_queue.c
1 /*
2  * Copyright 2008-2010, Sine Nomine Associates and others.
3  * All Rights Reserved.
4  *
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afsconfig.h>
11 #include <afs/param.h>
12
13 #include <roken.h>
14 #include <afs/opr.h>
15
16 #include <sys/file.h>
17
18 #include <lock.h>
19
20 #define __AFS_WORK_QUEUE_IMPL 1
21 #include "work_queue.h"
22 #include "work_queue_impl.h"
23
24 /**
25  * public interfaces for work_queue.
26  */
27
28 static int
29 _afs_wq_node_put_r(struct afs_work_queue_node * node,
30                    int drop);
31
32 /**
33  * allocate a work queue object.
34  *
35  * @param[out] queue_out address in which to store queue pointer
36  *
37  * @return operation status
38  *    @retval 0 success
39  *    @retval ENOMEM out of memory
40  *
41  * @internal
42  */
43 static int
44 _afs_wq_alloc(struct afs_work_queue ** queue_out)
45 {
46     int ret = 0;
47     struct afs_work_queue * queue;
48
49     *queue_out = queue = malloc(sizeof(*queue));
50     if (queue == NULL) {
51         ret = ENOMEM;
52         goto error;
53     }
54
55  error:
56     return ret;
57 }
58
59 /**
60  * free a work queue object.
61  *
62  * @param[in] queue  work queue object
63  *
64  * @return operation status
65  *    @retval 0 success
66  *
67  * @internal
68  */
69 static int
70 _afs_wq_free(struct afs_work_queue * queue)
71 {
72     int ret = 0;
73
74     free(queue);
75
76     return ret;
77 }
78
79 /**
80  * change a node's state.
81  *
82  * @param[in] node       node object
83  * @param[in] new_state  new object state
84  *
85  * @return old state
86  *
87  * @pre node->lock held
88  *
89  * @internal
90  */
91 static afs_wq_work_state_t
92 _afs_wq_node_state_change(struct afs_work_queue_node * node,
93                           afs_wq_work_state_t new_state)
94 {
95     afs_wq_work_state_t old_state;
96
97     old_state = node->state;
98     node->state = new_state;
99
100     CV_BROADCAST(&node->state_cv);
101
102     return old_state;
103 }
104
105 /**
106  * wait for a node's state to change from busy to something else.
107  *
108  * @param[in] node  node object
109  *
110  * @return operation status
111  *    @retval 0 success
112  *
113  * @pre node->lock held
114  *
115  * @internal
116  */
117 static int
118 _afs_wq_node_state_wait_busy(struct afs_work_queue_node * node)
119 {
120     while (node->state == AFS_WQ_NODE_STATE_BUSY) {
121         CV_WAIT(&node->state_cv, &node->lock);
122     }
123
124     return 0;
125 }
126
127 /**
128  * check whether a work queue node is busy.
129  *
130  * @param[in] node  node object pointer
131  *
132  * @return whether node is busy
133  *    @retval 1 node is busy
134  *    @retval 0 node is not busy
135  *
136  * @pre node->lock held
137  *
138  * @internal
139  */
140 static int
141 _afs_wq_node_state_is_busy(struct afs_work_queue_node * node)
142 {
143     return (node->state == AFS_WQ_NODE_STATE_BUSY);
144 }
145
146 /**
147  * attempt to simultaneously lock two work queue nodes.
148  *
149  * this is a somewhat tricky algorithm because there is no
150  * defined hierarchy within the work queue node population.
151  *
152  * @param[in] ml  multilock control structure
153  *
154  * @return operation status
155  *    @retval 0
156  *
157  * @note in theory, we could easily extend this to
158  *       lock more than two nodes
159  *
160  * @pre
161  *   - caller MUST NOT have set busy state on either node
162  *
163  * @post
164  *   - locks held on both nodes
165  *   - both nodes in quiescent states
166  *
167  * @note node with non-zero lock_held or busy_held fields
168  *       MUST go in array index 0
169  *
170  * @internal
171  */
172 static int
173 _afs_wq_node_multilock(struct afs_work_queue_node_multilock * ml)
174 {
175     int code, ret = 0;
176     struct timespec delay;
177     int first = 1, second = 0, tmp;
178
179     /* first pass processing */
180     if (ml->nodes[0].lock_held) {
181         if (!ml->nodes[0].busy_held) {
182             ret = _afs_wq_node_state_wait_busy(ml->nodes[0].node);
183             if (ret) {
184                 goto error;
185             }
186         }
187
188         code = MUTEX_TRYENTER(&ml->nodes[1].node->lock);
189         if (code) {
190             /* success */
191             goto done;
192         }
193
194         /* setup for main loop */
195         MUTEX_EXIT(&ml->nodes[0].node->lock);
196     }
197
198     /*
199      * setup random exponential backoff
200      *
201      * set initial delay to random value in the range [500,1000) ns
202      */
203     delay.tv_sec = 0;
204     delay.tv_nsec = 500 + rand() % 500;
205
206     while (1) {
207         MUTEX_ENTER(&ml->nodes[first].node->lock);
208         if ((first != 0) || !ml->nodes[0].busy_held) {
209             ret = _afs_wq_node_state_wait_busy(ml->nodes[first].node);
210             if (ret) {
211                 /* cleanup */
212                 if (!ml->nodes[0].lock_held || first) {
213                     MUTEX_EXIT(&ml->nodes[first].node->lock);
214                     if (ml->nodes[0].lock_held) {
215                         /* on error, return with locks in same state as before call */
216                         MUTEX_ENTER(&ml->nodes[0].node->lock);
217                     }
218                 }
219                 goto error;
220             }
221         }
222
223         /*
224          * in order to avoid deadlock, we must use trylock and
225          * a non-blocking state check.  if we meet any contention,
226          * we must drop back and start again.
227          */
228         code = MUTEX_TRYENTER(&ml->nodes[second].node->lock);
229         if (code) {
230             if (((second == 0) && (ml->nodes[0].busy_held)) ||
231                 !_afs_wq_node_state_is_busy(ml->nodes[second].node)) {
232                 /* success */
233                 break;
234             } else {
235                 MUTEX_EXIT(&ml->nodes[second].node->lock);
236             }
237         }
238
239         /*
240          * contended.
241          *
242          * drop locks, use exponential backoff,
243          * try acquiring in the opposite order
244          */
245         MUTEX_EXIT(&ml->nodes[first].node->lock);
246         nanosleep(&delay, NULL);
247         if (delay.tv_nsec <= 65536000) { /* max backoff delay of ~131ms */
248             delay.tv_nsec <<= 1;
249         }
250         tmp = second;
251         second = first;
252         first = tmp;
253     }
254
255  done:
256  error:
257     return ret;
258 }
259
260 /**
261  * initialize a node list object.
262  *
263  * @param[in] list list object
264  * @param[in] id   list identifier
265  *
266  * @return operation status
267  *    @retval 0 success
268  *
269  * @internal
270  */
271 static int
272 _afs_wq_node_list_init(struct afs_work_queue_node_list * list,
273                        afs_wq_node_list_id_t id)
274 {
275     queue_Init(&list->list);
276     MUTEX_INIT(&list->lock, "list", MUTEX_DEFAULT, 0);
277     CV_INIT(&list->cv, "list", CV_DEFAULT, 0);
278     list->qidx = id;
279     list->shutdown = 0;
280
281     return 0;
282 }
283
284 /**
285  * destroy a node list object.
286  *
287  * @param[in] list list object
288  *
289  * @return operation status
290  *    @retval 0 success
291  *    @retval AFS_WQ_ERROR list not empty
292  *
293  * @internal
294  */
295 static int
296 _afs_wq_node_list_destroy(struct afs_work_queue_node_list * list)
297 {
298     int ret = 0;
299
300     if (queue_IsNotEmpty(&list->list)) {
301         ret = AFS_WQ_ERROR;
302         goto error;
303     }
304
305     MUTEX_DESTROY(&list->lock);
306     CV_DESTROY(&list->cv);
307
308  error:
309     return ret;
310 }
311
312 /**
313  * wakeup all threads waiting in dequeue.
314  *
315  * @param[in] list list object
316  *
317  * @return operation status
318  *    @retval 0 success
319  *
320  * @internal
321  */
322 static int
323 _afs_wq_node_list_shutdown(struct afs_work_queue_node_list * list)
324 {
325     int ret = 0;
326     struct afs_work_queue_node *node, *nnode;
327
328     MUTEX_ENTER(&list->lock);
329     list->shutdown = 1;
330
331     for (queue_Scan(&list->list, node, nnode, afs_work_queue_node)) {
332         _afs_wq_node_state_change(node, AFS_WQ_NODE_STATE_ERROR);
333         queue_Remove(node);
334         node->qidx = AFS_WQ_NODE_LIST_NONE;
335         node->queue = NULL;
336
337         if (node->detached) {
338             /* if we are detached, we hold the reference on the node;
339              * otherwise, it is some other caller that holds the reference.
340              * So don't put the node if we are not detached; the node will
341              * get freed when someone else calls afs_wq_node_put */
342             afs_wq_node_put(node);
343         }
344     }
345
346     CV_BROADCAST(&list->cv);
347     MUTEX_EXIT(&list->lock);
348
349     return ret;
350 }
351
352 /**
353  * append to a node list object.
354  *
355  * @param[in] list  list object
356  * @param[in] node  node object
357  * @param[in] state new node state
358  *
359  * @return operation status
360  *    @retval 0 success
361  *    @retval AFS_WQ_ERROR raced to enqueue node
362  *
363  * @pre
364  *   - node lock held
365  *   - node is not on a list
366  *   - node is either not busy, or it is marked as busy by the calling thread
367  *
368  * @post
369  *   - enqueued on list
370  *   - node lock dropped
371  *
372  * @internal
373  */
374 static int
375 _afs_wq_node_list_enqueue(struct afs_work_queue_node_list * list,
376                           struct afs_work_queue_node * node,
377                           afs_wq_work_state_t state)
378 {
379     int code, ret = 0;
380
381     if (node->qidx != AFS_WQ_NODE_LIST_NONE) {
382         /* raced */
383         ret = AFS_WQ_ERROR;
384         goto error;
385     }
386
387     /* deal with lock inversion */
388     code = MUTEX_TRYENTER(&list->lock);
389     if (!code) {
390         /* contended */
391         _afs_wq_node_state_change(node, AFS_WQ_NODE_STATE_BUSY);
392         MUTEX_EXIT(&node->lock);
393         MUTEX_ENTER(&list->lock);
394         MUTEX_ENTER(&node->lock);
395
396         /* assert state of the world (we set busy, so this should never happen) */
397         opr_Assert(queue_IsNotOnQueue(node));
398     }
399
400     if (list->shutdown) {
401         ret = AFS_WQ_ERROR;
402         goto error_unlock;
403     }
404
405     opr_Assert(node->qidx == AFS_WQ_NODE_LIST_NONE);
406     if (queue_IsEmpty(&list->list)) {
407         /* wakeup a dequeue thread */
408         CV_SIGNAL(&list->cv);
409     }
410     queue_Append(&list->list, node);
411     node->qidx = list->qidx;
412     _afs_wq_node_state_change(node, state);
413
414  error_unlock:
415     MUTEX_EXIT(&node->lock);
416     MUTEX_EXIT(&list->lock);
417
418  error:
419     return ret;
420 }
421
422 /**
423  * dequeue a node from a list object.
424  *
425  * @param[in]    list      list object
426  * @param[out]   node_out  address in which to store node object pointer
427  * @param[in]    state     new node state
428  * @param[in]    block     permit blocking on cv if asserted
429  *
430  * @return operation status
431  *    @retval 0 success
432  *    @retval EWOULDBLOCK block not asserted and nothing to dequeue
433  *    @retval EINTR blocking wait interrupted by list shutdown
434  *
435  * @post node object returned with node lock held and new state set
436  *
437  * @internal
438  */
439 static int
440 _afs_wq_node_list_dequeue(struct afs_work_queue_node_list * list,
441                           struct afs_work_queue_node ** node_out,
442                           afs_wq_work_state_t state,
443                           int block)
444 {
445     int ret = 0;
446     struct afs_work_queue_node * node;
447
448     MUTEX_ENTER(&list->lock);
449
450     if (list->shutdown) {
451         *node_out = NULL;
452         ret = EINTR;
453         goto done_sync;
454     }
455
456     if (!block && queue_IsEmpty(&list->list)) {
457         *node_out = NULL;
458         ret = EWOULDBLOCK;
459         goto done_sync;
460     }
461
462     while (queue_IsEmpty(&list->list)) {
463         if (list->shutdown) {
464             *node_out = NULL;
465             ret = EINTR;
466             goto done_sync;
467         }
468         CV_WAIT(&list->cv, &list->lock);
469     }
470
471     *node_out = node = queue_First(&list->list, afs_work_queue_node);
472
473     MUTEX_ENTER(&node->lock);
474     queue_Remove(node);
475     node->qidx = AFS_WQ_NODE_LIST_NONE;
476     _afs_wq_node_state_change(node, state);
477
478  done_sync:
479     MUTEX_EXIT(&list->lock);
480
481     return ret;
482 }
483
484 /**
485  * remove a node from a list.
486  *
487  * @param[in] node        node object
488  * @param[in] next_state  node state following successful dequeue
489  *
490  * @return operation status
491  *    @retval 0 success
492  *    @retval AFS_WQ_ERROR in any of the following conditions:
493  *              - node not associated with a work queue
494  *              - node was not on a linked list (e.g. RUNNING state)
495  *              - we raced another thread
496  *
497  * @pre node->lock held
498  *
499  * @post node removed from node list
500  *
501  * @note node->lock may be dropped internally
502  *
503  * @internal
504  */
505 static int
506 _afs_wq_node_list_remove(struct afs_work_queue_node * node,
507                          afs_wq_work_state_t next_state)
508 {
509     int code, ret = 0;
510     struct afs_work_queue_node_list * list = NULL;
511
512     _afs_wq_node_state_wait_busy(node);
513
514     if (!node->queue) {
515         ret = AFS_WQ_ERROR;
516         goto error;
517     }
518     switch (node->qidx) {
519     case AFS_WQ_NODE_LIST_READY:
520         list = &node->queue->ready_list;
521         break;
522
523     case AFS_WQ_NODE_LIST_BLOCKED:
524         list = &node->queue->blocked_list;
525         break;
526
527     case AFS_WQ_NODE_LIST_DONE:
528         list = &node->queue->done_list;
529         break;
530
531     default:
532         ret = AFS_WQ_ERROR;
533     }
534
535     if (list) {
536         code = MUTEX_TRYENTER(&list->lock);
537         if (!code) {
538             /* contended */
539             _afs_wq_node_state_change(node,
540                                            AFS_WQ_NODE_STATE_BUSY);
541             MUTEX_EXIT(&node->lock);
542             MUTEX_ENTER(&list->lock);
543             MUTEX_ENTER(&node->lock);
544
545             if (node->qidx == AFS_WQ_NODE_LIST_NONE) {
546                 /* raced */
547                 ret= AFS_WQ_ERROR;
548                 goto done_sync;
549             }
550         }
551
552         queue_Remove(node);
553         node->qidx = AFS_WQ_NODE_LIST_NONE;
554         _afs_wq_node_state_change(node, next_state);
555
556     done_sync:
557         MUTEX_EXIT(&list->lock);
558     }
559
560  error:
561     return ret;
562 }
563
564 /**
565  * allocate a dependency node.
566  *
567  * @param[out] node_out address in which to store dep node pointer
568  *
569  * @return operation status
570  *    @retval 0 success
571  *    @retval ENOMEM   out of memory
572  *
573  * @internal
574  */
575 static int
576 _afs_wq_dep_alloc(struct afs_work_queue_dep_node ** node_out)
577 {
578     int ret = 0;
579     struct afs_work_queue_dep_node * node;
580
581     node = malloc(sizeof(*node));
582     if (node == NULL) {
583         ret = ENOMEM;
584         goto error;
585     }
586
587     queue_NodeInit(&node->parent_list);
588     node->parent = node->child = NULL;
589
590     *node_out = node;
591
592  error:
593     return ret;
594 }
595
596 /**
597  * free a dependency node.
598  *
599  * @param[in] node dep node pointer
600  *
601  * @return operation status
602  *    @retval 0 success
603  *    @retval AFS_WQ_ERROR still attached to a work node
604  *
605  * @internal
606  */
607 static int
608 _afs_wq_dep_free(struct afs_work_queue_dep_node * node)
609 {
610     int ret = 0;
611
612     if (queue_IsOnQueue(&node->parent_list) ||
613         node->parent ||
614         node->child) {
615         ret = AFS_WQ_ERROR;
616         goto error;
617     }
618
619     free(node);
620
621  error:
622     return ret;
623 }
624
625 /**
626  * unlink work nodes from a dependency node.
627  *
628  * @param[in]  dep      dependency node
629  *
630  * @return operation status
631  *    @retval 0 success
632  *
633  * @pre
634  *   - dep->parent and dep->child are either locked, or are not referenced
635  *     by anything else
636  *   - caller holds ref on dep->child
637  *   - dep->child and dep->parent in quiescent state
638  *
639  * @internal
640  */
641 static int
642 _afs_wq_dep_unlink_r(struct afs_work_queue_dep_node *dep)
643 {
644     struct afs_work_queue_node *child = dep->child;
645     queue_Remove(&dep->parent_list);
646     dep->child = NULL;
647     dep->parent = NULL;
648
649     return _afs_wq_node_put_r(child, 0);
650 }
651
652 /**
653  * get a reference to a work node.
654  *
655  * @param[in] node  work queue node
656  *
657  * @return operation status
658  *    @retval 0 success
659  *
660  * @pre node->lock held
661  *
662  * @internal
663  */
664 static int
665 _afs_wq_node_get_r(struct afs_work_queue_node * node)
666 {
667     node->refcount++;
668
669     return 0;
670 }
671
672 /**
673  * unlink and free all of the dependency nodes from a node.
674  *
675  * @param[in] parent  work node that is the parent node of all deps to be freed
676  *
677  * @return operation status
678  *  @retval 0 success
679  *
680  * @pre parent->refcount == 0
681  */
682 static int
683 _afs_wq_node_free_deps(struct afs_work_queue_node *parent)
684 {
685     int ret = 0, code;
686     struct afs_work_queue_node *node_unlock = NULL, *node_put = NULL;
687     struct afs_work_queue_dep_node * dep, * nd;
688
689     /* unlink and free all of the dep structs attached to 'parent' */
690     for (queue_Scan(&parent->dep_children,
691                     dep,
692                     nd,
693                     afs_work_queue_dep_node)) {
694
695         MUTEX_ENTER(&dep->child->lock);
696         node_unlock = dep->child;
697
698         /* We need to get a ref on child here, since _afs_wq_dep_unlink_r may
699          * put the last ref on the child, and we need the child to still exist
700          * so we can unlock it */
701         code = _afs_wq_node_get_r(dep->child);
702         if (code) {
703             goto loop_error;
704         }
705         node_put = dep->child;
706
707         /* remember, no need to lock dep->parent, since its refcount is 0 */
708         code = _afs_wq_dep_unlink_r(dep);
709
710      loop_error:
711         if (node_put) {
712             _afs_wq_node_put_r(node_put, 1);
713         } else if (node_unlock) {
714             MUTEX_EXIT(&node_unlock->lock);
715         }
716         node_put = node_unlock = NULL;
717
718         if (code == 0) {
719             /* Only do this if everything is okay; if code is nonzero,
720              * something will still be pointing at dep, so don't free it.
721              * We will leak memory, but that's better than memory corruption;
722              * we've done all we can do to try and free the dep memory */
723             code = _afs_wq_dep_free(dep);
724         }
725
726         if (!ret) {
727             ret = code;
728         }
729     }
730     return ret;
731 }
732
733 /**
734  * propagate state down through dep nodes.
735  *
736  * @param[in] parent      parent node object
737  * @param[in] next_state  next state parent will assume
738  *
739  * @return operation status
740  *    @retval 0 success
741  *
742  * @pre
743  *   - parent->lock held
744  *
745  * @internal
746  */
747 static int
748 _afs_wq_dep_propagate(struct afs_work_queue_node * parent,
749                       afs_wq_work_state_t next_state)
750 {
751     int ret = 0;
752     struct afs_work_queue_dep_node * dep, * nd;
753     struct afs_work_queue_node_multilock ml;
754     afs_wq_work_state_t old_state;
755     afs_wq_node_list_id_t qidx;
756     struct afs_work_queue_node_list * ql;
757     afs_wq_work_state_t cns;
758
759     old_state = _afs_wq_node_state_change(parent,
760                                                AFS_WQ_NODE_STATE_BUSY);
761     ml.nodes[0].node = parent;
762     ml.nodes[0].lock_held = 1;
763     ml.nodes[0].busy_held = 1;
764
765     /* scan through our children updating scheduling state */
766     for (queue_Scan(&parent->dep_children,
767                     dep,
768                     nd,
769                     afs_work_queue_dep_node)) {
770         /* skip half-registered nodes */
771         if (dep->child == NULL) {
772             continue;
773         }
774
775         ml.nodes[1].node = dep->child;
776         ml.nodes[1].lock_held = 0;
777         ml.nodes[1].busy_held = 0;
778         ret = _afs_wq_node_multilock(&ml);
779         if (ret) {
780             goto error;
781         }
782
783         switch (next_state) {
784         case AFS_WQ_NODE_STATE_DONE:
785             dep->child->block_count--;
786             break;
787
788         case AFS_WQ_NODE_STATE_ERROR:
789             dep->child->error_count++;
790             break;
791
792         default:
793             (void)0; /* nop */
794         }
795
796         /* skip unscheduled nodes */
797         if (dep->child->queue == NULL) {
798             MUTEX_EXIT(&dep->child->lock);
799             continue;
800         }
801
802         /*
803          * when blocked dep and error'd dep counts reach zero, the
804          * node can be scheduled for execution
805          */
806         if (dep->child->error_count) {
807             ql = &dep->child->queue->done_list;
808             qidx = AFS_WQ_NODE_LIST_DONE;
809             cns = AFS_WQ_NODE_STATE_ERROR;
810         } else if (dep->child->block_count) {
811             ql = &dep->child->queue->blocked_list;
812             qidx = AFS_WQ_NODE_LIST_BLOCKED;
813             cns = AFS_WQ_NODE_STATE_BLOCKED;
814         } else {
815             ql = &dep->child->queue->ready_list;
816             qidx = AFS_WQ_NODE_LIST_READY;
817             cns = AFS_WQ_NODE_STATE_SCHEDULED;
818         }
819
820         if (qidx != dep->child->qidx) {
821             /* we're transitioning to a different queue */
822             ret = _afs_wq_node_list_remove(dep->child,
823                                                 AFS_WQ_NODE_STATE_BUSY);
824             if (ret) {
825                 MUTEX_EXIT(&dep->child->lock);
826                 goto error;
827             }
828
829             ret = _afs_wq_node_list_enqueue(ql,
830                                                  dep->child,
831                                                  cns);
832             if (ret) {
833                 MUTEX_EXIT(&dep->child->lock);
834                 goto error;
835             }
836         }
837         MUTEX_EXIT(&dep->child->lock);
838     }
839
840  error:
841     _afs_wq_node_state_change(parent,
842                                    old_state);
843     return ret;
844 }
845
846 /**
847  * decrements queue->running_count, and signals waiters if appropriate.
848  *
849  * @param[in] queue  queue to dec the running count of
850  */
851 static void
852 _afs_wq_dec_running_count(struct afs_work_queue *queue)
853 {
854     MUTEX_ENTER(&queue->lock);
855     queue->running_count--;
856     if (queue->shutdown && queue->running_count == 0) {
857         /* if we've shut down, someone may be waiting for the running count
858          * to drop to 0 */
859         CV_BROADCAST(&queue->running_cv);
860     }
861     MUTEX_EXIT(&queue->lock);
862 }
863
864 /**
865  * execute a node on the queue.
866  *
867  * @param[in] queue  work queue
868  * @param[in] rock   opaque pointer (passed as third arg to callback func)
869  * @param[in] block  allow blocking in dequeue
870  *
871  * @return operation status
872  *    @retval 0 completed a work unit
873  *
874  * @internal
875  */
876 static int
877 _afs_wq_do(struct afs_work_queue * queue,
878            void * rock,
879            int block)
880 {
881     int code, ret = 0;
882     struct afs_work_queue_node * node;
883     afs_wq_callback_func_t * cbf;
884     afs_wq_work_state_t next_state;
885     struct afs_work_queue_node_list * ql;
886     void * node_rock;
887     int detached = 0;
888
889     /* We can inc queue->running_count before actually pulling the node off
890      * of the ready_list, since running_count only really matters when we are
891      * shut down. If we get shut down before we pull the node off of
892      * ready_list, but after we inc'd running_count,
893      * _afs_wq_node_list_dequeue should return immediately with EINTR,
894      * in which case we'll dec running_count, so it's as if we never inc'd it
895      * in the first place. */
896     MUTEX_ENTER(&queue->lock);
897     if (queue->shutdown) {
898         MUTEX_EXIT(&queue->lock);
899         return EINTR;
900     }
901     queue->running_count++;
902     MUTEX_EXIT(&queue->lock);
903
904     ret = _afs_wq_node_list_dequeue(&queue->ready_list,
905                                          &node,
906                                          AFS_WQ_NODE_STATE_RUNNING,
907                                          block);
908     if (ret) {
909         _afs_wq_dec_running_count(queue);
910         goto error;
911     }
912
913     cbf = node->cbf;
914     node_rock = node->rock;
915     detached = node->detached;
916
917     if (cbf != NULL) {
918         MUTEX_EXIT(&node->lock);
919         code = (*cbf)(queue, node, queue->rock, node_rock, rock);
920         MUTEX_ENTER(&node->lock);
921         if (code == 0) {
922             next_state = AFS_WQ_NODE_STATE_DONE;
923             ql = &queue->done_list;
924         } else if (code == AFS_WQ_ERROR_RESCHEDULE) {
925             if (node->error_count) {
926                 next_state = AFS_WQ_NODE_STATE_ERROR;
927                 ql = &queue->done_list;
928             } else if (node->block_count) {
929                 next_state = AFS_WQ_NODE_STATE_BLOCKED;
930                 ql = &queue->blocked_list;
931             } else {
932                 next_state = AFS_WQ_NODE_STATE_SCHEDULED;
933                 ql = &queue->ready_list;
934             }
935         } else {
936             next_state = AFS_WQ_NODE_STATE_ERROR;
937             ql = &queue->done_list;
938         }
939     } else {
940         next_state = AFS_WQ_NODE_STATE_DONE;
941         code = 0;
942         ql = &queue->done_list;
943     }
944
945     _afs_wq_dec_running_count(queue);
946
947     node->retcode = code;
948
949     if ((next_state == AFS_WQ_NODE_STATE_DONE) ||
950         (next_state == AFS_WQ_NODE_STATE_ERROR)) {
951
952         MUTEX_ENTER(&queue->lock);
953
954         if (queue->drain && queue->pend_count == queue->opts.pend_lothresh) {
955             /* signal other threads if we're about to below the low
956              * pending-tasks threshold */
957             queue->drain = 0;
958             CV_SIGNAL(&queue->pend_cv);
959         }
960
961         if (queue->pend_count == 1) {
962             /* signal other threads if we're about to become 'empty' */
963             CV_BROADCAST(&queue->empty_cv);
964         }
965
966         queue->pend_count--;
967
968         MUTEX_EXIT(&queue->lock);
969     }
970
971     ret = _afs_wq_node_state_wait_busy(node);
972     if (ret) {
973         goto error;
974     }
975
976     /* propagate scheduling changes down through dependencies */
977     ret = _afs_wq_dep_propagate(node, next_state);
978     if (ret) {
979         goto error;
980     }
981
982     ret = _afs_wq_node_state_wait_busy(node);
983     if (ret) {
984         goto error;
985     }
986
987     if (detached &&
988         ((next_state == AFS_WQ_NODE_STATE_DONE) ||
989          (next_state == AFS_WQ_NODE_STATE_ERROR))) {
990         _afs_wq_node_state_change(node, next_state);
991         _afs_wq_node_put_r(node, 1);
992     } else {
993         ret = _afs_wq_node_list_enqueue(ql,
994                                              node,
995                                              next_state);
996     }
997
998  error:
999     return ret;
1000 }
1001
1002 /**
1003  * initialize a struct afs_work_queue_opts to the default values
1004  *
1005  * @param[out] opts  opts struct to initialize
1006  */
1007 void
1008 afs_wq_opts_init(struct afs_work_queue_opts *opts)
1009 {
1010     opts->pend_lothresh = 0;
1011     opts->pend_hithresh = 0;
1012 }
1013
1014 /**
1015  * set the options for a struct afs_work_queue_opts appropriate for a certain
1016  * number of threads.
1017  *
1018  * @param[out] opts   opts struct in which to set the values
1019  * @param[in] threads number of threads
1020  */
1021 void
1022 afs_wq_opts_calc_thresh(struct afs_work_queue_opts *opts, int threads)
1023 {
1024     opts->pend_lothresh = threads * 2;
1025     opts->pend_hithresh = threads * 16;
1026
1027     /* safety */
1028     if (opts->pend_lothresh < 1) {
1029         opts->pend_lothresh = 1;
1030     }
1031     if (opts->pend_hithresh < 2) {
1032         opts->pend_hithresh = 2;
1033     }
1034 }
1035
1036 /**
1037  * allocate and initialize a work queue object.
1038  *
1039  * @param[out]   queue_out  address in which to store newly allocated work queue object
1040  * @param[in]    rock       work queue opaque pointer (passed as first arg to all fired callbacks)
1041  * @param[in]    opts       options for the new created queue
1042  *
1043  * @return operation status
1044  *    @retval 0 success
1045  *    @retval ENOMEM         out of memory
1046  */
1047 int
1048 afs_wq_create(struct afs_work_queue ** queue_out,
1049               void * rock,
1050               struct afs_work_queue_opts *opts)
1051 {
1052     int ret = 0;
1053     struct afs_work_queue * queue;
1054
1055     ret = _afs_wq_alloc(queue_out);
1056     if (ret) {
1057         goto error;
1058     }
1059     queue = *queue_out;
1060
1061     if (opts) {
1062         memcpy(&queue->opts, opts, sizeof(queue->opts));
1063     } else {
1064         afs_wq_opts_init(&queue->opts);
1065     }
1066
1067     _afs_wq_node_list_init(&queue->ready_list,
1068                                 AFS_WQ_NODE_LIST_READY);
1069     _afs_wq_node_list_init(&queue->blocked_list,
1070                                 AFS_WQ_NODE_LIST_BLOCKED);
1071     _afs_wq_node_list_init(&queue->done_list,
1072                                 AFS_WQ_NODE_LIST_DONE);
1073     queue->rock = rock;
1074     queue->drain = 0;
1075     queue->shutdown = 0;
1076     queue->pend_count = 0;
1077     queue->running_count = 0;
1078
1079     MUTEX_INIT(&queue->lock, "queue", MUTEX_DEFAULT, 0);
1080     CV_INIT(&queue->pend_cv, "queue pending", CV_DEFAULT, 0);
1081     CV_INIT(&queue->empty_cv, "queue empty", CV_DEFAULT, 0);
1082     CV_INIT(&queue->running_cv, "queue running", CV_DEFAULT, 0);
1083
1084  error:
1085     return ret;
1086 }
1087
1088 /**
1089  * deallocate and free a work queue object.
1090  *
1091  * @param[in] queue  work queue to be destroyed
1092  *
1093  * @return operation status
1094  *    @retval 0  success
1095  *    @retval AFS_WQ_ERROR unspecified error
1096  */
1097 int
1098 afs_wq_destroy(struct afs_work_queue * queue)
1099 {
1100     int ret = 0;
1101
1102     ret = _afs_wq_node_list_destroy(&queue->ready_list);
1103     if (ret) {
1104         goto error;
1105     }
1106
1107     ret = _afs_wq_node_list_destroy(&queue->blocked_list);
1108     if (ret) {
1109         goto error;
1110     }
1111
1112     ret = _afs_wq_node_list_destroy(&queue->done_list);
1113     if (ret) {
1114         goto error;
1115     }
1116
1117     ret = _afs_wq_free(queue);
1118
1119  error:
1120     return ret;
1121 }
1122
1123 /**
1124  * shutdown a work queue.
1125  *
1126  * @param[in] queue work queue object pointer
1127  *
1128  * @return operation status
1129  *    @retval 0 success
1130  */
1131 int
1132 afs_wq_shutdown(struct afs_work_queue * queue)
1133 {
1134     int ret = 0;
1135
1136     MUTEX_ENTER(&queue->lock);
1137     if (queue->shutdown) {
1138         /* already shutdown, do nothing */
1139         MUTEX_EXIT(&queue->lock);
1140         goto error;
1141     }
1142     queue->shutdown = 1;
1143
1144     ret = _afs_wq_node_list_shutdown(&queue->ready_list);
1145     if (ret) {
1146         goto error;
1147     }
1148
1149     ret = _afs_wq_node_list_shutdown(&queue->blocked_list);
1150     if (ret) {
1151         goto error;
1152     }
1153
1154     ret = _afs_wq_node_list_shutdown(&queue->done_list);
1155     if (ret) {
1156         goto error;
1157     }
1158
1159     /* signal everyone that could be waiting, since these conditions will
1160      * generally fail to signal on their own if we're shutdown, since no
1161      * progress is being made */
1162     CV_BROADCAST(&queue->pend_cv);
1163     CV_BROADCAST(&queue->empty_cv);
1164     MUTEX_EXIT(&queue->lock);
1165
1166  error:
1167     return ret;
1168 }
1169
1170 /**
1171  * allocate a work node.
1172  *
1173  * @param[out] node_out  address in which to store new work node
1174  *
1175  * @return operation status
1176  *    @retval 0 success
1177  *    @retval ENOMEM         out of memory
1178  */
1179 int
1180 afs_wq_node_alloc(struct afs_work_queue_node ** node_out)
1181 {
1182     int ret = 0;
1183     struct afs_work_queue_node * node;
1184
1185     *node_out = node = malloc(sizeof(*node));
1186     if (node == NULL) {
1187         ret = ENOMEM;
1188         goto error;
1189     }
1190
1191     queue_NodeInit(&node->node_list);
1192     node->qidx = AFS_WQ_NODE_LIST_NONE;
1193     node->cbf = NULL;
1194     node->rock = node->queue = NULL;
1195     node->refcount = 1;
1196     node->block_count = 0;
1197     node->error_count = 0;
1198     MUTEX_INIT(&node->lock, "node", MUTEX_DEFAULT, 0);
1199     CV_INIT(&node->state_cv, "node state", CV_DEFAULT, 0);
1200     node->state = AFS_WQ_NODE_STATE_INIT;
1201     queue_Init(&node->dep_children);
1202
1203  error:
1204     return ret;
1205 }
1206
1207 /**
1208  * free a work node.
1209  *
1210  * @param[in] node  work node object
1211  *
1212  * @return operation status
1213  *    @retval 0 success
1214  *
1215  * @internal
1216  */
1217 static int
1218 _afs_wq_node_free(struct afs_work_queue_node * node)
1219 {
1220     int ret = 0;
1221
1222     if (queue_IsOnQueue(node) ||
1223         (node->state == AFS_WQ_NODE_STATE_SCHEDULED) ||
1224         (node->state == AFS_WQ_NODE_STATE_RUNNING) ||
1225         (node->state == AFS_WQ_NODE_STATE_BLOCKED)) {
1226         ret = AFS_WQ_ERROR;
1227         goto error;
1228     }
1229
1230     ret = _afs_wq_node_free_deps(node);
1231     if (ret) {
1232         goto error;
1233     }
1234
1235     MUTEX_DESTROY(&node->lock);
1236     CV_DESTROY(&node->state_cv);
1237
1238     if (node->rock_dtor) {
1239         (*node->rock_dtor) (node->rock);
1240     }
1241
1242     free(node);
1243
1244  error:
1245     return ret;
1246 }
1247
1248 /**
1249  * get a reference to a work node.
1250  *
1251  * @param[in] node  work queue node
1252  *
1253  * @return operation status
1254  *    @retval 0 success
1255  */
1256 int
1257 afs_wq_node_get(struct afs_work_queue_node * node)
1258 {
1259     MUTEX_ENTER(&node->lock);
1260     node->refcount++;
1261     MUTEX_EXIT(&node->lock);
1262
1263     return 0;
1264 }
1265
1266 /**
1267  * put back a reference to a work node.
1268  *
1269  * @param[in] node  work queue node
1270  * @param[in] drop  drop node->lock
1271  *
1272  * @post if refcount reaches zero, node is deallocated.
1273  *
1274  * @return operation status
1275  *    @retval 0 success
1276  *
1277  * @pre node->lock held
1278  *
1279  * @internal
1280  */
1281 static int
1282 _afs_wq_node_put_r(struct afs_work_queue_node * node,
1283                    int drop)
1284 {
1285     afs_uint32 refc;
1286
1287     opr_Assert(node->refcount > 0);
1288     refc = --node->refcount;
1289     if (drop) {
1290         MUTEX_EXIT(&node->lock);
1291     }
1292     if (!refc) {
1293         opr_Assert(node->qidx == AFS_WQ_NODE_LIST_NONE);
1294         _afs_wq_node_free(node);
1295     }
1296
1297     return 0;
1298 }
1299
1300 /**
1301  * put back a reference to a work node.
1302  *
1303  * @param[in] node  work queue node
1304  *
1305  * @post if refcount reaches zero, node is deallocated.
1306  *
1307  * @return operation status
1308  *    @retval 0 success
1309  */
1310 int
1311 afs_wq_node_put(struct afs_work_queue_node * node)
1312 {
1313     MUTEX_ENTER(&node->lock);
1314     return _afs_wq_node_put_r(node, 1);
1315 }
1316
1317 /**
1318  * set the callback function on a work node.
1319  *
1320  * @param[in] node  work queue node
1321  * @param[in] cbf   callback function
1322  * @param[in] rock  opaque pointer passed to callback
1323  * @param[in] rock_dtor  destructor function for 'rock', or NULL
1324  *
1325  * @return operation status
1326  *    @retval 0 success
1327  */
1328 int
1329 afs_wq_node_set_callback(struct afs_work_queue_node * node,
1330                          afs_wq_callback_func_t * cbf,
1331                          void * rock, afs_wq_callback_dtor_t *dtor)
1332 {
1333     MUTEX_ENTER(&node->lock);
1334     node->cbf = cbf;
1335     node->rock = rock;
1336     node->rock_dtor = dtor;
1337     MUTEX_EXIT(&node->lock);
1338
1339     return 0;
1340 }
1341
1342 /**
1343  * detach work node.
1344  *
1345  * @param[in] node  work queue node
1346  *
1347  * @return operation status
1348  *    @retval 0 success
1349  */
1350 int
1351 afs_wq_node_set_detached(struct afs_work_queue_node * node)
1352 {
1353     MUTEX_ENTER(&node->lock);
1354     node->detached = 1;
1355     MUTEX_EXIT(&node->lock);
1356
1357     return 0;
1358 }
1359
1360 /**
1361  * link a dependency node to a parent and child work node.
1362  *
1363  * This links a dependency node such that when the 'parent' work node is
1364  * done, the 'child' work node can proceed.
1365  *
1366  * @param[in]  dep      dependency node
1367  * @param[in]  parent   parent node in this dependency
1368  * @param[in]  child    child node in this dependency
1369  *
1370  * @return operation status
1371  *    @retval 0 success
1372  *
1373  * @pre
1374  *   - parent->lock held
1375  *   - child->lock held
1376  *   - parent and child in quiescent state
1377  *
1378  * @internal
1379  */
1380 static int
1381 _afs_wq_dep_link_r(struct afs_work_queue_dep_node *dep,
1382                    struct afs_work_queue_node *parent,
1383                    struct afs_work_queue_node *child)
1384 {
1385     int ret = 0;
1386
1387     /* Each dep node adds a ref to the child node of that dep. We do not
1388      * do the same for the parent node, since if the only refs remaining
1389      * for a node are deps in node->dep_children, then the node should be
1390      * destroyed, and we will destroy the dep nodes when we free the
1391      * work node. */
1392     ret = _afs_wq_node_get_r(child);
1393     if (ret) {
1394         goto error;
1395     }
1396
1397     /* add this dep node to the parent node's list of deps */
1398     queue_Append(&parent->dep_children, &dep->parent_list);
1399
1400     dep->child = child;
1401     dep->parent = parent;
1402
1403  error:
1404     return ret;
1405 }
1406
1407 /**
1408  * add a dependency to a work node.
1409  *
1410  * @param[in] child  node which will be dependent upon completion of parent
1411  * @param[in] parent node whose completion gates child's execution
1412  *
1413  * @pre
1414  *   - child is in initial state (last op was afs_wq_node_alloc or afs_wq_node_wait)
1415  *
1416  * @return operation status
1417  *    @retval 0 success
1418  */
1419 int
1420 afs_wq_node_dep_add(struct afs_work_queue_node * child,
1421                     struct afs_work_queue_node * parent)
1422 {
1423     int ret = 0;
1424     struct afs_work_queue_dep_node * dep = NULL;
1425     struct afs_work_queue_node_multilock ml;
1426     int held = 0;
1427
1428     /* self references are bad, mmkay? */
1429     if (parent == child) {
1430         ret = AFS_WQ_ERROR;
1431         goto error;
1432     }
1433
1434     ret = _afs_wq_dep_alloc(&dep);
1435     if (ret) {
1436         goto error;
1437     }
1438
1439     memset(&ml, 0, sizeof(ml));
1440     ml.nodes[0].node = parent;
1441     ml.nodes[1].node = child;
1442     ret = _afs_wq_node_multilock(&ml);
1443     if (ret) {
1444         goto error;
1445     }
1446     held = 1;
1447
1448     /* only allow dep modification while in initial state
1449      * or running state (e.g. do a dep add while inside callback) */
1450     if ((child->state != AFS_WQ_NODE_STATE_INIT) &&
1451         (child->state != AFS_WQ_NODE_STATE_RUNNING)) {
1452         ret = AFS_WQ_ERROR;
1453         goto error;
1454     }
1455
1456     /* link dep node with child and parent work queue node */
1457     ret = _afs_wq_dep_link_r(dep, parent, child);
1458     if (ret) {
1459         goto error;
1460     }
1461
1462     /* handle blocking counts */
1463     switch (parent->state) {
1464     case AFS_WQ_NODE_STATE_INIT:
1465     case AFS_WQ_NODE_STATE_SCHEDULED:
1466     case AFS_WQ_NODE_STATE_RUNNING:
1467     case AFS_WQ_NODE_STATE_BLOCKED:
1468         child->block_count++;
1469         break;
1470
1471     case AFS_WQ_NODE_STATE_ERROR:
1472         child->error_count++;
1473         break;
1474
1475     default:
1476         (void)0; /* nop */
1477     }
1478
1479  done:
1480     if (held) {
1481         MUTEX_EXIT(&child->lock);
1482         MUTEX_EXIT(&parent->lock);
1483     }
1484     return ret;
1485
1486  error:
1487     if (dep) {
1488         _afs_wq_dep_free(dep);
1489     }
1490     goto done;
1491 }
1492
1493 /**
1494  * remove a dependency from a work node.
1495  *
1496  * @param[in] child  node which was dependent upon completion of parent
1497  * @param[in] parent node whose completion gated child's execution
1498  *
1499  * @return operation status
1500  *    @retval 0 success
1501  */
1502 int
1503 afs_wq_node_dep_del(struct afs_work_queue_node * child,
1504                     struct afs_work_queue_node * parent)
1505 {
1506     int code, ret = 0;
1507     struct afs_work_queue_dep_node * dep, * ndep;
1508     struct afs_work_queue_node_multilock ml;
1509     int held = 0;
1510
1511     memset(&ml, 0, sizeof(ml));
1512     ml.nodes[0].node = parent;
1513     ml.nodes[1].node = child;
1514     code = _afs_wq_node_multilock(&ml);
1515     if (code) {
1516         goto error;
1517     }
1518     held = 1;
1519
1520     /* only permit changes while child is in init state
1521      * or running state (e.g. do a dep del when in callback func) */
1522     if ((child->state != AFS_WQ_NODE_STATE_INIT) &&
1523         (child->state != AFS_WQ_NODE_STATE_RUNNING)) {
1524         ret = AFS_WQ_ERROR;
1525         goto error;
1526     }
1527
1528     /* locate node linking parent and child */
1529     for (queue_Scan(&parent->dep_children,
1530                     dep,
1531                     ndep,
1532                     afs_work_queue_dep_node)) {
1533         if ((dep->child == child) &&
1534             (dep->parent == parent)) {
1535
1536             /* no need to grab an extra ref on dep->child here; the caller
1537              * should already have a ref on dep->child */
1538             code = _afs_wq_dep_unlink_r(dep);
1539             if (code) {
1540                 ret = code;
1541                 goto error;
1542             }
1543
1544             code = _afs_wq_dep_free(dep);
1545             if (code) {
1546                 ret = code;
1547                 goto error;
1548             }
1549             break;
1550         }
1551     }
1552
1553  error:
1554     if (held) {
1555         MUTEX_EXIT(&child->lock);
1556         MUTEX_EXIT(&parent->lock);
1557     }
1558     return ret;
1559 }
1560
1561 /**
1562  * block a work node from execution.
1563  *
1564  * this can be used to allow external events to influence work queue flow.
1565  *
1566  * @param[in] node  work queue node to be blocked
1567  *
1568  * @return operation status
1569  *    @retval 0 success
1570  *
1571  * @post external block count incremented
1572  */
1573 int
1574 afs_wq_node_block(struct afs_work_queue_node * node)
1575 {
1576     int ret = 0;
1577     int start;
1578
1579     MUTEX_ENTER(&node->lock);
1580     ret = _afs_wq_node_state_wait_busy(node);
1581     if (ret) {
1582         goto error_sync;
1583     }
1584
1585     start = node->block_count++;
1586
1587     if (!start &&
1588         (node->qidx == AFS_WQ_NODE_LIST_READY)) {
1589         /* unblocked->blocked transition, and we're already scheduled */
1590         ret = _afs_wq_node_list_remove(node,
1591                                             AFS_WQ_NODE_STATE_BUSY);
1592         if (ret) {
1593             goto error_sync;
1594         }
1595
1596         ret = _afs_wq_node_list_enqueue(&node->queue->blocked_list,
1597                                              node,
1598                                              AFS_WQ_NODE_STATE_BLOCKED);
1599     }
1600
1601  error_sync:
1602     MUTEX_EXIT(&node->lock);
1603
1604     return ret;
1605 }
1606
1607 /**
1608  * unblock a work node for execution.
1609  *
1610  * this can be used to allow external events to influence work queue flow.
1611  *
1612  * @param[in] node  work queue node to be blocked
1613  *
1614  * @return operation status
1615  *    @retval 0 success
1616  *
1617  * @post external block count decremented
1618  */
1619 int
1620 afs_wq_node_unblock(struct afs_work_queue_node * node)
1621 {
1622     int ret = 0;
1623     int end;
1624
1625     MUTEX_ENTER(&node->lock);
1626     ret = _afs_wq_node_state_wait_busy(node);
1627     if (ret) {
1628         goto error_sync;
1629     }
1630
1631     end = --node->block_count;
1632
1633     if (!end &&
1634         (node->qidx == AFS_WQ_NODE_LIST_BLOCKED)) {
1635         /* blocked->unblock transition, and we're ready to be scheduled */
1636         ret = _afs_wq_node_list_remove(node,
1637                                             AFS_WQ_NODE_STATE_BUSY);
1638         if (ret) {
1639             goto error_sync;
1640         }
1641
1642         ret = _afs_wq_node_list_enqueue(&node->queue->ready_list,
1643                                              node,
1644                                              AFS_WQ_NODE_STATE_SCHEDULED);
1645     }
1646
1647  error_sync:
1648     MUTEX_EXIT(&node->lock);
1649
1650     return ret;
1651 }
1652
1653 /**
1654  * initialize a afs_wq_add_opts struct with the default options.
1655  *
1656  * @param[out] opts  options structure to initialize
1657  */
1658 void
1659 afs_wq_add_opts_init(struct afs_work_queue_add_opts *opts)
1660 {
1661     opts->donate = 0;
1662     opts->block = 1;
1663     opts->force = 0;
1664 }
1665
1666 /**
1667  * schedule a work node for execution.
1668  *
1669  * @param[in] queue  work queue
1670  * @param[in] node   work node
1671  * @param[in] opts   options for adding, or NULL for defaults
1672  *
1673  * @return operation status
1674  *    @retval 0 success
1675  *    @retval EWOULDBLOCK queue is full and opts specified not to block
1676  *    @retval EINTR queue was full, we blocked to add, and the queue was
1677  *                  shutdown while we were blocking
1678  */
1679 int
1680 afs_wq_add(struct afs_work_queue *queue,
1681            struct afs_work_queue_node *node,
1682            struct afs_work_queue_add_opts *opts)
1683 {
1684     int ret = 0;
1685     int donate, block, force, hithresh;
1686     struct afs_work_queue_node_list * list;
1687     struct afs_work_queue_add_opts l_opts;
1688     int waited_for_drain = 0;
1689     afs_wq_work_state_t state;
1690
1691     if (!opts) {
1692         afs_wq_add_opts_init(&l_opts);
1693         opts = &l_opts;
1694     }
1695
1696     donate = opts->donate;
1697     block = opts->block;
1698     force = opts->force;
1699
1700  retry:
1701     MUTEX_ENTER(&node->lock);
1702
1703     ret = _afs_wq_node_state_wait_busy(node);
1704     if (ret) {
1705         goto error;
1706     }
1707
1708     if (!node->block_count && !node->error_count) {
1709         list = &queue->ready_list;
1710         state = AFS_WQ_NODE_STATE_SCHEDULED;
1711     } else if (node->error_count) {
1712         list = &queue->done_list;
1713         state = AFS_WQ_NODE_STATE_ERROR;
1714     } else {
1715         list = &queue->blocked_list;
1716         state = AFS_WQ_NODE_STATE_BLOCKED;
1717     }
1718
1719     ret = 0;
1720
1721     MUTEX_ENTER(&queue->lock);
1722
1723     if (queue->shutdown) {
1724         ret = EINTR;
1725         MUTEX_EXIT(&queue->lock);
1726         MUTEX_EXIT(&node->lock);
1727         goto error;
1728     }
1729
1730     hithresh = queue->opts.pend_hithresh;
1731     if (hithresh > 0 && queue->pend_count >= hithresh) {
1732         queue->drain = 1;
1733     }
1734
1735     if (!force && (state == AFS_WQ_NODE_STATE_SCHEDULED
1736                    || state == AFS_WQ_NODE_STATE_BLOCKED)) {
1737
1738         if (queue->drain) {
1739             if (block) {
1740                 MUTEX_EXIT(&node->lock);
1741                 CV_WAIT(&queue->pend_cv, &queue->lock);
1742
1743                 if (queue->shutdown) {
1744                     ret = EINTR;
1745                 } else {
1746                     MUTEX_EXIT(&queue->lock);
1747
1748                     waited_for_drain = 1;
1749
1750                     goto retry;
1751                 }
1752             } else {
1753                 ret = EWOULDBLOCK;
1754             }
1755         }
1756     }
1757
1758     if (ret == 0) {
1759         queue->pend_count++;
1760     }
1761     if (waited_for_drain) {
1762         /* signal another thread that may have been waiting for drain */
1763         CV_SIGNAL(&queue->pend_cv);
1764     }
1765
1766     MUTEX_EXIT(&queue->lock);
1767
1768     if (ret) {
1769         goto error;
1770     }
1771
1772     if (!donate)
1773         node->refcount++;
1774     node->queue = queue;
1775
1776     ret = _afs_wq_node_list_enqueue(list,
1777                                          node,
1778                                          state);
1779  error:
1780     return ret;
1781 }
1782
1783 /**
1784  * de-schedule a work node.
1785  *
1786  * @param[in] node  work node
1787  *
1788  * @return operation status
1789  *    @retval 0 success
1790  */
1791 int
1792 afs_wq_del(struct afs_work_queue_node * node)
1793 {
1794     /* XXX todo */
1795     return ENOTSUP;
1796 }
1797
1798 /**
1799  * execute a node on the queue.
1800  *
1801  * @param[in] queue  work queue
1802  * @param[in] rock   opaque pointer (passed as third arg to callback func)
1803  *
1804  * @return operation status
1805  *    @retval 0 completed a work unit
1806  */
1807 int
1808 afs_wq_do(struct afs_work_queue * queue,
1809           void * rock)
1810 {
1811     return _afs_wq_do(queue, rock, 1);
1812 }
1813
1814 /**
1815  * execute a node on the queue, if there is any work to do.
1816  *
1817  * @param[in] queue  work queue
1818  * @param[in] rock   opaque pointer (passed as third arg to callback func)
1819  *
1820  * @return operation status
1821  *    @retval 0 completed a work unit
1822  *    @retval EWOULDBLOCK there was nothing to do
1823  */
1824 int
1825 afs_wq_do_nowait(struct afs_work_queue * queue,
1826                  void * rock)
1827 {
1828     return _afs_wq_do(queue, rock, 0);
1829 }
1830
1831 /**
1832  * wait for all pending nodes to finish.
1833  *
1834  * @param[in] queue  work queue
1835  *
1836  * @return operation status
1837  *   @retval 0 success
1838  *
1839  * @post the specified queue was empty at some point; it may not be empty by
1840  * the time this function returns, but at some point after the function was
1841  * called, there were no nodes in the ready queue or blocked queue.
1842  */
1843 int
1844 afs_wq_wait_all(struct afs_work_queue *queue)
1845 {
1846     int ret = 0;
1847
1848     MUTEX_ENTER(&queue->lock);
1849
1850     while (queue->pend_count > 0 && !queue->shutdown) {
1851         CV_WAIT(&queue->empty_cv, &queue->lock);
1852     }
1853
1854     if (queue->shutdown) {
1855         /* queue has been shut down, but there may still be some threads
1856          * running e.g. in the middle of their callback. ensure they have
1857          * stopped before we return. */
1858         while (queue->running_count > 0) {
1859             CV_WAIT(&queue->running_cv, &queue->lock);
1860         }
1861         ret = EINTR;
1862         goto done;
1863     }
1864
1865  done:
1866     MUTEX_EXIT(&queue->lock);
1867
1868     /* technically this doesn't really guarantee that the work queue is empty
1869      * after we return, but we do guarantee that it was empty at some point */
1870
1871     return ret;
1872 }
1873
1874 /**
1875  * wait for a node to complete; dequeue from done list.
1876  *
1877  * @param[in]  node     work queue node
1878  * @param[out] retcode  return code from work unit
1879  *
1880  * @return operation status
1881  *    @retval 0 sucess
1882  *
1883  * @pre ref held on node
1884  */
1885 int
1886 afs_wq_node_wait(struct afs_work_queue_node * node,
1887                  int * retcode)
1888 {
1889     int ret = 0;
1890
1891     MUTEX_ENTER(&node->lock);
1892     if (node->state == AFS_WQ_NODE_STATE_INIT) {
1893         /* not sure what to do in this case */
1894         goto done_sync;
1895     }
1896
1897     while ((node->state != AFS_WQ_NODE_STATE_DONE) &&
1898            (node->state != AFS_WQ_NODE_STATE_ERROR)) {
1899         CV_WAIT(&node->state_cv, &node->lock);
1900     }
1901     if (retcode) {
1902         *retcode = node->retcode;
1903     }
1904
1905     if (node->queue == NULL) {
1906         /* nothing we can do */
1907         goto done_sync;
1908     }
1909
1910     ret = _afs_wq_node_list_remove(node,
1911                                         AFS_WQ_NODE_STATE_INIT);
1912
1913  done_sync:
1914     MUTEX_EXIT(&node->lock);
1915
1916     return ret;
1917 }