fix-indent-bug-with-lock-macros-part-three-20040818
[openafs.git] / src / ubik / ubikclient.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afsconfig.h>
11 #if defined(UKERNEL)
12 #include "afs/param.h"
13 #else
14 #include <afs/param.h>
15 #endif
16
17 RCSID
18     ("$Header$");
19
20 #if defined(UKERNEL)
21 #include "afs/sysincludes.h"
22 #include "afsincludes.h"
23 #include "afs/stds.h"
24 #include "rx/xdr.h"
25 #include "rx/rx.h"
26 #include "afs/lock.h"
27 #include "afs/rxgen_consts.h"
28 #include "ubik.h"
29 #include "afs/pthread_glock.h"
30 #else /* defined(UKERNEL) */
31 #include <afs/stds.h>
32 #include <afs/pthread_glock.h>
33 #include <stdio.h>
34 #include <string.h>
35 #include <rx/xdr.h>
36 #include <rx/rx.h>
37 #include <lock.h>
38 #ifdef AFS_NT40_ENV
39 #include <winsock2.h>
40 #else
41 #include <netdb.h>
42 #include <netinet/in.h>
43 #endif
44 #include <afs/rxgen_consts.h>
45 #include "ubik.h"
46 #endif /* defined(UKERNEL) */
47
48
49 afs_int32 ubik_CallIter();
50 short ubik_initializationState; /* initial state is zero */
51
52
53 /*
54  * parse list for clients
55  */
56 int
57 ubik_ParseClientList(int argc, char **argv, afs_int32 * aothers)
58 {
59     register afs_int32 i;
60     register char *tp;
61     register struct hostent *th;
62     afs_int32 temp, counter;
63     int inServer;
64
65     inServer = 0;               /* haven't seen -servers yet */
66     counter = 0;
67     for (i = 1; i < argc; i++) {
68         /* look for -servers argument */
69         tp = argv[i];
70
71         if (inServer) {
72             if (*tp == '-')
73                 break;          /* done */
74             /* otherwise this is a new host name */
75             LOCK_GLOBAL_MUTEX;
76             th = gethostbyname(tp);
77             if (!th) {
78                 UNLOCK_GLOBAL_MUTEX;
79                 return UBADHOST;
80             }
81             memmove((void *)&temp, (const void *)th->h_addr,
82                     sizeof(afs_int32));
83             UNLOCK_GLOBAL_MUTEX;
84             if (counter++ >= MAXSERVERS)
85                 return UNHOSTS;
86             *aothers++ = temp;
87         } else {
88             /* haven't seen a -server yet */
89             if (!strcmp(tp, "-servers")) {
90                 inServer = 1;
91             }
92         }
93     }
94     if (!inServer) {
95         /* never saw a -server */
96         return UNOENT;
97     }
98     if (counter < MAXSERVERS)
99         *aothers++ = 0;         /* null terminate if room */
100     return 0;
101 }
102
103 #ifdef AFS_PTHREAD_ENV
104 #include <pthread.h>
105 #include <assert.h>
106
107 static pthread_once_t random_once = PTHREAD_ONCE_INIT;
108 static int called_afs_random_once;
109 static pthread_key_t random_number_key;
110
111 static void
112 afs_random_once(void)
113 {
114     assert(pthread_key_create(&random_number_key, NULL) == 0);
115     called_afs_random_once = 1;
116 }
117
118 #endif
119 /* 
120  * Random number generator and constants from KnuthV2 2d ed, p170
121  *
122  * Rules:
123  * X = (aX + c) % m
124  * m is a power of two 
125  * a % 8 is 5
126  * a is 0.73m  should be 0.01m .. 0.99m
127  * c is more or less immaterial.  1 or a is suggested.
128  *
129  * NB:  LOW ORDER BITS are not very random.  To get small random numbers,
130  *      treat result as <1, with implied binary point, and multiply by 
131  *      desired modulus.
132  * NB:  Has to be unsigned, since shifts on signed quantities may preserve
133  *      the sign bit.
134  * 
135  * In this case, m == 2^32, the mod operation is implicit. a == pi, which
136  * is used because it has some interesting characteristics (lacks any
137  * interesting bit-patterns).   
138  * 
139  */
140
141 /* 
142  * use time and pid to try to get some initial randomness.
143  */
144 #if !defined(UKERNEL)
145 #define ranstage(x)     (x)= (afs_uint32) (3141592621U*((afs_uint32)x)+1)
146
147 unsigned int
148 afs_random(void)
149 {
150 #ifdef AFS_PTHREAD_ENV
151     afs_uint32 state;
152
153     (called_afs_random_once || pthread_once(&random_once, afs_random_once));
154     state = (afs_uint32) pthread_getspecific(random_number_key);
155 #else
156     static afs_uint32 state = 0;
157 #endif
158
159     if (!state) {
160         int i;
161         state = time(0) + getpid();
162         for (i = 0; i < 15; i++) {
163             ranstage(state);
164         }
165     }
166
167     ranstage(state);
168 #ifdef AFS_PTHREAD_ENV
169     pthread_setspecific(random_number_key, (const void *)state);
170 #endif
171     return (state);
172
173 }
174
175 /*
176  * returns int 0..14 using the high bits of a pseudo-random number instead of
177  * the low bits, as the low bits are "less random" than the high ones...
178  * slight roundoff error exists, an excercise for the reader.
179  * need to multiply by something with lots of ones in it, so multiply by 
180  * 8 or 16 is right out.
181  */
182
183 static unsigned int
184 afs_randomMod15(void)
185 {
186     afs_uint32 temp;
187
188     temp = afs_random() >> 4;
189     temp = (temp * 15) >> 28;
190
191     return temp;
192 }
193 #endif /* !defined(UKERNEL) */
194
195 #ifdef abs
196 #undef abs
197 #endif /* abs */
198 #define abs(a) ((a) < 0 ? -1*(a) : (a))
199 int
200 ubik_ClientInit(register struct rx_connection **serverconns,
201                 struct ubik_client **aclient)
202 {
203     int i, j;
204     int count;
205     int offset;
206     register struct ubik_client *tc;
207
208     initialize_U_error_table();
209
210     if (*aclient) {             /* the application is doing a re-initialization */
211         LOCK_UBIK_CLIENT((*aclient));
212         /* this is an important defensive check */
213         if (!((*aclient)->initializationState)) {
214             UNLOCK_UBIK_CLIENT((*aclient));
215             return UREINITIALIZE;
216         }
217
218         /* release all existing connections */
219         for (tc = *aclient, i = 0; i < MAXSERVERS; i++) {
220             struct rx_connection *rxConn = ubik_GetRPCConn(tc, i);
221             if (rxConn == 0)
222                 break;
223 #ifdef AFS_PTHREAD_ENV
224             rx_ReleaseCachedConnection(rxConn);
225 #else
226             rx_DestroyConnection(rxConn);
227 #endif
228         }
229         UNLOCK_UBIK_CLIENT((*aclient));
230 #ifdef AFS_PTHREAD_ENV
231         if (pthread_mutex_destroy(&((*aclient)->cm)))
232             return UMUTEXDESTROY;
233 #endif
234     } else {
235         tc = (struct ubik_client *)malloc(sizeof(struct ubik_client));
236     }
237     if (tc == NULL)
238         return UNOMEM;
239     memset((void *)tc, 0, sizeof(*tc));
240 #ifdef AFS_PTHREAD_ENV
241     if (pthread_mutex_init(&(tc->cm), (const pthread_mutexattr_t *)0)) {
242         return UMUTEXINIT;
243     }
244 #endif
245     tc->initializationState = ++ubik_initializationState;
246
247     /* first count the # of server conns so we can randomize properly */
248     count = 0;
249     for (i = 0; i < MAXSERVERS; i++) {
250         if (serverconns[i] == (struct rx_connection *)0)
251             break;
252         count++;
253     }
254
255     /* here count is the # of servers we're actually passed in.  Compute
256      * offset, a number between 0..count-1, where we'll start copying from the
257      * client-provided array. */
258     for (i = 0; i < count; i++) {
259         offset = afs_randomMod15() % count;
260         for (j = abs(offset); j < 2 * count; j++) {
261             if (!tc->conns[abs(j % count)]) {
262                 tc->conns[abs(j % count)] = serverconns[i];
263                 break;
264             }
265         }
266     }
267
268     *aclient = tc;
269     return 0;
270 }
271
272 /* 
273  * ubik_ClientDestroy - destroys a ubik connection.  It calls rx to destroy the
274  * component rx connections, then frees the ubik connection structure.
275  */
276
277 afs_int32
278 ubik_ClientDestroy(struct ubik_client * aclient)
279 {
280     register int c;
281
282     if (aclient == 0)
283         return 0;
284     LOCK_UBIK_CLIENT(aclient);
285     for (c = 0; c < MAXSERVERS; c++) {
286         struct rx_connection *rxConn = ubik_GetRPCConn(aclient, c);
287         if (rxConn == 0)
288             break;
289 #ifdef AFS_PTHREAD_ENV
290         rx_ReleaseCachedConnection(rxConn);
291 #else
292         rx_DestroyConnection(rxConn);
293 #endif
294     }
295     aclient->initializationState = 0;   /* client in not initialized */
296     UNLOCK_UBIK_CLIENT(aclient);
297 #ifdef AFS_PTHREAD_ENV
298     pthread_mutex_destroy(&(aclient->cm));      /* ignore failure */
299 #endif
300     free(aclient);
301     return 0;
302 }
303
304 /*
305  * RefreshConn -- So that intermittent failures that cause connections to die
306  *     don't kill whole ubik connection, refresh them when the connection is in
307  *     error.
308  */
309
310 static struct rx_connection *
311 RefreshConn(struct rx_connection *tc)
312 {
313     afs_uint32 host;
314     u_short port;
315     u_short service;
316     struct rx_securityClass *sc;
317     int si;
318     struct rx_connection *newTc;
319
320     host = rx_HostOf(rx_PeerOf(tc));
321     port = rx_PortOf(rx_PeerOf(tc));
322     service = rx_ServiceIdOf(tc);
323     sc = rx_SecurityObjectOf(tc);
324     si = rx_SecurityClassOf(tc);
325
326     /*
327      * destroy old one after creating new one so that refCount on security
328      * object cannot reach zero.
329      */
330     newTc = rx_NewConnection(host, port, service, sc, si);
331     rx_DestroyConnection(tc);
332     return newTc;
333 }
334
335 #ifdef AFS_PTHREAD_ENV
336
337 pthread_once_t ubik_client_once = PTHREAD_ONCE_INIT;
338 pthread_mutex_t ubik_client_mutex;
339 #define LOCK_UCLNT_CACHE \
340     assert(pthread_once(&ubik_client_once, ubik_client_init_mutex) == 0 && \
341            pthread_mutex_lock(&ubik_client_mutex)==0)
342 #define UNLOCK_UCLNT_CACHE assert(pthread_mutex_unlock(&ubik_client_mutex)==0)
343
344 void
345 ubik_client_init_mutex()
346 {
347     assert(pthread_mutex_init(&ubik_client_mutex, NULL) == 0);
348 }
349
350 #else
351
352 #define LOCK_UCLNT_CACHE
353 #define UNLOCK_UCLNT_CACHE
354
355 #endif
356
357 #define SYNCCOUNT 10
358 static int *calls_needsync[SYNCCOUNT];  /* proc calls that need the sync site */
359 static int synccount = 0;
360
361 /*
362  * call this instead of stub and we'll guarantee to find a host that's up.
363  * in the future, we should also put in a protocol to find the sync site
364  */
365 afs_int32
366 ubik_Call(aproc, aclient, aflags, p1, p2, p3, p4, p5, p6, p7, p8, p9, p10,
367           p11, p12, p13, p14, p15, p16)
368      int (*aproc) ();
369      register struct ubik_client *aclient;
370      afs_int32 aflags;
371      long p1;
372      long p2;
373      long p3;
374      long p4;
375      long p5;
376      long p6;
377      long p7;
378      long p8;
379      long p9;
380      long p10;
381      long p11;
382      long p12;
383      long p13;
384      long p14;
385      long p15;
386      long p16;
387 {
388     afs_int32 rcode, code, newHost, thisHost, i, count;
389     int chaseCount, pass, needsync, inlist, j;
390     struct rx_connection *tc;
391     struct rx_peer *rxp;
392     short origLevel;
393
394     if (!aclient)
395         return UNOENT;
396     LOCK_UBIK_CLIENT(aclient);
397
398   restart:
399     origLevel = aclient->initializationState;
400     rcode = UNOSERVERS;
401     chaseCount = inlist = needsync = 0;
402
403     LOCK_UCLNT_CACHE;
404     for (j = 0; ((j < SYNCCOUNT) && calls_needsync[j]); j++) {
405         if (calls_needsync[j] == (int *)aproc) {
406             inlist = needsync = 1;
407             break;
408         }
409     }
410     UNLOCK_UCLNT_CACHE;
411     /* 
412      * First  pass, we try all servers that are up.
413      * Second pass, we try all servers.
414      */
415     for (pass = 0; pass < 2; pass++) {  /*p */
416         /* For each entry in our servers list */
417         for (count = 0;; count++) {     /*s */
418
419             if (needsync) {
420                 /* Need a sync site. Lets try to quickly find it */
421                 if (aclient->syncSite) {
422                     newHost = aclient->syncSite;        /* already in network order */
423                     aclient->syncSite = 0;      /* Will reset if it works */
424                 } else if (aclient->conns[3]) {
425                     /* If there are fewer than four db servers in a cell,
426                      * there's no point in making the GetSyncSite call.
427                      * At best, it's a wash. At worst, it results in more
428                      * RPCs than you would otherwise make.
429                      */
430                     tc = aclient->conns[count];
431                     if (tc && rx_ConnError(tc)) {
432                         aclient->conns[count] = tc = RefreshConn(tc);
433                     }
434                     if (!tc)
435                         break;
436                     code = VOTE_GetSyncSite(tc, &newHost);
437                     if (aclient->initializationState != origLevel)
438                         goto restart;   /* somebody did a ubik_ClientInit */
439                     if (code)
440                         newHost = 0;
441                     newHost = htonl(newHost);   /* convert to network order */
442                 } else {
443                     newHost = 0;
444                 }
445                 if (newHost) {
446                     /* position count at the appropriate slot in the client
447                      * structure and retry. If we can't find in slot, we'll
448                      * just continue through the whole list 
449                      */
450                     for (i = 0; i < MAXSERVERS && aclient->conns[i]; i++) {
451                         rxp = rx_PeerOf(aclient->conns[i]);
452                         thisHost = rx_HostOf(rxp);
453                         if (!thisHost)
454                             break;
455                         if (thisHost == newHost) {
456                             if (chaseCount++ > 2)
457                                 break;  /* avoid loop asking */
458                             count = i;  /* this index is the sync site */
459                             break;
460                         }
461                     }
462                 }
463             }
464             /*needsync */
465             tc = aclient->conns[count];
466             if (tc && rx_ConnError(tc)) {
467                 aclient->conns[count] = tc = RefreshConn(tc);
468             }
469             if (!tc)
470                 break;
471
472             if ((pass == 0) && (aclient->states[count] & CFLastFailed)) {
473                 continue;       /* this guy's down */
474             }
475
476             rcode =
477                 (*aproc) (tc, p1, p2, p3, p4, p5, p6, p7, p8, p9, p10, p11,
478                           p12, p13, p14, p15, p16);
479             if (aclient->initializationState != origLevel) {
480                 /* somebody did a ubik_ClientInit */
481                 if (rcode)
482                     goto restart;       /* call failed */
483                 else
484                     goto done;  /* call suceeded */
485             }
486             if (rcode < 0) {    /* network errors */
487                 aclient->states[count] |= CFLastFailed; /* Mark serer down */
488             } else if (rcode == UNOTSYNC) {
489                 needsync = 1;
490             } else if (rcode != UNOQUORUM) {
491                 /* either misc ubik code, or misc appl code, or success. */
492                 aclient->states[count] &= ~CFLastFailed;        /* mark server up */
493                 goto done;      /* all done */
494             }
495         }                       /*s */
496     }                           /*p */
497
498   done:
499     if (needsync) {
500         if (!inlist) {          /* Remember proc call that needs sync site */
501             LOCK_UCLNT_CACHE;
502             calls_needsync[synccount % SYNCCOUNT] = (int *)aproc;
503             synccount++;
504             UNLOCK_UCLNT_CACHE;
505             inlist = 1;
506         }
507         if (!rcode) {           /* Remember the sync site - cmd successful */
508             rxp = rx_PeerOf(aclient->conns[count]);
509             aclient->syncSite = rx_HostOf(rxp);
510         }
511     }
512     UNLOCK_UBIK_CLIENT(aclient);
513     return rcode;
514 }
515
516
517
518 /*
519  * call this after getting back a UNOTSYNC 
520  * note that getting a UNOTSYNC error code back does *not* guarantee
521  * that there is a sync site yet elected.  However, if there is a sync
522  * site out there somewhere, and you're trying an operation that
523  * requires a sync site, ubik will return UNOTSYNC, indicating the
524  * operation won't work until you find a sync site
525  */
526 static int
527 try_GetSyncSite(register struct ubik_client *aclient, afs_int32 apos)
528 {
529     struct rx_peer *rxp;
530     afs_int32 code;
531     int i;
532     afs_int32 thisHost, newHost;
533     struct rx_connection *tc;
534     short origLevel;
535
536     origLevel = aclient->initializationState;
537
538     /* get this conn */
539     tc = aclient->conns[apos];
540     if (tc && rx_ConnError(tc)) {
541         aclient->conns[apos] = (tc = RefreshConn(tc));
542     }
543     if (!tc) {
544         return -1;
545     }
546
547     /* now see if we can find the sync site host */
548     code = VOTE_GetSyncSite(tc, &newHost);
549     if (aclient->initializationState != origLevel) {
550         return -1;              /* somebody did a ubik_ClientInit */
551     }
552
553     if (!code && newHost) {
554         newHost = htonl(newHost);       /* convert back to network order */
555
556         /*
557          * position count at the appropriate slot in the client
558          * structure and retry. If we can't find in slot, we'll just
559          * continue through the whole list
560          */
561         for (i = 0; i < MAXSERVERS; i++) {
562             rxp = rx_PeerOf(aclient->conns[i]);
563             thisHost = rx_HostOf(rxp);
564             if (!thisHost) {
565                 return -1;
566             } else if (thisHost == newHost) {
567                 return i;       /* we were told to use this one */
568             }
569         }
570     }
571     return -1;
572 }
573
574 /* 
575  * Create an internal version of ubik_CallIter that takes an additional
576  * parameter - to indicate whether the ubik client handle has already
577  * been locked.
578  */
579
580 #define NEED_LOCK 1
581 #define NO_LOCK 0
582
583 static afs_int32
584 CallIter(aproc, aclient, aflags, apos, p1, p2, p3, p4, p5, p6, p7, p8, p9,
585          p10, p11, p12, p13, p14, p15, p16, needlock)
586      int (*aproc) ();
587      register struct ubik_client *aclient;
588      afs_int32 aflags;
589      int *apos;
590      long p1;
591      long p2;
592      long p3;
593      long p4;
594      long p5;
595      long p6;
596      long p7;
597      long p8;
598      long p9;
599      long p10;
600      long p11;
601      long p12;
602      long p13;
603      long p14;
604      long p15;
605      long p16;
606      int needlock;
607 {
608     register afs_int32 code;
609     struct rx_connection *tc;
610     short origLevel;
611
612     if (needlock) {
613         LOCK_UBIK_CLIENT(aclient);
614     }
615     origLevel = aclient->initializationState;
616
617     code = UNOSERVERS;
618
619     while (*apos < MAXSERVERS) {
620         /* tc is the next conn to try */
621         tc = aclient->conns[*apos];
622         if (!tc) {
623             if (needlock) {
624                 UNLOCK_UBIK_CLIENT(aclient);
625             }
626             return UNOSERVERS;
627         }
628
629         if (rx_ConnError(tc)) {
630             tc = RefreshConn(tc);
631             aclient->conns[*apos] = tc;
632         }
633
634         if ((aflags & UPUBIKONLY) && (aclient->states[*apos] & CFLastFailed)) {
635             (*apos)++;          /* try another one if this server is down */
636         } else {
637             break;              /* this is the desired path */
638         }
639     }
640     if (*apos >= MAXSERVERS) {
641         if (needlock) {
642             UNLOCK_UBIK_CLIENT(aclient);
643         }
644         return UNOSERVERS;
645     }
646
647     code =
648         (*aproc) (tc, p1, p2, p3, p4, p5, p6, p7, p8, p9, p10, p11, p12, p13,
649                   p14, p15, p16);
650     if (aclient->initializationState != origLevel) {
651         if (needlock) {
652             UNLOCK_UBIK_CLIENT(aclient);
653         }
654         return code;            /* somebody did a ubik_ClientInit */
655     }
656
657     /* what should I do in case of UNOQUORUM ? */
658     if (code < 0) {
659         aclient->states[*apos] |= CFLastFailed; /* network errors */
660     } else {
661         /* either misc ubik code, or misc application code or success. */
662         aclient->states[*apos] &= ~CFLastFailed;        /* operation worked */
663     }
664
665     (*apos)++;
666     if (needlock) {
667         UNLOCK_UBIK_CLIENT(aclient);
668     }
669     return code;
670 }
671
672 /* 
673  * call this instead of stub and we'll guarantee to find a host that's up.
674  * in the future, we should also put in a protocol to find the sync site
675  */
676 afs_int32
677 ubik_Call_New(aproc, aclient, aflags, p1, p2, p3, p4, p5, p6, p7, p8, p9, p10,
678               p11, p12, p13, p14, p15, p16)
679      int (*aproc) ();
680      register struct ubik_client *aclient;
681      afs_int32 aflags;
682      long p1;
683      long p2;
684      long p3;
685      long p4;
686      long p5;
687      long p6;
688      long p7;
689      long p8;
690      long p9;
691      long p10;
692      long p11;
693      long p12;
694      long p13;
695      long p14;
696      long p15;
697      long p16;
698 {
699     afs_int32 code, rcode;
700     afs_int32 count;
701     afs_int32 temp;
702     int pass;
703     int stepBack;
704     short origLevel;
705
706     LOCK_UBIK_CLIENT(aclient);
707   restart:
708     rcode = UNOSERVERS;
709     origLevel = aclient->initializationState;
710
711     /* Do two passes. First pass only checks servers known running */
712     for (aflags |= UPUBIKONLY, pass = 0; pass < 2;
713          pass++, aflags &= ~UPUBIKONLY) {
714         stepBack = 0;
715         count = 0;
716         while (1) {
717             code =
718                 CallIter(aproc, aclient, aflags, &count, p1, p2, p3, p4, p5,
719                          p6, p7, p8, p9, p10, p11, p12, p13, p14, p15, p16,
720                          NO_LOCK);
721             if (code && (aclient->initializationState != origLevel)) {
722                 goto restart;
723             }
724             if (code == UNOSERVERS) {
725                 break;
726             }
727             rcode = code;       /* remember code from last good call */
728
729             if (code == UNOTSYNC) {     /* means this requires a sync site */
730                 if (aclient->conns[3]) {        /* don't bother unless 4 or more srv */
731                     temp = try_GetSyncSite(aclient, count);
732                     if (aclient->initializationState != origLevel) {
733                         goto restart;   /* somebody did a ubik_ClientInit */
734                     }
735                     if ((temp >= 0) && ((temp > count) || (stepBack++ <= 2))) {
736                         count = temp;   /* generally try to make progress */
737                     }
738                 }
739             } else if ((code >= 0) && (code != UNOQUORUM)) {
740                 UNLOCK_UBIK_CLIENT(aclient);
741                 return code;    /* success or global error condition */
742             }
743         }
744     }
745     UNLOCK_UBIK_CLIENT(aclient);
746     return rcode;
747 }
748
749 /* 
750  * This is part of an iterator.  It doesn't handle finding sync sites
751  */
752 afs_int32
753 ubik_CallIter(aproc, aclient, aflags, apos, p1, p2, p3, p4, p5, p6, p7, p8,
754               p9, p10, p11, p12, p13, p14, p15, p16)
755      int (*aproc) ();
756      register struct ubik_client *aclient;
757      afs_int32 aflags;
758      int *apos;
759      long p1;
760      long p2;
761      long p3;
762      long p4;
763      long p5;
764      long p6;
765      long p7;
766      long p8;
767      long p9;
768      long p10;
769      long p11;
770      long p12;
771      long p13;
772      long p14;
773      long p15;
774      long p16;
775 {
776     return CallIter(aproc, aclient, aflags, apos, p1, p2, p3, p4, p5, p6, p7,
777                     p8, p9, p10, p11, p12, p13, p14, p15, p16, NEED_LOCK);
778 }