afsmonitor: avoid double free on exit
[openafs.git] / src / mcas / rb_lock_concurrentwriters.c
1 /******************************************************************************
2  * rb_lock_concurrentwriters.c
3  *
4  * Lock-based red-black trees, based on Hanke's relaxed balancing operations.
5  *
6  * For more details on the local tree restructuring operations used here:
7  *  S. Hanke, T. Ottmann, and E. Soisalon-Soininen.
8  *  "Relaxed balanced red-black trees".
9  *  3rd Italian Conference on Algorithms and Complexity, pages 193-204.
10  *
11  * Rather than issuing up-in and up-out requests to a balancing process,
12  * each operation is directly responsible for local rebalancing. However,
13  * this process can be split into a number of individual restructuring
14  * operations, and locks can be released between each operation. Between
15  * operations, we mark the node concerned as UNBALANCED -- contending
16  * updates will then wait for this mark to be removed before continuing.
17  *
18  * Copyright (c) 2002-2003, K A Fraser
19
20 Redistribution and use in source and binary forms, with or without
21 modification, are permitted provided that the following conditions are
22 met:
23
24     * Redistributions of source code must retain the above copyright
25     * notice, this list of conditions and the following disclaimer.
26     * Redistributions in binary form must reproduce the above
27     * copyright notice, this list of conditions and the following
28     * disclaimer in the documentation and/or other materials provided
29     * with the distribution.  Neither the name of the Keir Fraser
30     * nor the names of its contributors may be used to endorse or
31     * promote products derived from this software without specific
32     * prior written permission.
33
34 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
35 "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
36 LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
37 A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
38 OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
39 SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
40 LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
41 DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
42 THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
43 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
44 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
45  */
46
47 #define __SET_IMPLEMENTATION__
48
49 #include <stdio.h>
50 #include <assert.h>
51 #include <stdlib.h>
52 #include <unistd.h>
53 #include "portable_defns.h"
54 #include "gc.h"
55 #include "set.h"
56
57 #define BLACK_MARK      0
58 #define RED_MARK        1
59 #define UNBALANCED_MARK 2
60
61 #define SET_VALUE(_v,_n)  \
62     ((_v) = ((setval_t)(((unsigned long)(_v)&3)|((unsigned long)(_n)))))
63 #define GET_VALUE(_v)     ((setval_t)((int_addr_t)(_v) & ~3UL))
64 #define GET_COLOUR(_v)    ((int_addr_t)(_v) & 1)
65 #define SET_COLOUR(_v,_c) \
66     ((setval_t)(((unsigned long)(_v)&~1UL)|(unsigned long)(_c)))
67
68 #define IS_BLACK(_v)      (GET_COLOUR(_v) == 0)
69 #define IS_RED(_v)        (GET_COLOUR(_v) == 1)
70 #define IS_UNBALANCED(_v) (((int_addr_t)(_v) & 2) == 2)
71
72 #define MK_BLACK(_v)      ((setval_t)(((int_addr_t)(_v)&~1UL) | 0))
73 #define MK_RED(_v)        ((setval_t)(((int_addr_t)(_v)&~1UL) | 1))
74 #define MK_BALANCED(_v)   ((setval_t)(((int_addr_t)(_v)&~2UL) | 0))
75 #define MK_UNBALANCED(_v) ((setval_t)(((int_addr_t)(_v)&~2UL) | 2))
76
77 #define GARBAGE_VALUE     ((setval_t)4)
78 #define IS_GARBAGE(_n)    (GET_VALUE((_n)->v) == GARBAGE_VALUE)
79 #define MK_GARBAGE(_n)    (SET_VALUE((_n)->v, GARBAGE_VALUE))
80
81 #define INTERNAL_VALUE ((void *)0xdeadbee0)
82
83 #define IS_ROOT(_n) ((_n)->p->k == 0)
84 #define IS_LEAF(_n) ((_n)->l == NULL)
85
86 /* TRUE if node X is a child of P. */
87 #define ADJACENT(_p,_x) (((_p)->l==(_x))||((_p)->r==(_x)))
88
89 typedef struct node_st node_t;
90 typedef struct set_st set_t;
91
92 struct node_st
93 {
94     setkey_t k;
95     setval_t v;
96     node_t *l, *r, *p;
97     mrsw_lock_t lock;
98 };
99
100 struct set_st
101 {
102     node_t root;
103     node_t null;
104     node_t dummy_g, dummy_gg;
105 };
106
107 static int gc_id;
108
109 /* Nodes p, x, y must be locked for writing. */
110 static void left_rotate(node_t *x)
111 {
112     node_t *y = x->r, *p = x->p;
113     x->r    = y->l;
114     x->r->p = x;
115     x->p    = y;
116     y->l    = x;
117     y->p    = p;
118     if ( x == p->l ) p->l = y; else p->r = y;
119 }
120
121
122 /* Nodes p, x, y must be locked for writing. */
123 static void right_rotate(node_t *x)
124 {
125     node_t *y = x->l, *p = x->p;
126     x->l    = y->r;
127     x->l->p = x;
128     x->p    = y;
129     y->r    = x;
130     y->p    = p;
131     if ( x == p->l ) p->l = y; else p->r = y;
132 }
133
134
135 static void fix_unbalance_up(node_t *x)
136 {
137     mrsw_qnode_t x_qn, g_qn, p_qn, w_qn, gg_qn;
138     node_t *g, *p, *w, *gg;
139     int done = 0;
140
141     do {
142         assert(IS_UNBALANCED(x->v));
143         if ( IS_GARBAGE(x) ) return;
144
145         p  = x->p;
146         g  = p->p;
147         gg = g->p;
148
149         wr_lock(&gg->lock, &gg_qn);
150         if ( !ADJACENT(gg, g) || IS_UNBALANCED(gg->v) || IS_GARBAGE(gg) )
151             goto unlock_gg;
152
153         wr_lock(&g->lock, &g_qn);
154         if ( !ADJACENT(g, p) || IS_UNBALANCED(g->v) ) goto unlock_ggg;
155
156         wr_lock(&p->lock, &p_qn);
157         if ( !ADJACENT(p, x) || IS_UNBALANCED(p->v) ) goto unlock_pggg;
158
159         wr_lock(&x->lock, &x_qn);
160
161         assert(IS_RED(x->v));
162         assert(IS_UNBALANCED(x->v));
163
164         if ( IS_BLACK(p->v) )
165         {
166             /* Case 1. Nothing to do. */
167             x->v = MK_BALANCED(x->v);
168             done = 1;
169             goto unlock_xpggg;
170         }
171
172         if ( IS_ROOT(x) )
173         {
174             /* Case 2. */
175             x->v = MK_BLACK(MK_BALANCED(x->v));
176             done = 1;
177             goto unlock_xpggg;
178         }
179
180         if ( IS_ROOT(p) )
181         {
182             /* Case 2. */
183             p->v = MK_BLACK(p->v);
184             x->v = MK_BALANCED(x->v);
185             done = 1;
186             goto unlock_xpggg;
187         }
188
189         if ( g->l == p ) w = g->r; else w = g->l;
190         wr_lock(&w->lock, &w_qn);
191
192         if ( IS_RED(w->v) )
193         {
194             /* Case 5. */
195             /* In all other cases, doesn't change colour or subtrees. */
196             if ( IS_UNBALANCED(w->v) ) goto unlock_wxpggg;
197             g->v = MK_UNBALANCED(MK_RED(g->v));
198             p->v = MK_BLACK(p->v);
199             w->v = MK_BLACK(w->v);
200             x->v = MK_BALANCED(x->v);
201             done = 2;
202             goto unlock_wxpggg;
203         }
204
205         /* Cases 3 & 4. Both of these need the great-grandfather locked. */
206         if ( p == g->l )
207         {
208             if ( x == p->l )
209             {
210                 /* Case 3. Single rotation. */
211                 x->v = MK_BALANCED(x->v);
212                 p->v = MK_BLACK(p->v);
213                 g->v = MK_RED(g->v);
214                 right_rotate(g);
215             }
216             else
217             {
218                 /* Case 4. Double rotation. */
219                 x->v = MK_BALANCED(MK_BLACK(x->v));
220                 g->v = MK_RED(g->v);
221                 left_rotate(p);
222                 right_rotate(g);
223             }
224         }
225         else /* SYMMETRIC CASE */
226         {
227             if ( x == p->r )
228             {
229                 /* Case 3. Single rotation. */
230                 x->v = MK_BALANCED(x->v);
231                 p->v = MK_BLACK(p->v);
232                 g->v = MK_RED(g->v);
233                 left_rotate(g);
234             }
235             else
236             {
237                 /* Case 4. Double rotation. */
238                 x->v = MK_BALANCED(MK_BLACK(x->v));
239                 g->v = MK_RED(g->v);
240                 right_rotate(p);
241                 left_rotate(g);
242             }
243         }
244
245         done = 1;
246
247     unlock_wxpggg:
248         wr_unlock(&w->lock, &w_qn);
249     unlock_xpggg:
250         wr_unlock(&x->lock, &x_qn);
251     unlock_pggg:
252         wr_unlock(&p->lock, &p_qn);
253     unlock_ggg:
254         wr_unlock(&g->lock, &g_qn);
255     unlock_gg:
256         wr_unlock(&gg->lock, &gg_qn);
257
258         if ( done == 2 )
259         {
260             x = g;
261             done = 0;
262         }
263     }
264     while ( !done );
265 }
266
267
268 static void fix_unbalance_down(node_t *x)
269 {
270     /* WN == W_NEAR, WF == W_FAR (W_FAR is further, in key space, from X). */
271     mrsw_qnode_t x_qn, w_qn, p_qn, g_qn, wn_qn, wf_qn;
272     node_t *w, *p, *g, *wn, *wf;
273     int done = 0;
274
275     do {
276         if ( !IS_UNBALANCED(x->v) || IS_GARBAGE(x) ) return;
277
278         p = x->p;
279         g = p->p;
280
281         wr_lock(&g->lock, &g_qn);
282         if ( !ADJACENT(g, p) || IS_UNBALANCED(g->v) || IS_GARBAGE(g) )
283             goto unlock_g;
284
285         wr_lock(&p->lock, &p_qn);
286         if ( !ADJACENT(p, x) || IS_UNBALANCED(p->v) ) goto unlock_pg;
287
288         wr_lock(&x->lock, &x_qn);
289
290         if ( !IS_BLACK(x->v) || !IS_UNBALANCED(x->v) )
291         {
292             done = 1;
293             goto unlock_xpg;
294         }
295
296         if ( IS_ROOT(x) )
297         {
298             x->v = MK_BALANCED(x->v);
299             done = 1;
300             goto unlock_xpg;
301         }
302
303         w = (x == p->l) ? p->r : p->l;
304         wr_lock(&w->lock, &w_qn);
305         if ( IS_UNBALANCED(w->v) )
306         {
307             if ( IS_BLACK(w->v) )
308             {
309                 /* Funky relaxed rules to the rescue. */
310                 x->v = MK_BALANCED(x->v);
311                 w->v = MK_BALANCED(w->v);
312                 if ( IS_BLACK(p->v) )
313                 {
314                     p->v = MK_UNBALANCED(p->v);
315                     done = 2;
316                 }
317                 else
318                 {
319                     p->v = MK_BLACK(p->v);
320                     done = 1;
321                 }
322             }
323             goto unlock_wxpg;
324         }
325
326         assert(!IS_LEAF(w));
327
328         if ( x == p->l )
329         {
330             wn = w->l;
331             wf = w->r;
332         }
333         else
334         {
335             wn = w->r;
336             wf = w->l;
337         }
338
339         wr_lock(&wn->lock, &wn_qn);
340         /* Hanke has an extra relaxed transform here. It's not needed. */
341         if ( IS_UNBALANCED(wn->v) ) goto unlock_wnwxpg;
342
343         wr_lock(&wf->lock, &wf_qn);
344         if ( IS_UNBALANCED(wf->v) ) goto unlock_wfwnwxpg;
345
346         if ( IS_RED(w->v) )
347         {
348             /* Case 1. Rotate at parent. */
349             assert(IS_BLACK(p->v) && IS_BLACK(wn->v) && IS_BLACK(wf->v));
350             w->v = MK_BLACK(w->v);
351             p->v = MK_RED(p->v);
352             if ( x == p->l ) left_rotate(p); else right_rotate(p);
353             goto unlock_wfwnwxpg;
354         }
355
356         if ( IS_BLACK(wn->v) && IS_BLACK(wf->v) )
357         {
358             if ( IS_RED(p->v) )
359             {
360                 /* Case 2. Simple recolouring. */
361                 p->v = MK_BLACK(p->v);
362                 done = 1;
363             }
364             else
365             {
366                 /* Case 5. Simple recolouring. */
367                 p->v = MK_UNBALANCED(p->v);
368                 done = 2;
369             }
370             w->v = MK_RED(w->v);
371             x->v = MK_BALANCED(x->v);
372             goto unlock_wfwnwxpg;
373         }
374
375         if ( x == p->l )
376         {
377             if ( IS_RED(wf->v) )
378             {
379                 /* Case 3. Single rotation. */
380                 wf->v = MK_BLACK(wf->v);
381                 w->v = SET_COLOUR(w->v, GET_COLOUR(p->v));
382                 p->v = MK_BLACK(p->v);
383                 x->v = MK_BALANCED(x->v);
384                 left_rotate(p);
385             }
386             else
387             {
388                 /* Case 4. Double rotation. */
389                 assert(IS_RED(wn->v));
390                 wn->v = SET_COLOUR(wn->v, GET_COLOUR(p->v));
391                 p->v = MK_BLACK(p->v);
392                 x->v = MK_BALANCED(x->v);
393                 right_rotate(w);
394                 left_rotate(p);
395             }
396         }
397         else /* SYMMETRIC CASE: X == P->R  */
398         {
399             if ( IS_RED(wf->v) )
400             {
401                 /* Case 3. Single rotation. */
402                 wf->v = MK_BLACK(wf->v);
403                 w->v = SET_COLOUR(w->v, GET_COLOUR(p->v));
404                 p->v = MK_BLACK(p->v);
405                 x->v = MK_BALANCED(x->v);
406                 right_rotate(p);
407             }
408             else
409             {
410                 /* Case 4. Double rotation. */
411                 assert(IS_RED(wn->v));
412                 wn->v = SET_COLOUR(wn->v, GET_COLOUR(p->v));
413                 p->v = MK_BLACK(p->v);
414                 x->v = MK_BALANCED(x->v);
415                 left_rotate(w);
416                 right_rotate(p);
417             }
418         }
419
420         done = 1;
421
422     unlock_wfwnwxpg:
423         wr_unlock(&wf->lock, &wf_qn);
424     unlock_wnwxpg:
425         wr_unlock(&wn->lock, &wn_qn);
426     unlock_wxpg:
427         wr_unlock(&w->lock, &w_qn);
428     unlock_xpg:
429         wr_unlock(&x->lock, &x_qn);
430     unlock_pg:
431         wr_unlock(&p->lock, &p_qn);
432     unlock_g:
433         wr_unlock(&g->lock, &g_qn);
434
435         if ( done == 2 )
436         {
437             x = p;
438             done = 0;
439         }
440     }
441     while ( !done );
442 }
443
444
445 static void delete_finish(ptst_t *ptst, node_t *x)
446 {
447     mrsw_qnode_t g_qn, p_qn, w_qn, x_qn;
448     node_t *g, *p, *w;
449     int done = 0;
450
451     do {
452         if ( IS_GARBAGE(x) ) return;
453
454         p = x->p;
455         g = p->p;
456
457         wr_lock(&g->lock, &g_qn);
458         if ( !ADJACENT(g, p) || IS_UNBALANCED(g->v) || IS_GARBAGE(g) )
459             goto unlock_g;
460
461         wr_lock(&p->lock, &p_qn);
462         /* Removing unbalanced red nodes is okay. */
463         if ( !ADJACENT(p, x) || (IS_UNBALANCED(p->v) && IS_BLACK(p->v)) )
464             goto unlock_pg;
465
466         wr_lock(&x->lock, &x_qn);
467         if ( IS_UNBALANCED(x->v) ) goto unlock_xpg;
468         if ( GET_VALUE(x->v) != NULL )
469         {
470             done = 1;
471             goto unlock_xpg;
472         }
473
474         if ( p->l == x ) w = p->r; else w = p->l;
475         assert(w != x);
476         wr_lock(&w->lock, &w_qn);
477         if ( IS_UNBALANCED(w->v) ) goto unlock_wxpg;
478
479         if ( g->l == p ) g->l = w; else g->r = w;
480         MK_GARBAGE(p); gc_free(ptst, p, gc_id);
481         MK_GARBAGE(x); gc_free(ptst, x, gc_id);
482         w->p = g;
483         if ( IS_BLACK(p->v) && IS_BLACK(w->v) )
484         {
485             w->v = MK_UNBALANCED(w->v);
486             done = 2;
487         }
488         else
489         {
490             w->v = MK_BLACK(w->v);
491             done = 1;
492         }
493
494     unlock_wxpg:
495         wr_unlock(&w->lock, &w_qn);
496     unlock_xpg:
497         wr_unlock(&x->lock, &x_qn);
498     unlock_pg:
499         wr_unlock(&p->lock, &p_qn);
500     unlock_g:
501         wr_unlock(&g->lock, &g_qn);
502     }
503     while ( !done );
504
505     if ( done == 2 ) fix_unbalance_down(w);
506 }
507
508
509 set_t *set_alloc(void)
510 {
511     ptst_t *ptst;
512     set_t  *set;
513     node_t *root, *null;
514
515     ptst = critical_enter();
516
517     set = (set_t *)malloc(sizeof(*set));
518     memset(set, 0, sizeof(*set));
519
520     root = &set->root;
521     null = &set->null;
522
523     root->k = 0;
524     root->v = MK_RED(INTERNAL_VALUE);
525     root->l = NULL;
526     root->r = null;
527     root->p = NULL;
528     mrsw_init(&root->lock);
529
530     null->k = SENTINEL_KEYMIN;
531     null->v = MK_BLACK(INTERNAL_VALUE);
532     null->l = NULL;
533     null->r = NULL;
534     null->p = root;
535     mrsw_init(&null->lock);
536
537     set->dummy_gg.l = &set->dummy_g;
538     set->dummy_g.p  = &set->dummy_gg;
539     set->dummy_g.l  = &set->root;
540     set->root.p     = &set->dummy_g;
541
542     critical_exit(ptst);
543
544     return set;
545 }
546
547
548 setval_t set_update(set_t *s, setkey_t k, setval_t v, int overwrite)
549 {
550     ptst_t  *ptst;
551     node_t  *x, *y, *z, *new_internal, *new_leaf;
552     mrsw_qnode_t qn[2], *y_pqn=qn+0, *z_pqn=qn+1, *t_pqn, x_qn;
553     int fix_up = 0;
554     setval_t ov = NULL;
555
556     k = CALLER_TO_INTERNAL_KEY(k);
557
558     ptst = critical_enter();
559
560     /*
561      * We start our search by read-lock-coupling from the root.
562      * There is a special case, when there is only one node in the tree.
563      * In this case, we take a write lock on the root.
564      */
565  retry_from_root:
566     z = &s->root;
567     rd_lock(&z->lock, z_pqn);
568
569     /*
570      * We read-couple down the tree until we get within two nodes of the
571      * required leaf. We then speculatively take write locks.
572      */
573  carry_on:
574     while ( (y = (k <= z->k) ? z->l : z->r) != NULL )
575     {
576         if ( IS_LEAF(y) )
577         {
578             y = z;
579             rd_unlock(&z->lock, z_pqn);
580             wr_lock(&y->lock, y_pqn);
581             x = (k <= z->k) ? z->l : z->r;
582             if ( IS_GARBAGE(y) || !IS_LEAF(x) )
583             {
584                 wr_unlock(&y->lock, y_pqn);
585                 goto retry_from_root;
586             }
587             wr_lock(&x->lock, &x_qn);
588             assert(!IS_GARBAGE(x));
589             goto found_and_locked;
590         }
591
592         x = (k <= y->k) ? y->l : y->r;
593         if ( IS_LEAF(x) ) goto found;
594         rd_lock(&y->lock, y_pqn);
595         rd_unlock(&z->lock, z_pqn);
596         z = y;
597         t_pqn = y_pqn;
598         y_pqn = z_pqn;
599         z_pqn = t_pqn;
600     }
601
602     /*
603      * At this point Z is read locked, and next two nodes on search path
604      * are probably the last. Certainly there is more than one on the path.
605      */
606  found:
607     wr_lock(&y->lock, y_pqn);
608     x = (k <= y->k) ? y->l : y->r;
609     if ( !IS_LEAF(x) )
610     {
611         wr_unlock(&y->lock, y_pqn);
612         goto carry_on;
613     }
614     wr_lock(&x->lock, &x_qn);
615     rd_unlock(&z->lock, z_pqn);
616
617  found_and_locked:
618     /*
619      * At this point, node X is write locked and may be correct node.
620      * Y is X's parent, and is also write locked. No other node is locked.
621      */
622     assert(!IS_GARBAGE(x));
623     if ( x->k == k )
624     {
625         ov = GET_VALUE(x->v);
626         if ( overwrite || (ov == NULL) )
627         {
628             SET_VALUE(x->v, v);
629         }
630     }
631     else
632     {
633         new_leaf     = gc_alloc(ptst, gc_id);
634         new_internal = gc_alloc(ptst, gc_id);
635         new_leaf->k = k;
636         new_leaf->v = MK_BLACK(v);
637         new_leaf->l = NULL;
638         new_leaf->r = NULL;
639         new_leaf->p = new_internal;
640         mrsw_init(&new_leaf->lock);
641         if ( x->k < k )
642         {
643             new_internal->k = x->k;
644             new_internal->l = x;
645             new_internal->r = new_leaf;
646         }
647         else
648         {
649             new_internal->k = k;
650             new_internal->l = new_leaf;
651             new_internal->r = x;
652         }
653         new_internal->p = y;
654         mrsw_init(&new_internal->lock);
655         x->p = new_internal;
656         if ( y->l == x ) y->l = new_internal; else y->r = new_internal;
657         if ( IS_UNBALANCED(x->v) )
658         {
659             x->v = MK_BALANCED(x->v);
660             new_internal->v = MK_BLACK(INTERNAL_VALUE);
661         }
662         else if ( IS_RED(y->v) )
663         {
664             new_internal->v = MK_UNBALANCED(MK_RED(INTERNAL_VALUE));
665             fix_up = 1;
666         }
667         else
668         {
669             new_internal->v = MK_RED(INTERNAL_VALUE);
670         }
671     }
672
673     wr_unlock(&y->lock, y_pqn);
674     wr_unlock(&x->lock, &x_qn);
675
676     if ( fix_up ) fix_unbalance_up(new_internal);
677
678     critical_exit(ptst);
679
680     return ov;
681 }
682
683
684 setval_t set_remove(set_t *s, setkey_t k)
685 {
686     ptst_t  *ptst;
687     node_t  *y, *z;
688     mrsw_qnode_t qn[2], *y_pqn=qn+0, *z_pqn=qn+1, *t_pqn;
689     setval_t ov = NULL;
690
691     k = CALLER_TO_INTERNAL_KEY(k);
692
693     ptst = critical_enter();
694
695     z = &s->root;
696     rd_lock(&z->lock, z_pqn);
697
698     while ( (y = (k <= z->k) ? z->l : z->r) != NULL )
699     {
700         if ( IS_LEAF(y) )
701             wr_lock(&y->lock, y_pqn);
702         else
703             rd_lock(&y->lock, y_pqn);
704         rd_unlock(&z->lock, z_pqn);
705         z = y;
706         t_pqn = y_pqn;
707         y_pqn = z_pqn;
708         z_pqn = t_pqn;
709     }
710
711     if ( z->k == k )
712     {
713         ov = GET_VALUE(z->v);
714         SET_VALUE(z->v, NULL);
715     }
716
717     wr_unlock(&z->lock, z_pqn);
718
719     if ( ov != NULL ) delete_finish(ptst, z);
720
721     critical_exit(ptst);
722     return ov;
723 }
724
725
726 setval_t set_lookup(set_t *s, setkey_t k)
727 {
728     ptst_t  *ptst;
729     node_t  *m, *n;
730     mrsw_qnode_t qn[2], *m_pqn=&qn[0], *n_pqn=&qn[1], *t_pqn;
731     setval_t v = NULL;
732
733     k = CALLER_TO_INTERNAL_KEY(k);
734
735     ptst = critical_enter();
736
737     n = &s->root;
738     rd_lock(&n->lock, n_pqn);
739
740     while ( (m = (k <= n->k) ? n->l : n->r) != NULL )
741     {
742         rd_lock(&m->lock, m_pqn);
743         rd_unlock(&n->lock, n_pqn);
744         n = m;
745         t_pqn = m_pqn;
746         m_pqn = n_pqn;
747         n_pqn = t_pqn;
748     }
749
750     if ( k == n->k ) v = GET_VALUE(n->v);
751
752     rd_unlock(&n->lock, n_pqn);
753
754     critical_exit(ptst);
755
756     return v;
757 }
758
759
760 void _init_set_subsystem(void)
761 {
762     gc_id = gc_add_allocator(sizeof(node_t));
763 }