ee9d4aaf7037b88aa8f51064adbdab8220f0b7e7
[openafs.git] / src / ubik / ubikclient.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afsconfig.h>
11 #if defined(UKERNEL)
12 #include "afs/param.h"
13 #else
14 #include <afs/param.h>
15 #endif
16
17 RCSID
18     ("$Header$");
19
20 #if defined(UKERNEL)
21 #include "afs/sysincludes.h"
22 #include "afsincludes.h"
23 #include "afs/stds.h"
24 #include "rx/xdr.h"
25 #include "rx/rx.h"
26 #include "afs/lock.h"
27 #include "afs/rxgen_consts.h"
28 #include "ubik.h"
29 #include "afs/pthread_glock.h"
30 #else /* defined(UKERNEL) */
31 #include <afs/stds.h>
32 #include <afs/pthread_glock.h>
33 #include <stdio.h>
34 #include <string.h>
35 #include <rx/xdr.h>
36 #include <rx/rx.h>
37 #include <lock.h>
38 #ifdef AFS_NT40_ENV
39 #include <winsock2.h>
40 #else
41 #include <netdb.h>
42 #include <netinet/in.h>
43 #endif
44 #include <afs/rxgen_consts.h>
45 #include "ubik.h"
46 #endif /* defined(UKERNEL) */
47
48
49 short ubik_initializationState; /*!< initial state is zero */
50
51
52 /*!
53  * \brief Parse list for clients.
54  */
55 int
56 ubik_ParseClientList(int argc, char **argv, afs_int32 * aothers)
57 {
58     register afs_int32 i;
59     register char *tp;
60     register struct hostent *th;
61     afs_int32 temp, counter;
62     int inServer;
63
64     inServer = 0;               /* haven't seen -servers yet */
65     counter = 0;
66     for (i = 1; i < argc; i++) {
67         /* look for -servers argument */
68         tp = argv[i];
69
70         if (inServer) {
71             if (*tp == '-')
72                 break;          /* done */
73             /* otherwise this is a new host name */
74             LOCK_GLOBAL_MUTEX;
75             th = gethostbyname(tp);
76             if (!th) {
77                 UNLOCK_GLOBAL_MUTEX;
78                 return UBADHOST;
79             }
80             memmove((void *)&temp, (const void *)th->h_addr,
81                     sizeof(afs_int32));
82             UNLOCK_GLOBAL_MUTEX;
83             if (counter++ >= MAXSERVERS)
84                 return UNHOSTS;
85             *aothers++ = temp;
86         } else {
87             /* haven't seen a -server yet */
88             if (!strcmp(tp, "-servers")) {
89                 inServer = 1;
90             }
91         }
92     }
93     if (!inServer) {
94         /* never saw a -server */
95         return UNOENT;
96     }
97     if (counter < MAXSERVERS)
98         *aothers++ = 0;         /* null terminate if room */
99     return 0;
100 }
101
102 #ifdef AFS_PTHREAD_ENV
103 #include <pthread.h>
104 #include <assert.h>
105
106 static pthread_once_t random_once = PTHREAD_ONCE_INIT;
107 static int called_afs_random_once;
108 static pthread_key_t random_number_key;
109
110 static void
111 afs_random_once(void)
112 {
113     assert(pthread_key_create(&random_number_key, NULL) == 0);
114     called_afs_random_once = 1;
115 }
116
117 #endif
118
119 #if !defined(UKERNEL)
120 /*! 
121  * \brief use time and pid to try to get some initial randomness.
122  */
123 #define ranstage(x)     (x)= (afs_uint32) (3141592621U*((afs_uint32)x)+1)
124
125 /*! 
126  * \brief Random number generator and constants from KnuthV2 2d ed, p170
127  *
128  * Rules: \n
129  * X = (aX + c) % m \n
130  * m is a power of two \n
131  * a % 8 is 5 \n
132  * a is 0.73m  should be 0.01m .. 0.99m \n
133  * c is more or less immaterial.  1 or a is suggested. \n
134  *
135  * NB:  LOW ORDER BITS are not very random.  To get small random numbers,
136  *      treat result as <1, with implied binary point, and multiply by 
137  *      desired modulus.
138  *
139  * NB:  Has to be unsigned, since shifts on signed quantities may preserve
140  *      the sign bit.
141  * 
142  * In this case, m == 2^32, the mod operation is implicit. a == pi, which
143  * is used because it has some interesting characteristics (lacks any
144  * interesting bit-patterns).   
145  */
146 unsigned int
147 afs_random(void)
148 {
149 #ifdef AFS_PTHREAD_ENV
150     afs_uint32 state;
151
152     (called_afs_random_once || pthread_once(&random_once, afs_random_once));
153     state = (afs_uint32) pthread_getspecific(random_number_key);
154 #else
155     static afs_uint32 state = 0;
156 #endif
157
158     if (!state) {
159         int i;
160         state = time(0) + getpid();
161         for (i = 0; i < 15; i++) {
162             ranstage(state);
163         }
164     }
165
166     ranstage(state);
167 #ifdef AFS_PTHREAD_ENV
168     pthread_setspecific(random_number_key, (const void *)state);
169 #endif
170     return (state);
171
172 }
173
174 /*!
175  * \brief Returns int 0..14 using the high bits of a pseudo-random number instead of
176  * the low bits, as the low bits are "less random" than the high ones...
177  * 
178  * \todo Slight roundoff error exists, an excercise for the reader.
179  *
180  * Need to multiply by something with lots of ones in it, so multiply by 
181  * 8 or 16 is right out.
182  */
183 static unsigned int
184 afs_randomMod15(void)
185 {
186     afs_uint32 temp;
187
188     temp = afs_random() >> 4;
189     temp = (temp * 15) >> 28;
190
191     return temp;
192 }
193 #endif /* !defined(UKERNEL) */
194
195 #ifdef abs
196 #undef abs
197 #endif /* abs */
198 #define abs(a) ((a) < 0 ? -1*(a) : (a))
199 int
200 ubik_ClientInit(register struct rx_connection **serverconns,
201                 struct ubik_client **aclient)
202 {
203     int i, j;
204     int count;
205     int offset;
206     register struct ubik_client *tc;
207
208     initialize_U_error_table();
209
210     if (*aclient) {             /* the application is doing a re-initialization */
211         LOCK_UBIK_CLIENT((*aclient));
212         /* this is an important defensive check */
213         if (!((*aclient)->initializationState)) {
214             UNLOCK_UBIK_CLIENT((*aclient));
215             return UREINITIALIZE;
216         }
217
218         /* release all existing connections */
219         for (tc = *aclient, i = 0; i < MAXSERVERS; i++) {
220             struct rx_connection *rxConn = ubik_GetRPCConn(tc, i);
221             if (rxConn == 0)
222                 break;
223 #ifdef AFS_PTHREAD_ENV
224             rx_ReleaseCachedConnection(rxConn);
225 #else
226             rx_DestroyConnection(rxConn);
227 #endif
228         }
229         UNLOCK_UBIK_CLIENT((*aclient));
230 #ifdef AFS_PTHREAD_ENV
231         if (pthread_mutex_destroy(&((*aclient)->cm)))
232             return UMUTEXDESTROY;
233 #endif
234     } else {
235         tc = (struct ubik_client *)malloc(sizeof(struct ubik_client));
236     }
237     if (tc == NULL)
238         return UNOMEM;
239     memset((void *)tc, 0, sizeof(*tc));
240 #ifdef AFS_PTHREAD_ENV
241     if (pthread_mutex_init(&(tc->cm), (const pthread_mutexattr_t *)0)) {
242         return UMUTEXINIT;
243     }
244 #endif
245     tc->initializationState = ++ubik_initializationState;
246
247     /* first count the # of server conns so we can randomize properly */
248     count = 0;
249     for (i = 0; i < MAXSERVERS; i++) {
250         if (serverconns[i] == (struct rx_connection *)0)
251             break;
252         count++;
253     }
254
255     /* here count is the # of servers we're actually passed in.  Compute
256      * offset, a number between 0..count-1, where we'll start copying from the
257      * client-provided array. */
258     for (i = 0; i < count; i++) {
259         offset = afs_randomMod15() % count;
260         for (j = abs(offset); j < 2 * count; j++) {
261             if (!tc->conns[abs(j % count)]) {
262                 tc->conns[abs(j % count)] = serverconns[i];
263                 break;
264             }
265         }
266     }
267
268     *aclient = tc;
269     return 0;
270 }
271
272 /*! 
273  * \brief Destroy an ubik connection.
274  *
275  * It calls rx to destroy the component rx connections, then frees the ubik
276  * connection structure.
277  */
278 afs_int32
279 ubik_ClientDestroy(struct ubik_client * aclient)
280 {
281     register int c;
282
283     if (aclient == 0)
284         return 0;
285     LOCK_UBIK_CLIENT(aclient);
286     for (c = 0; c < MAXSERVERS; c++) {
287         struct rx_connection *rxConn = ubik_GetRPCConn(aclient, c);
288         if (rxConn == 0)
289             break;
290 #ifdef AFS_PTHREAD_ENV
291         rx_ReleaseCachedConnection(rxConn);
292 #else
293         rx_DestroyConnection(rxConn);
294 #endif
295     }
296     aclient->initializationState = 0;   /* client in not initialized */
297     UNLOCK_UBIK_CLIENT(aclient);
298 #ifdef AFS_PTHREAD_ENV
299     pthread_mutex_destroy(&(aclient->cm));      /* ignore failure */
300 #endif
301     free(aclient);
302     return 0;
303 }
304
305 /*!
306  * \brief So that intermittent failures that cause connections to die
307  *     don't kill whole ubik connection, refresh them when the connection is in
308  *     error.
309  */
310 struct rx_connection *
311 ubik_RefreshConn(struct rx_connection *tc)
312 {
313     afs_uint32 host;
314     u_short port;
315     u_short service;
316     struct rx_securityClass *sc;
317     int si;
318     struct rx_connection *newTc;
319
320     host = rx_HostOf(rx_PeerOf(tc));
321     port = rx_PortOf(rx_PeerOf(tc));
322     service = rx_ServiceIdOf(tc);
323     sc = rx_SecurityObjectOf(tc);
324     si = rx_SecurityClassOf(tc);
325
326     /*
327      * destroy old one after creating new one so that refCount on security
328      * object cannot reach zero.
329      */
330     newTc = rx_NewConnection(host, port, service, sc, si);
331     rx_DestroyConnection(tc);
332     return newTc;
333 }
334
335 #ifdef AFS_PTHREAD_ENV
336
337 pthread_once_t ubik_client_once = PTHREAD_ONCE_INIT;
338 pthread_mutex_t ubik_client_mutex;
339 #define LOCK_UCLNT_CACHE \
340     assert(pthread_once(&ubik_client_once, ubik_client_init_mutex) == 0 && \
341            pthread_mutex_lock(&ubik_client_mutex)==0)
342 #define UNLOCK_UCLNT_CACHE assert(pthread_mutex_unlock(&ubik_client_mutex)==0)
343
344 void
345 ubik_client_init_mutex()
346 {
347     assert(pthread_mutex_init(&ubik_client_mutex, NULL) == 0);
348 }
349
350 #else
351
352 #define LOCK_UCLNT_CACHE
353 #define UNLOCK_UCLNT_CACHE
354
355 #endif
356
357 #define SYNCCOUNT 10
358 static int *calls_needsync[SYNCCOUNT];  /* proc calls that need the sync site */
359 static int synccount = 0;
360
361 /*!
362  * call this instead of stub and we'll guarantee to find a host that's up.
363  * 
364  * \todo In the future, we should also put in a protocol to find the sync site.
365  */
366 afs_int32
367 ubik_Call(aproc, aclient, aflags, p1, p2, p3, p4, p5, p6, p7, p8, p9, p10,
368           p11, p12, p13, p14, p15, p16)
369      int (*aproc) ();
370      register struct ubik_client *aclient;
371      afs_int32 aflags;
372      long p1;
373      long p2;
374      long p3;
375      long p4;
376      long p5;
377      long p6;
378      long p7;
379      long p8;
380      long p9;
381      long p10;
382      long p11;
383      long p12;
384      long p13;
385      long p14;
386      long p15;
387      long p16;
388 {
389     afs_int32 rcode, code, newHost, thisHost, i, count;
390     int chaseCount, pass, needsync, inlist, j;
391     struct rx_connection *tc;
392     struct rx_peer *rxp;
393     short origLevel;
394
395     if (!aclient)
396         return UNOENT;
397     LOCK_UBIK_CLIENT(aclient);
398
399   restart:
400     origLevel = aclient->initializationState;
401     rcode = UNOSERVERS;
402     chaseCount = inlist = needsync = 0;
403
404     LOCK_UCLNT_CACHE;
405     for (j = 0; ((j < SYNCCOUNT) && calls_needsync[j]); j++) {
406         if (calls_needsync[j] == (int *)aproc) {
407             inlist = needsync = 1;
408             break;
409         }
410     }
411     UNLOCK_UCLNT_CACHE;
412     /* 
413      * First  pass, we try all servers that are up.
414      * Second pass, we try all servers.
415      */
416     for (pass = 0; pass < 2; pass++) {  /*p */
417         /* For each entry in our servers list */
418         for (count = 0;; count++) {     /*s */
419
420             if (needsync) {
421                 /* Need a sync site. Lets try to quickly find it */
422                 if (aclient->syncSite) {
423                     newHost = aclient->syncSite;        /* already in network order */
424                     aclient->syncSite = 0;      /* Will reset if it works */
425                 } else if (aclient->conns[3]) {
426                     /* If there are fewer than four db servers in a cell,
427                      * there's no point in making the GetSyncSite call.
428                      * At best, it's a wash. At worst, it results in more
429                      * RPCs than you would otherwise make.
430                      */
431                     tc = aclient->conns[count];
432                     if (tc && rx_ConnError(tc)) {
433                         aclient->conns[count] = tc = ubik_RefreshConn(tc);
434                     }
435                     if (!tc)
436                         break;
437                     code = VOTE_GetSyncSite(tc, &newHost);
438                     if (aclient->initializationState != origLevel)
439                         goto restart;   /* somebody did a ubik_ClientInit */
440                     if (code)
441                         newHost = 0;
442                     newHost = htonl(newHost);   /* convert to network order */
443                 } else {
444                     newHost = 0;
445                 }
446                 if (newHost) {
447                     /* position count at the appropriate slot in the client
448                      * structure and retry. If we can't find in slot, we'll
449                      * just continue through the whole list 
450                      */
451                     for (i = 0; i < MAXSERVERS && aclient->conns[i]; i++) {
452                         rxp = rx_PeerOf(aclient->conns[i]);
453                         thisHost = rx_HostOf(rxp);
454                         if (!thisHost)
455                             break;
456                         if (thisHost == newHost) {
457                             if (chaseCount++ > 2)
458                                 break;  /* avoid loop asking */
459                             count = i;  /* this index is the sync site */
460                             break;
461                         }
462                     }
463                 }
464             }
465             /*needsync */
466             tc = aclient->conns[count];
467             if (tc && rx_ConnError(tc)) {
468                 aclient->conns[count] = tc = ubik_RefreshConn(tc);
469             }
470             if (!tc)
471                 break;
472
473             if ((pass == 0) && (aclient->states[count] & CFLastFailed)) {
474                 continue;       /* this guy's down */
475             }
476
477             rcode =
478                 (*aproc) (tc, p1, p2, p3, p4, p5, p6, p7, p8, p9, p10, p11,
479                           p12, p13, p14, p15, p16);
480             if (aclient->initializationState != origLevel) {
481                 /* somebody did a ubik_ClientInit */
482                 if (rcode)
483                     goto restart;       /* call failed */
484                 else
485                     goto done;  /* call suceeded */
486             }
487             if (rcode < 0) {    /* network errors */
488                 aclient->states[count] |= CFLastFailed; /* Mark serer down */
489             } else if (rcode == UNOTSYNC) {
490                 needsync = 1;
491             } else if (rcode != UNOQUORUM) {
492                 /* either misc ubik code, or misc appl code, or success. */
493                 aclient->states[count] &= ~CFLastFailed;        /* mark server up */
494                 goto done;      /* all done */
495             }
496         }                       /*s */
497     }                           /*p */
498
499   done:
500     if (needsync) {
501         if (!inlist) {          /* Remember proc call that needs sync site */
502             LOCK_UCLNT_CACHE;
503             calls_needsync[synccount % SYNCCOUNT] = (int *)aproc;
504             synccount++;
505             UNLOCK_UCLNT_CACHE;
506             inlist = 1;
507         }
508         if (!rcode) {           /* Remember the sync site - cmd successful */
509             rxp = rx_PeerOf(aclient->conns[count]);
510             aclient->syncSite = rx_HostOf(rxp);
511         }
512     }
513     UNLOCK_UBIK_CLIENT(aclient);
514     return rcode;
515 }
516
517
518
519 /*!
520  * \brief Call this after getting back a #UNOTSYNC.
521  *
522  * \note Getting a #UNOTSYNC error code back does \b not guarantee
523  * that there is a sync site yet elected.  However, if there is a sync
524  * site out there somewhere, and you're trying an operation that
525  * requires a sync site, ubik will return #UNOTSYNC, indicating the
526  * operation won't work until you find a sync site
527  */
528 static int
529 try_GetSyncSite(register struct ubik_client *aclient, afs_int32 apos)
530 {
531     struct rx_peer *rxp;
532     afs_int32 code;
533     int i;
534     afs_int32 thisHost, newHost;
535     struct rx_connection *tc;
536     short origLevel;
537
538     origLevel = aclient->initializationState;
539
540     /* get this conn */
541     tc = aclient->conns[apos];
542     if (tc && rx_ConnError(tc)) {
543         aclient->conns[apos] = (tc = ubik_RefreshConn(tc));
544     }
545     if (!tc) {
546         return -1;
547     }
548
549     /* now see if we can find the sync site host */
550     code = VOTE_GetSyncSite(tc, &newHost);
551     if (aclient->initializationState != origLevel) {
552         return -1;              /* somebody did a ubik_ClientInit */
553     }
554
555     if (!code && newHost) {
556         newHost = htonl(newHost);       /* convert back to network order */
557
558         /*
559          * position count at the appropriate slot in the client
560          * structure and retry. If we can't find in slot, we'll just
561          * continue through the whole list
562          */
563         for (i = 0; i < MAXSERVERS; i++) {
564             rxp = rx_PeerOf(aclient->conns[i]);
565             thisHost = rx_HostOf(rxp);
566             if (!thisHost) {
567                 return -1;
568             } else if (thisHost == newHost) {
569                 return i;       /* we were told to use this one */
570             }
571         }
572     }
573     return -1;
574 }
575
576 #define NEED_LOCK 1
577 #define NO_LOCK 0
578
579 /*! 
580  * \brief Create an internal version of ubik_CallIter that takes an additional
581  * parameter - to indicate whether the ubik client handle has already
582  * been locked.
583  */
584 static afs_int32
585 CallIter(aproc, aclient, aflags, apos, p1, p2, p3, p4, p5, p6, p7, p8, p9,
586          p10, p11, p12, p13, p14, p15, p16, needlock)
587      int (*aproc) ();
588      register struct ubik_client *aclient;
589      afs_int32 aflags;
590      int *apos;
591      long p1;
592      long p2;
593      long p3;
594      long p4;
595      long p5;
596      long p6;
597      long p7;
598      long p8;
599      long p9;
600      long p10;
601      long p11;
602      long p12;
603      long p13;
604      long p14;
605      long p15;
606      long p16;
607      int needlock;
608 {
609     register afs_int32 code;
610     struct rx_connection *tc;
611     short origLevel;
612
613     if (needlock) {
614         LOCK_UBIK_CLIENT(aclient);
615     }
616     origLevel = aclient->initializationState;
617
618     code = UNOSERVERS;
619
620     while (*apos < MAXSERVERS) {
621         /* tc is the next conn to try */
622         tc = aclient->conns[*apos];
623         if (!tc) {
624             if (needlock) {
625                 UNLOCK_UBIK_CLIENT(aclient);
626             }
627             return UNOSERVERS;
628         }
629
630         if (rx_ConnError(tc)) {
631             tc = ubik_RefreshConn(tc);
632             aclient->conns[*apos] = tc;
633         }
634
635         if ((aflags & UPUBIKONLY) && (aclient->states[*apos] & CFLastFailed)) {
636             (*apos)++;          /* try another one if this server is down */
637         } else {
638             break;              /* this is the desired path */
639         }
640     }
641     if (*apos >= MAXSERVERS) {
642         if (needlock) {
643             UNLOCK_UBIK_CLIENT(aclient);
644         }
645         return UNOSERVERS;
646     }
647
648     code =
649         (*aproc) (tc, p1, p2, p3, p4, p5, p6, p7, p8, p9, p10, p11, p12, p13,
650                   p14, p15, p16);
651     if (aclient->initializationState != origLevel) {
652         if (needlock) {
653             UNLOCK_UBIK_CLIENT(aclient);
654         }
655         return code;            /* somebody did a ubik_ClientInit */
656     }
657
658     /* what should I do in case of UNOQUORUM ? */
659     if (code < 0) {
660         aclient->states[*apos] |= CFLastFailed; /* network errors */
661     } else {
662         /* either misc ubik code, or misc application code or success. */
663         aclient->states[*apos] &= ~CFLastFailed;        /* operation worked */
664     }
665
666     (*apos)++;
667     if (needlock) {
668         UNLOCK_UBIK_CLIENT(aclient);
669     }
670     return code;
671 }
672
673 /*! 
674  * \brief Call this instead of stub and we'll guarantee to find a host that's up.
675  *
676  * \todo In the future, we should also put in a protocol to find the sync site.
677  */
678 afs_int32
679 ubik_Call_New(aproc, aclient, aflags, p1, p2, p3, p4, p5, p6, p7, p8, p9, p10,
680               p11, p12, p13, p14, p15, p16)
681      int (*aproc) ();
682      register struct ubik_client *aclient;
683      afs_int32 aflags;
684      long p1;
685      long p2;
686      long p3;
687      long p4;
688      long p5;
689      long p6;
690      long p7;
691      long p8;
692      long p9;
693      long p10;
694      long p11;
695      long p12;
696      long p13;
697      long p14;
698      long p15;
699      long p16;
700 {
701     afs_int32 code, rcode;
702     afs_int32 count;
703     afs_int32 temp;
704     int pass;
705     int stepBack;
706     short origLevel;
707
708     LOCK_UBIK_CLIENT(aclient);
709   restart:
710     rcode = UNOSERVERS;
711     origLevel = aclient->initializationState;
712
713     /* Do two passes. First pass only checks servers known running */
714     for (aflags |= UPUBIKONLY, pass = 0; pass < 2;
715          pass++, aflags &= ~UPUBIKONLY) {
716         stepBack = 0;
717         count = 0;
718         while (1) {
719             code =
720                 CallIter(aproc, aclient, aflags, &count, p1, p2, p3, p4, p5,
721                          p6, p7, p8, p9, p10, p11, p12, p13, p14, p15, p16,
722                          NO_LOCK);
723             if (code && (aclient->initializationState != origLevel)) {
724                 goto restart;
725             }
726             if (code == UNOSERVERS) {
727                 break;
728             }
729             rcode = code;       /* remember code from last good call */
730
731             if (code == UNOTSYNC) {     /* means this requires a sync site */
732                 if (aclient->conns[3]) {        /* don't bother unless 4 or more srv */
733                     temp = try_GetSyncSite(aclient, count);
734                     if (aclient->initializationState != origLevel) {
735                         goto restart;   /* somebody did a ubik_ClientInit */
736                     }
737                     if ((temp >= 0) && ((temp > count) || (stepBack++ <= 2))) {
738                         count = temp;   /* generally try to make progress */
739                     }
740                 }
741             } else if ((code >= 0) && (code != UNOQUORUM)) {
742                 UNLOCK_UBIK_CLIENT(aclient);
743                 return code;    /* success or global error condition */
744             }
745         }
746     }
747     UNLOCK_UBIK_CLIENT(aclient);
748     return rcode;
749 }
750
751 /*!
752  * \brief This is part of an iterator.  It doesn't handle finding sync sites.
753  */
754 afs_int32
755 ubik_CallIter(int (*aproc) (), struct ubik_client *aclient,
756                                afs_int32 aflags, int *apos, long p1, long p2,
757                                long p3, long p4, long p5, long p6, long p7,
758                                long p8, long p9, long p10, long p11, long p12,
759                                long p13, long p14, long p15, long p16)
760 {
761     return CallIter(aproc, aclient, aflags, apos, p1, p2, p3, p4, p5, p6, p7,
762                     p8, p9, p10, p11, p12, p13, p14, p15, p16, NEED_LOCK);
763 }