reindent-20030715
[openafs.git] / src / ubik / ubikclient.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afsconfig.h>
11 #if defined(UKERNEL)
12 #include "afs/param.h"
13 #else
14 #include <afs/param.h>
15 #endif
16
17 RCSID
18     ("$Header$");
19
20 #if defined(UKERNEL)
21 #include "afs/sysincludes.h"
22 #include "afsincludes.h"
23 #include "afs/stds.h"
24 #include "rx/xdr.h"
25 #include "rx/rx.h"
26 #include "afs/lock.h"
27 #include "afs/rxgen_consts.h"
28 #include "ubik.h"
29 #include "afs/pthread_glock.h"
30 #else /* defined(UKERNEL) */
31 #include <afs/stds.h>
32 #include <afs/pthread_glock.h>
33 #include <stdio.h>
34 #include <string.h>
35 #include <rx/xdr.h>
36 #include <rx/rx.h>
37 #include <lock.h>
38 #ifdef AFS_NT40_ENV
39 #include <winsock2.h>
40 #else
41 #include <netdb.h>
42 #include <netinet/in.h>
43 #endif
44 #include <afs/rxgen_consts.h>
45 #include "ubik.h"
46 #endif /* defined(UKERNEL) */
47
48
49 afs_int32 ubik_CallIter();
50 short ubik_initializationState; /* initial state is zero */
51
52
53 /*
54  * parse list for clients
55  */
56 int
57 ubik_ParseClientList(int argc, char **argv, afs_int32 * aothers)
58 {
59     register afs_int32 i;
60     register char *tp;
61     register struct hostent *th;
62     afs_int32 temp, counter;
63     int inServer;
64
65     inServer = 0;               /* haven't seen -servers yet */
66     counter = 0;
67     for (i = 1; i < argc; i++) {
68         /* look for -servers argument */
69         tp = argv[i];
70
71         if (inServer) {
72             if (*tp == '-')
73                 break;          /* done */
74             /* otherwise this is a new host name */
75             LOCK_GLOBAL_MUTEX th = gethostbyname(tp);
76             if (!th) {
77                 UNLOCK_GLOBAL_MUTEX return UBADHOST;
78             }
79             memmove((void *)&temp, (const void *)th->h_addr,
80                     sizeof(afs_int32));
81             UNLOCK_GLOBAL_MUTEX if (counter++ >= MAXSERVERS)
82                   return UNHOSTS;
83             *aothers++ = temp;
84         } else {
85             /* haven't seen a -server yet */
86             if (!strcmp(tp, "-servers")) {
87                 inServer = 1;
88             }
89         }
90     }
91     if (!inServer) {
92         /* never saw a -server */
93         return UNOENT;
94     }
95     if (counter < MAXSERVERS)
96         *aothers++ = 0;         /* null terminate if room */
97     return 0;
98 }
99
100 #ifdef AFS_PTHREAD_ENV
101 #include <pthread.h>
102 #include <assert.h>
103
104 static pthread_once_t random_once = PTHREAD_ONCE_INIT;
105 static int called_afs_random_once;
106 static pthread_key_t random_number_key;
107
108 static void
109 afs_random_once(void)
110 {
111     assert(pthread_key_create(&random_number_key, NULL) == 0);
112     called_afs_random_once = 1;
113 }
114
115 #endif
116 /* 
117  * Random number generator and constants from KnuthV2 2d ed, p170
118  *
119  * Rules:
120  * X = (aX + c) % m
121  * m is a power of two 
122  * a % 8 is 5
123  * a is 0.73m  should be 0.01m .. 0.99m
124  * c is more or less immaterial.  1 or a is suggested.
125  *
126  * NB:  LOW ORDER BITS are not very random.  To get small random numbers,
127  *      treat result as <1, with implied binary point, and multiply by 
128  *      desired modulus.
129  * NB:  Has to be unsigned, since shifts on signed quantities may preserve
130  *      the sign bit.
131  * 
132  * In this case, m == 2^32, the mod operation is implicit. a == pi, which
133  * is used because it has some interesting characteristics (lacks any
134  * interesting bit-patterns).   
135  * 
136  */
137
138 /* 
139  * use time and pid to try to get some initial randomness.
140  */
141 #if !defined(UKERNEL)
142 #define ranstage(x)     (x)= (afs_uint32) (3141592621U*((afs_uint32)x)+1)
143
144 unsigned int
145 afs_random(void)
146 {
147 #ifdef AFS_PTHREAD_ENV
148     afs_uint32 state;
149
150     (called_afs_random_once || pthread_once(&random_once, afs_random_once));
151     state = (afs_uint32) pthread_getspecific(random_number_key);
152 #else
153     static afs_uint32 state = 0;
154 #endif
155
156     if (!state) {
157         int i;
158         state = time(0) + getpid();
159         for (i = 0; i < 15; i++) {
160             ranstage(state);
161         }
162     }
163
164     ranstage(state);
165 #ifdef AFS_PTHREAD_ENV
166     pthread_setspecific(random_number_key, (const void *)state);
167 #endif
168     return (state);
169
170 }
171
172 /*
173  * returns int 0..14 using the high bits of a pseudo-random number instead of
174  * the low bits, as the low bits are "less random" than the high ones...
175  * slight roundoff error exists, an excercise for the reader.
176  * need to multiply by something with lots of ones in it, so multiply by 
177  * 8 or 16 is right out.
178  */
179
180 static unsigned int
181 afs_randomMod15(void)
182 {
183     afs_uint32 temp;
184
185     temp = afs_random() >> 4;
186     temp = (temp * 15) >> 28;
187
188     return temp;
189 }
190 #endif /* !defined(UKERNEL) */
191
192 #ifdef abs
193 #undef abs
194 #endif /* abs */
195 #define abs(a) ((a) < 0 ? -1*(a) : (a))
196 int
197 ubik_ClientInit(register struct rx_connection **serverconns,
198                 struct ubik_client **aclient)
199 {
200     int i, j;
201     int count;
202     int offset;
203     register struct ubik_client *tc;
204
205     initialize_U_error_table();
206
207     if (*aclient) {             /* the application is doing a re-initialization */
208         LOCK_UBIK_CLIENT((*aclient))
209             /* this is an important defensive check */
210             if (!((*aclient)->initializationState)) {
211             UNLOCK_UBIK_CLIENT((*aclient))
212                 return UREINITIALIZE;
213         }
214
215         /* release all existing connections */
216         for (tc = *aclient, i = 0; i < MAXSERVERS; i++) {
217             struct rx_connection *rxConn = ubik_GetRPCConn(tc, i);
218             if (rxConn == 0)
219                 break;
220 #ifdef AFS_PTHREAD_ENV
221             rx_ReleaseCachedConnection(rxConn);
222 #else
223             rx_DestroyConnection(rxConn);
224 #endif
225         }
226         UNLOCK_UBIK_CLIENT((*aclient))
227 #ifdef AFS_PTHREAD_ENV
228             if (pthread_mutex_destroy(&((*aclient)->cm)))
229             return UMUTEXDESTROY;
230 #endif
231     } else {
232         tc = (struct ubik_client *)malloc(sizeof(struct ubik_client));
233     }
234     if (tc == NULL)
235         return UNOMEM;
236     memset((void *)tc, 0, sizeof(*tc));
237 #ifdef AFS_PTHREAD_ENV
238     if (pthread_mutex_init(&(tc->cm), (const pthread_mutexattr_t *)0)) {
239         return UMUTEXINIT;
240     }
241 #endif
242     tc->initializationState = ++ubik_initializationState;
243
244     /* first count the # of server conns so we can randomize properly */
245     count = 0;
246     for (i = 0; i < MAXSERVERS; i++) {
247         if (serverconns[i] == (struct rx_connection *)0)
248             break;
249         count++;
250     }
251
252     /* here count is the # of servers we're actually passed in.  Compute
253      * offset, a number between 0..count-1, where we'll start copying from the
254      * client-provided array. */
255     for (i = 0; i < count; i++) {
256         offset = afs_randomMod15() % count;
257         for (j = abs(offset); j < 2 * count; j++) {
258             if (!tc->conns[abs(j % count)]) {
259                 tc->conns[abs(j % count)] = serverconns[i];
260                 break;
261             }
262         }
263     }
264
265     *aclient = tc;
266     return 0;
267 }
268
269 /* 
270  * ubik_ClientDestroy - destroys a ubik connection.  It calls rx to destroy the
271  * component rx connections, then frees the ubik connection structure.
272  */
273
274 afs_int32
275 ubik_ClientDestroy(struct ubik_client * aclient)
276 {
277     register int c;
278
279     if (aclient == 0)
280         return 0;
281     LOCK_UBIK_CLIENT(aclient);
282     for (c = 0; c < MAXSERVERS; c++) {
283         struct rx_connection *rxConn = ubik_GetRPCConn(aclient, c);
284         if (rxConn == 0)
285             break;
286 #ifdef AFS_PTHREAD_ENV
287         rx_ReleaseCachedConnection(rxConn);
288 #else
289         rx_DestroyConnection(rxConn);
290 #endif
291     }
292     aclient->initializationState = 0;   /* client in not initialized */
293     UNLOCK_UBIK_CLIENT(aclient);
294 #ifdef AFS_PTHREAD_ENV
295     pthread_mutex_destroy(&(aclient->cm));      /* ignore failure */
296 #endif
297     free(aclient);
298     return 0;
299 }
300
301 /*
302  * RefreshConn -- So that intermittent failures that cause connections to die
303  *     don't kill whole ubik connection, refresh them when the connection is in
304  *     error.
305  */
306
307 static struct rx_connection *
308 RefreshConn(struct rx_connection *tc)
309 {
310     afs_uint32 host;
311     u_short port;
312     u_short service;
313     struct rx_securityClass *sc;
314     int si;
315     struct rx_connection *newTc;
316
317     host = rx_HostOf(rx_PeerOf(tc));
318     port = rx_PortOf(rx_PeerOf(tc));
319     service = rx_ServiceIdOf(tc);
320     sc = rx_SecurityObjectOf(tc);
321     si = rx_SecurityClassOf(tc);
322
323     /*
324      * destroy old one after creating new one so that refCount on security
325      * object cannot reach zero.
326      */
327     newTc = rx_NewConnection(host, port, service, sc, si);
328     rx_DestroyConnection(tc);
329     return newTc;
330 }
331
332 #ifdef AFS_PTHREAD_ENV
333
334 pthread_once_t ubik_client_once = PTHREAD_ONCE_INIT;
335 pthread_mutex_t ubik_client_mutex;
336 #define LOCK_UCLNT_CACHE \
337     assert(pthread_once(&ubik_client_once, ubik_client_init_mutex) == 0 && \
338            pthread_mutex_lock(&ubik_client_mutex)==0);
339 #define UNLOCK_UCLNT_CACHE assert(pthread_mutex_unlock(&ubik_client_mutex)==0);
340
341 void
342 ubik_client_init_mutex()
343 {
344     assert(pthread_mutex_init(&ubik_client_mutex, NULL) == 0);
345 }
346
347 #else
348
349 #define LOCK_UCLNT_CACHE
350 #define UNLOCK_UCLNT_CACHE
351
352 #endif
353
354 #define SYNCCOUNT 10
355 static int *calls_needsync[SYNCCOUNT];  /* proc calls that need the sync site */
356 static int synccount = 0;
357
358 /*
359  * call this instead of stub and we'll guarantee to find a host that's up.
360  * in the future, we should also put in a protocol to find the sync site
361  */
362 afs_int32
363 ubik_Call(aproc, aclient, aflags, p1, p2, p3, p4, p5, p6, p7, p8, p9, p10,
364           p11, p12, p13, p14, p15, p16)
365      int (*aproc) ();
366      register struct ubik_client *aclient;
367      afs_int32 aflags;
368      long p1;
369      long p2;
370      long p3;
371      long p4;
372      long p5;
373      long p6;
374      long p7;
375      long p8;
376      long p9;
377      long p10;
378      long p11;
379      long p12;
380      long p13;
381      long p14;
382      long p15;
383      long p16;
384 {
385     afs_int32 rcode, code, newHost, thisHost, i, count;
386     int chaseCount, pass, needsync, inlist, j;
387     struct rx_connection *tc;
388     struct rx_peer *rxp;
389     short origLevel;
390
391     if (!aclient)
392         return UNOENT;
393     LOCK_UBIK_CLIENT(aclient);
394
395   restart:
396     origLevel = aclient->initializationState;
397     rcode = UNOSERVERS;
398     chaseCount = inlist = needsync = 0;
399
400     LOCK_UCLNT_CACHE for (j = 0; ((j < SYNCCOUNT) && calls_needsync[j]); j++) {
401         if (calls_needsync[j] == (int *)aproc) {
402             inlist = needsync = 1;
403             break;
404         }
405     }
406     UNLOCK_UCLNT_CACHE
407         /* 
408          * First  pass, we try all servers that are up.
409          * Second pass, we try all servers.
410          */
411         for (pass = 0; pass < 2; pass++) {      /*p */
412         /* For each entry in our servers list */
413         for (count = 0;; count++) {     /*s */
414
415             if (needsync) {
416                 /* Need a sync site. Lets try to quickly find it */
417                 if (aclient->syncSite) {
418                     newHost = aclient->syncSite;        /* already in network order */
419                     aclient->syncSite = 0;      /* Will reset if it works */
420                 } else if (aclient->conns[3]) {
421                     /* If there are fewer than four db servers in a cell,
422                      * there's no point in making the GetSyncSite call.
423                      * At best, it's a wash. At worst, it results in more
424                      * RPCs than you would otherwise make.
425                      */
426                     tc = aclient->conns[count];
427                     if (tc && rx_ConnError(tc)) {
428                         aclient->conns[count] = tc = RefreshConn(tc);
429                     }
430                     if (!tc)
431                         break;
432                     code = VOTE_GetSyncSite(tc, &newHost);
433                     if (aclient->initializationState != origLevel)
434                         goto restart;   /* somebody did a ubik_ClientInit */
435                     if (code)
436                         newHost = 0;
437                     newHost = htonl(newHost);   /* convert to network order */
438                 } else {
439                     newHost = 0;
440                 }
441                 if (newHost) {
442                     /* position count at the appropriate slot in the client
443                      * structure and retry. If we can't find in slot, we'll
444                      * just continue through the whole list 
445                      */
446                     for (i = 0; i < MAXSERVERS && aclient->conns[i]; i++) {
447                         rxp = rx_PeerOf(aclient->conns[i]);
448                         thisHost = rx_HostOf(rxp);
449                         if (!thisHost)
450                             break;
451                         if (thisHost == newHost) {
452                             if (chaseCount++ > 2)
453                                 break;  /* avoid loop asking */
454                             count = i;  /* this index is the sync site */
455                             break;
456                         }
457                     }
458                 }
459             }
460             /*needsync */
461             tc = aclient->conns[count];
462             if (tc && rx_ConnError(tc)) {
463                 aclient->conns[count] = tc = RefreshConn(tc);
464             }
465             if (!tc)
466                 break;
467
468             if ((pass == 0) && (aclient->states[count] & CFLastFailed)) {
469                 continue;       /* this guy's down */
470             }
471
472             rcode =
473                 (*aproc) (tc, p1, p2, p3, p4, p5, p6, p7, p8, p9, p10, p11,
474                           p12, p13, p14, p15, p16);
475             if (aclient->initializationState != origLevel) {
476                 /* somebody did a ubik_ClientInit */
477                 if (rcode)
478                     goto restart;       /* call failed */
479                 else
480                     goto done;  /* call suceeded */
481             }
482             if (rcode < 0) {    /* network errors */
483                 aclient->states[count] |= CFLastFailed; /* Mark serer down */
484             } else if (rcode == UNOTSYNC) {
485                 needsync = 1;
486             } else if (rcode != UNOQUORUM) {
487                 /* either misc ubik code, or misc appl code, or success. */
488                 aclient->states[count] &= ~CFLastFailed;        /* mark server up */
489                 goto done;      /* all done */
490             }
491         }                       /*s */
492     }                           /*p */
493
494   done:
495     if (needsync) {
496         if (!inlist) {          /* Remember proc call that needs sync site */
497             LOCK_UCLNT_CACHE calls_needsync[synccount % SYNCCOUNT] =
498                 (int *)aproc;
499             synccount++;
500             UNLOCK_UCLNT_CACHE inlist = 1;
501         }
502         if (!rcode) {           /* Remember the sync site - cmd successful */
503             rxp = rx_PeerOf(aclient->conns[count]);
504             aclient->syncSite = rx_HostOf(rxp);
505         }
506     }
507     UNLOCK_UBIK_CLIENT(aclient);
508     return rcode;
509 }
510
511
512
513 /*
514  * call this after getting back a UNOTSYNC 
515  * note that getting a UNOTSYNC error code back does *not* guarantee
516  * that there is a sync site yet elected.  However, if there is a sync
517  * site out there somewhere, and you're trying an operation that
518  * requires a sync site, ubik will return UNOTSYNC, indicating the
519  * operation won't work until you find a sync site
520  */
521 static int
522 try_GetSyncSite(register struct ubik_client *aclient, afs_int32 apos)
523 {
524     struct rx_peer *rxp;
525     afs_int32 code;
526     int i;
527     afs_int32 thisHost, newHost;
528     struct rx_connection *tc;
529     short origLevel;
530
531     origLevel = aclient->initializationState;
532
533     /* get this conn */
534     tc = aclient->conns[apos];
535     if (tc && rx_ConnError(tc)) {
536         aclient->conns[apos] = (tc = RefreshConn(tc));
537     }
538     if (!tc) {
539         return -1;
540     }
541
542     /* now see if we can find the sync site host */
543     code = VOTE_GetSyncSite(tc, &newHost);
544     if (aclient->initializationState != origLevel) {
545         return -1;              /* somebody did a ubik_ClientInit */
546     }
547
548     if (!code && newHost) {
549         newHost = htonl(newHost);       /* convert back to network order */
550
551         /*
552          * position count at the appropriate slot in the client
553          * structure and retry. If we can't find in slot, we'll just
554          * continue through the whole list
555          */
556         for (i = 0; i < MAXSERVERS; i++) {
557             rxp = rx_PeerOf(aclient->conns[i]);
558             thisHost = rx_HostOf(rxp);
559             if (!thisHost) {
560                 return -1;
561             } else if (thisHost == newHost) {
562                 return i;       /* we were told to use this one */
563             }
564         }
565     }
566     return -1;
567 }
568
569 /* 
570  * Create an internal version of ubik_CallIter that takes an additional
571  * parameter - to indicate whether the ubik client handle has already
572  * been locked.
573  */
574
575 #define NEED_LOCK 1
576 #define NO_LOCK 0
577
578 static afs_int32
579 CallIter(aproc, aclient, aflags, apos, p1, p2, p3, p4, p5, p6, p7, p8, p9,
580          p10, p11, p12, p13, p14, p15, p16, needlock)
581      int (*aproc) ();
582      register struct ubik_client *aclient;
583      afs_int32 aflags;
584      int *apos;
585      long p1;
586      long p2;
587      long p3;
588      long p4;
589      long p5;
590      long p6;
591      long p7;
592      long p8;
593      long p9;
594      long p10;
595      long p11;
596      long p12;
597      long p13;
598      long p14;
599      long p15;
600      long p16;
601      int needlock;
602 {
603     register afs_int32 code;
604     struct rx_connection *tc;
605     short origLevel;
606
607     if (needlock) {
608         LOCK_UBIK_CLIENT(aclient)
609     }
610     origLevel = aclient->initializationState;
611
612     code = UNOSERVERS;
613
614     while (*apos < MAXSERVERS) {
615         /* tc is the next conn to try */
616         tc = aclient->conns[*apos];
617         if (!tc) {
618             if (needlock) {
619                 UNLOCK_UBIK_CLIENT(aclient)
620             }
621             return UNOSERVERS;
622         }
623
624         if (rx_ConnError(tc)) {
625             tc = RefreshConn(tc);
626             aclient->conns[*apos] = tc;
627         }
628
629         if ((aflags & UPUBIKONLY) && (aclient->states[*apos] & CFLastFailed)) {
630             (*apos)++;          /* try another one if this server is down */
631         } else {
632             break;              /* this is the desired path */
633         }
634     }
635     if (*apos >= MAXSERVERS) {
636         if (needlock) {
637             UNLOCK_UBIK_CLIENT(aclient)
638         }
639         return UNOSERVERS;
640     }
641
642     code =
643         (*aproc) (tc, p1, p2, p3, p4, p5, p6, p7, p8, p9, p10, p11, p12, p13,
644                   p14, p15, p16);
645     if (aclient->initializationState != origLevel) {
646         if (needlock) {
647             UNLOCK_UBIK_CLIENT(aclient)
648         }
649         return code;            /* somebody did a ubik_ClientInit */
650     }
651
652     /* what should I do in case of UNOQUORUM ? */
653     if (code < 0) {
654         aclient->states[*apos] |= CFLastFailed; /* network errors */
655     } else {
656         /* either misc ubik code, or misc application code or success. */
657         aclient->states[*apos] &= ~CFLastFailed;        /* operation worked */
658     }
659
660     (*apos)++;
661     if (needlock) {
662         UNLOCK_UBIK_CLIENT(aclient)
663     }
664     return code;
665 }
666
667 /* 
668  * call this instead of stub and we'll guarantee to find a host that's up.
669  * in the future, we should also put in a protocol to find the sync site
670  */
671 afs_int32
672 ubik_Call_New(aproc, aclient, aflags, p1, p2, p3, p4, p5, p6, p7, p8, p9, p10,
673               p11, p12, p13, p14, p15, p16)
674      int (*aproc) ();
675      register struct ubik_client *aclient;
676      afs_int32 aflags;
677      long p1;
678      long p2;
679      long p3;
680      long p4;
681      long p5;
682      long p6;
683      long p7;
684      long p8;
685      long p9;
686      long p10;
687      long p11;
688      long p12;
689      long p13;
690      long p14;
691      long p15;
692      long p16;
693 {
694     afs_int32 code, rcode;
695     afs_int32 count;
696     afs_int32 temp;
697     int pass;
698     int stepBack;
699     short origLevel;
700
701     LOCK_UBIK_CLIENT(aclient)
702   restart:
703     rcode = UNOSERVERS;
704     origLevel = aclient->initializationState;
705
706     /* Do two passes. First pass only checks servers known running */
707     for (aflags |= UPUBIKONLY, pass = 0; pass < 2;
708          pass++, aflags &= ~UPUBIKONLY) {
709         stepBack = 0;
710         count = 0;
711         while (1) {
712             code =
713                 CallIter(aproc, aclient, aflags, &count, p1, p2, p3, p4, p5,
714                          p6, p7, p8, p9, p10, p11, p12, p13, p14, p15, p16,
715                          NO_LOCK);
716             if (code && (aclient->initializationState != origLevel)) {
717                 goto restart;
718             }
719             if (code == UNOSERVERS) {
720                 break;
721             }
722             rcode = code;       /* remember code from last good call */
723
724             if (code == UNOTSYNC) {     /* means this requires a sync site */
725                 if (aclient->conns[3]) {        /* don't bother unless 4 or more srv */
726                     temp = try_GetSyncSite(aclient, count);
727                     if (aclient->initializationState != origLevel) {
728                         goto restart;   /* somebody did a ubik_ClientInit */
729                     }
730                     if ((temp >= 0) && ((temp > count) || (stepBack++ <= 2))) {
731                         count = temp;   /* generally try to make progress */
732                     }
733                 }
734             } else if ((code >= 0) && (code != UNOQUORUM)) {
735                 UNLOCK_UBIK_CLIENT(aclient)
736                     return code;        /* success or global error condition */
737             }
738         }
739     }
740     UNLOCK_UBIK_CLIENT(aclient)
741         return rcode;
742 }
743
744 /* 
745  * This is part of an iterator.  It doesn't handle finding sync sites
746  */
747 afs_int32
748 ubik_CallIter(aproc, aclient, aflags, apos, p1, p2, p3, p4, p5, p6, p7, p8,
749               p9, p10, p11, p12, p13, p14, p15, p16)
750      int (*aproc) ();
751      register struct ubik_client *aclient;
752      afs_int32 aflags;
753      int *apos;
754      long p1;
755      long p2;
756      long p3;
757      long p4;
758      long p5;
759      long p6;
760      long p7;
761      long p8;
762      long p9;
763      long p10;
764      long p11;
765      long p12;
766      long p13;
767      long p14;
768      long p15;
769      long p16;
770 {
771     return CallIter(aproc, aclient, aflags, apos, p1, p2, p3, p4, p5, p6, p7,
772                     p8, p9, p10, p11, p12, p13, p14, p15, p16, NEED_LOCK);
773 }