ubik-call-sucks-20060703
[openafs.git] / src / ubik / ubikclient.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afsconfig.h>
11 #if defined(UKERNEL)
12 #include "afs/param.h"
13 #else
14 #include <afs/param.h>
15 #endif
16
17 RCSID
18     ("$Header$");
19
20 #if defined(UKERNEL)
21 #include "afs/sysincludes.h"
22 #include "afsincludes.h"
23 #include "afs/stds.h"
24 #include "rx/xdr.h"
25 #include "rx/rx.h"
26 #include "afs/lock.h"
27 #include "afs/rxgen_consts.h"
28 #include "ubik.h"
29 #include "afs/pthread_glock.h"
30 #else /* defined(UKERNEL) */
31 #include <afs/stds.h>
32 #include <afs/pthread_glock.h>
33 #include <stdio.h>
34 #include <string.h>
35 #include <rx/xdr.h>
36 #include <rx/rx.h>
37 #include <lock.h>
38 #ifdef AFS_NT40_ENV
39 #include <winsock2.h>
40 #else
41 #include <netdb.h>
42 #include <netinet/in.h>
43 #endif
44 #include <afs/rxgen_consts.h>
45 #include "ubik.h"
46 #endif /* defined(UKERNEL) */
47
48
49 short ubik_initializationState; /* initial state is zero */
50
51
52 /*
53  * parse list for clients
54  */
55 int
56 ubik_ParseClientList(int argc, char **argv, afs_int32 * aothers)
57 {
58     register afs_int32 i;
59     register char *tp;
60     register struct hostent *th;
61     afs_int32 temp, counter;
62     int inServer;
63
64     inServer = 0;               /* haven't seen -servers yet */
65     counter = 0;
66     for (i = 1; i < argc; i++) {
67         /* look for -servers argument */
68         tp = argv[i];
69
70         if (inServer) {
71             if (*tp == '-')
72                 break;          /* done */
73             /* otherwise this is a new host name */
74             LOCK_GLOBAL_MUTEX;
75             th = gethostbyname(tp);
76             if (!th) {
77                 UNLOCK_GLOBAL_MUTEX;
78                 return UBADHOST;
79             }
80             memmove((void *)&temp, (const void *)th->h_addr,
81                     sizeof(afs_int32));
82             UNLOCK_GLOBAL_MUTEX;
83             if (counter++ >= MAXSERVERS)
84                 return UNHOSTS;
85             *aothers++ = temp;
86         } else {
87             /* haven't seen a -server yet */
88             if (!strcmp(tp, "-servers")) {
89                 inServer = 1;
90             }
91         }
92     }
93     if (!inServer) {
94         /* never saw a -server */
95         return UNOENT;
96     }
97     if (counter < MAXSERVERS)
98         *aothers++ = 0;         /* null terminate if room */
99     return 0;
100 }
101
102 #ifdef AFS_PTHREAD_ENV
103 #include <pthread.h>
104 #include <assert.h>
105
106 static pthread_once_t random_once = PTHREAD_ONCE_INIT;
107 static int called_afs_random_once;
108 static pthread_key_t random_number_key;
109
110 static void
111 afs_random_once(void)
112 {
113     assert(pthread_key_create(&random_number_key, NULL) == 0);
114     called_afs_random_once = 1;
115 }
116
117 #endif
118 /* 
119  * Random number generator and constants from KnuthV2 2d ed, p170
120  *
121  * Rules:
122  * X = (aX + c) % m
123  * m is a power of two 
124  * a % 8 is 5
125  * a is 0.73m  should be 0.01m .. 0.99m
126  * c is more or less immaterial.  1 or a is suggested.
127  *
128  * NB:  LOW ORDER BITS are not very random.  To get small random numbers,
129  *      treat result as <1, with implied binary point, and multiply by 
130  *      desired modulus.
131  * NB:  Has to be unsigned, since shifts on signed quantities may preserve
132  *      the sign bit.
133  * 
134  * In this case, m == 2^32, the mod operation is implicit. a == pi, which
135  * is used because it has some interesting characteristics (lacks any
136  * interesting bit-patterns).   
137  * 
138  */
139
140 /* 
141  * use time and pid to try to get some initial randomness.
142  */
143 #if !defined(UKERNEL)
144 #define ranstage(x)     (x)= (afs_uint32) (3141592621U*((afs_uint32)x)+1)
145
146 unsigned int
147 afs_random(void)
148 {
149 #ifdef AFS_PTHREAD_ENV
150     afs_uint32 state;
151
152     (called_afs_random_once || pthread_once(&random_once, afs_random_once));
153     state = (afs_uint32) pthread_getspecific(random_number_key);
154 #else
155     static afs_uint32 state = 0;
156 #endif
157
158     if (!state) {
159         int i;
160         state = time(0) + getpid();
161         for (i = 0; i < 15; i++) {
162             ranstage(state);
163         }
164     }
165
166     ranstage(state);
167 #ifdef AFS_PTHREAD_ENV
168     pthread_setspecific(random_number_key, (const void *)state);
169 #endif
170     return (state);
171
172 }
173
174 /*
175  * returns int 0..14 using the high bits of a pseudo-random number instead of
176  * the low bits, as the low bits are "less random" than the high ones...
177  * slight roundoff error exists, an excercise for the reader.
178  * need to multiply by something with lots of ones in it, so multiply by 
179  * 8 or 16 is right out.
180  */
181
182 static unsigned int
183 afs_randomMod15(void)
184 {
185     afs_uint32 temp;
186
187     temp = afs_random() >> 4;
188     temp = (temp * 15) >> 28;
189
190     return temp;
191 }
192 #endif /* !defined(UKERNEL) */
193
194 #ifdef abs
195 #undef abs
196 #endif /* abs */
197 #define abs(a) ((a) < 0 ? -1*(a) : (a))
198 int
199 ubik_ClientInit(register struct rx_connection **serverconns,
200                 struct ubik_client **aclient)
201 {
202     int i, j;
203     int count;
204     int offset;
205     register struct ubik_client *tc;
206
207     initialize_U_error_table();
208
209     if (*aclient) {             /* the application is doing a re-initialization */
210         LOCK_UBIK_CLIENT((*aclient));
211         /* this is an important defensive check */
212         if (!((*aclient)->initializationState)) {
213             UNLOCK_UBIK_CLIENT((*aclient));
214             return UREINITIALIZE;
215         }
216
217         /* release all existing connections */
218         for (tc = *aclient, i = 0; i < MAXSERVERS; i++) {
219             struct rx_connection *rxConn = ubik_GetRPCConn(tc, i);
220             if (rxConn == 0)
221                 break;
222 #ifdef AFS_PTHREAD_ENV
223             rx_ReleaseCachedConnection(rxConn);
224 #else
225             rx_DestroyConnection(rxConn);
226 #endif
227         }
228         UNLOCK_UBIK_CLIENT((*aclient));
229 #ifdef AFS_PTHREAD_ENV
230         if (pthread_mutex_destroy(&((*aclient)->cm)))
231             return UMUTEXDESTROY;
232 #endif
233     } else {
234         tc = (struct ubik_client *)malloc(sizeof(struct ubik_client));
235     }
236     if (tc == NULL)
237         return UNOMEM;
238     memset((void *)tc, 0, sizeof(*tc));
239 #ifdef AFS_PTHREAD_ENV
240     if (pthread_mutex_init(&(tc->cm), (const pthread_mutexattr_t *)0)) {
241         return UMUTEXINIT;
242     }
243 #endif
244     tc->initializationState = ++ubik_initializationState;
245
246     /* first count the # of server conns so we can randomize properly */
247     count = 0;
248     for (i = 0; i < MAXSERVERS; i++) {
249         if (serverconns[i] == (struct rx_connection *)0)
250             break;
251         count++;
252     }
253
254     /* here count is the # of servers we're actually passed in.  Compute
255      * offset, a number between 0..count-1, where we'll start copying from the
256      * client-provided array. */
257     for (i = 0; i < count; i++) {
258         offset = afs_randomMod15() % count;
259         for (j = abs(offset); j < 2 * count; j++) {
260             if (!tc->conns[abs(j % count)]) {
261                 tc->conns[abs(j % count)] = serverconns[i];
262                 break;
263             }
264         }
265     }
266
267     *aclient = tc;
268     return 0;
269 }
270
271 /* 
272  * ubik_ClientDestroy - destroys a ubik connection.  It calls rx to destroy the
273  * component rx connections, then frees the ubik connection structure.
274  */
275
276 afs_int32
277 ubik_ClientDestroy(struct ubik_client * aclient)
278 {
279     register int c;
280
281     if (aclient == 0)
282         return 0;
283     LOCK_UBIK_CLIENT(aclient);
284     for (c = 0; c < MAXSERVERS; c++) {
285         struct rx_connection *rxConn = ubik_GetRPCConn(aclient, c);
286         if (rxConn == 0)
287             break;
288 #ifdef AFS_PTHREAD_ENV
289         rx_ReleaseCachedConnection(rxConn);
290 #else
291         rx_DestroyConnection(rxConn);
292 #endif
293     }
294     aclient->initializationState = 0;   /* client in not initialized */
295     UNLOCK_UBIK_CLIENT(aclient);
296 #ifdef AFS_PTHREAD_ENV
297     pthread_mutex_destroy(&(aclient->cm));      /* ignore failure */
298 #endif
299     free(aclient);
300     return 0;
301 }
302
303 /*
304  * RefreshConn -- So that intermittent failures that cause connections to die
305  *     don't kill whole ubik connection, refresh them when the connection is in
306  *     error.
307  */
308
309 struct rx_connection *
310 ubik_RefreshConn(struct rx_connection *tc)
311 {
312     afs_uint32 host;
313     u_short port;
314     u_short service;
315     struct rx_securityClass *sc;
316     int si;
317     struct rx_connection *newTc;
318
319     host = rx_HostOf(rx_PeerOf(tc));
320     port = rx_PortOf(rx_PeerOf(tc));
321     service = rx_ServiceIdOf(tc);
322     sc = rx_SecurityObjectOf(tc);
323     si = rx_SecurityClassOf(tc);
324
325     /*
326      * destroy old one after creating new one so that refCount on security
327      * object cannot reach zero.
328      */
329     newTc = rx_NewConnection(host, port, service, sc, si);
330     rx_DestroyConnection(tc);
331     return newTc;
332 }
333
334 #ifdef AFS_PTHREAD_ENV
335
336 pthread_once_t ubik_client_once = PTHREAD_ONCE_INIT;
337 pthread_mutex_t ubik_client_mutex;
338 #define LOCK_UCLNT_CACHE \
339     assert(pthread_once(&ubik_client_once, ubik_client_init_mutex) == 0 && \
340            pthread_mutex_lock(&ubik_client_mutex)==0)
341 #define UNLOCK_UCLNT_CACHE assert(pthread_mutex_unlock(&ubik_client_mutex)==0)
342
343 void
344 ubik_client_init_mutex()
345 {
346     assert(pthread_mutex_init(&ubik_client_mutex, NULL) == 0);
347 }
348
349 #else
350
351 #define LOCK_UCLNT_CACHE
352 #define UNLOCK_UCLNT_CACHE
353
354 #endif
355
356 #define SYNCCOUNT 10
357 static int *calls_needsync[SYNCCOUNT];  /* proc calls that need the sync site */
358 static int synccount = 0;
359
360 /*
361  * call this instead of stub and we'll guarantee to find a host that's up.
362  * in the future, we should also put in a protocol to find the sync site
363  */
364 afs_int32
365 ubik_Call(aproc, aclient, aflags, p1, p2, p3, p4, p5, p6, p7, p8, p9, p10,
366           p11, p12, p13, p14, p15, p16)
367      int (*aproc) ();
368      register struct ubik_client *aclient;
369      afs_int32 aflags;
370      long p1;
371      long p2;
372      long p3;
373      long p4;
374      long p5;
375      long p6;
376      long p7;
377      long p8;
378      long p9;
379      long p10;
380      long p11;
381      long p12;
382      long p13;
383      long p14;
384      long p15;
385      long p16;
386 {
387     afs_int32 rcode, code, newHost, thisHost, i, count;
388     int chaseCount, pass, needsync, inlist, j;
389     struct rx_connection *tc;
390     struct rx_peer *rxp;
391     short origLevel;
392
393     if (!aclient)
394         return UNOENT;
395     LOCK_UBIK_CLIENT(aclient);
396
397   restart:
398     origLevel = aclient->initializationState;
399     rcode = UNOSERVERS;
400     chaseCount = inlist = needsync = 0;
401
402     LOCK_UCLNT_CACHE;
403     for (j = 0; ((j < SYNCCOUNT) && calls_needsync[j]); j++) {
404         if (calls_needsync[j] == (int *)aproc) {
405             inlist = needsync = 1;
406             break;
407         }
408     }
409     UNLOCK_UCLNT_CACHE;
410     /* 
411      * First  pass, we try all servers that are up.
412      * Second pass, we try all servers.
413      */
414     for (pass = 0; pass < 2; pass++) {  /*p */
415         /* For each entry in our servers list */
416         for (count = 0;; count++) {     /*s */
417
418             if (needsync) {
419                 /* Need a sync site. Lets try to quickly find it */
420                 if (aclient->syncSite) {
421                     newHost = aclient->syncSite;        /* already in network order */
422                     aclient->syncSite = 0;      /* Will reset if it works */
423                 } else if (aclient->conns[3]) {
424                     /* If there are fewer than four db servers in a cell,
425                      * there's no point in making the GetSyncSite call.
426                      * At best, it's a wash. At worst, it results in more
427                      * RPCs than you would otherwise make.
428                      */
429                     tc = aclient->conns[count];
430                     if (tc && rx_ConnError(tc)) {
431                         aclient->conns[count] = tc = ubik_RefreshConn(tc);
432                     }
433                     if (!tc)
434                         break;
435                     code = VOTE_GetSyncSite(tc, &newHost);
436                     if (aclient->initializationState != origLevel)
437                         goto restart;   /* somebody did a ubik_ClientInit */
438                     if (code)
439                         newHost = 0;
440                     newHost = htonl(newHost);   /* convert to network order */
441                 } else {
442                     newHost = 0;
443                 }
444                 if (newHost) {
445                     /* position count at the appropriate slot in the client
446                      * structure and retry. If we can't find in slot, we'll
447                      * just continue through the whole list 
448                      */
449                     for (i = 0; i < MAXSERVERS && aclient->conns[i]; i++) {
450                         rxp = rx_PeerOf(aclient->conns[i]);
451                         thisHost = rx_HostOf(rxp);
452                         if (!thisHost)
453                             break;
454                         if (thisHost == newHost) {
455                             if (chaseCount++ > 2)
456                                 break;  /* avoid loop asking */
457                             count = i;  /* this index is the sync site */
458                             break;
459                         }
460                     }
461                 }
462             }
463             /*needsync */
464             tc = aclient->conns[count];
465             if (tc && rx_ConnError(tc)) {
466                 aclient->conns[count] = tc = ubik_RefreshConn(tc);
467             }
468             if (!tc)
469                 break;
470
471             if ((pass == 0) && (aclient->states[count] & CFLastFailed)) {
472                 continue;       /* this guy's down */
473             }
474
475             rcode =
476                 (*aproc) (tc, p1, p2, p3, p4, p5, p6, p7, p8, p9, p10, p11,
477                           p12, p13, p14, p15, p16);
478             if (aclient->initializationState != origLevel) {
479                 /* somebody did a ubik_ClientInit */
480                 if (rcode)
481                     goto restart;       /* call failed */
482                 else
483                     goto done;  /* call suceeded */
484             }
485             if (rcode < 0) {    /* network errors */
486                 aclient->states[count] |= CFLastFailed; /* Mark serer down */
487             } else if (rcode == UNOTSYNC) {
488                 needsync = 1;
489             } else if (rcode != UNOQUORUM) {
490                 /* either misc ubik code, or misc appl code, or success. */
491                 aclient->states[count] &= ~CFLastFailed;        /* mark server up */
492                 goto done;      /* all done */
493             }
494         }                       /*s */
495     }                           /*p */
496
497   done:
498     if (needsync) {
499         if (!inlist) {          /* Remember proc call that needs sync site */
500             LOCK_UCLNT_CACHE;
501             calls_needsync[synccount % SYNCCOUNT] = (int *)aproc;
502             synccount++;
503             UNLOCK_UCLNT_CACHE;
504             inlist = 1;
505         }
506         if (!rcode) {           /* Remember the sync site - cmd successful */
507             rxp = rx_PeerOf(aclient->conns[count]);
508             aclient->syncSite = rx_HostOf(rxp);
509         }
510     }
511     UNLOCK_UBIK_CLIENT(aclient);
512     return rcode;
513 }
514
515
516
517 /*
518  * call this after getting back a UNOTSYNC 
519  * note that getting a UNOTSYNC error code back does *not* guarantee
520  * that there is a sync site yet elected.  However, if there is a sync
521  * site out there somewhere, and you're trying an operation that
522  * requires a sync site, ubik will return UNOTSYNC, indicating the
523  * operation won't work until you find a sync site
524  */
525 static int
526 try_GetSyncSite(register struct ubik_client *aclient, afs_int32 apos)
527 {
528     struct rx_peer *rxp;
529     afs_int32 code;
530     int i;
531     afs_int32 thisHost, newHost;
532     struct rx_connection *tc;
533     short origLevel;
534
535     origLevel = aclient->initializationState;
536
537     /* get this conn */
538     tc = aclient->conns[apos];
539     if (tc && rx_ConnError(tc)) {
540         aclient->conns[apos] = (tc = ubik_RefreshConn(tc));
541     }
542     if (!tc) {
543         return -1;
544     }
545
546     /* now see if we can find the sync site host */
547     code = VOTE_GetSyncSite(tc, &newHost);
548     if (aclient->initializationState != origLevel) {
549         return -1;              /* somebody did a ubik_ClientInit */
550     }
551
552     if (!code && newHost) {
553         newHost = htonl(newHost);       /* convert back to network order */
554
555         /*
556          * position count at the appropriate slot in the client
557          * structure and retry. If we can't find in slot, we'll just
558          * continue through the whole list
559          */
560         for (i = 0; i < MAXSERVERS; i++) {
561             rxp = rx_PeerOf(aclient->conns[i]);
562             thisHost = rx_HostOf(rxp);
563             if (!thisHost) {
564                 return -1;
565             } else if (thisHost == newHost) {
566                 return i;       /* we were told to use this one */
567             }
568         }
569     }
570     return -1;
571 }
572
573 /* 
574  * Create an internal version of ubik_CallIter that takes an additional
575  * parameter - to indicate whether the ubik client handle has already
576  * been locked.
577  */
578
579 #define NEED_LOCK 1
580 #define NO_LOCK 0
581
582 static afs_int32
583 CallIter(aproc, aclient, aflags, apos, p1, p2, p3, p4, p5, p6, p7, p8, p9,
584          p10, p11, p12, p13, p14, p15, p16, needlock)
585      int (*aproc) ();
586      register struct ubik_client *aclient;
587      afs_int32 aflags;
588      int *apos;
589      long p1;
590      long p2;
591      long p3;
592      long p4;
593      long p5;
594      long p6;
595      long p7;
596      long p8;
597      long p9;
598      long p10;
599      long p11;
600      long p12;
601      long p13;
602      long p14;
603      long p15;
604      long p16;
605      int needlock;
606 {
607     register afs_int32 code;
608     struct rx_connection *tc;
609     short origLevel;
610
611     if (needlock) {
612         LOCK_UBIK_CLIENT(aclient);
613     }
614     origLevel = aclient->initializationState;
615
616     code = UNOSERVERS;
617
618     while (*apos < MAXSERVERS) {
619         /* tc is the next conn to try */
620         tc = aclient->conns[*apos];
621         if (!tc) {
622             if (needlock) {
623                 UNLOCK_UBIK_CLIENT(aclient);
624             }
625             return UNOSERVERS;
626         }
627
628         if (rx_ConnError(tc)) {
629             tc = ubik_RefreshConn(tc);
630             aclient->conns[*apos] = tc;
631         }
632
633         if ((aflags & UPUBIKONLY) && (aclient->states[*apos] & CFLastFailed)) {
634             (*apos)++;          /* try another one if this server is down */
635         } else {
636             break;              /* this is the desired path */
637         }
638     }
639     if (*apos >= MAXSERVERS) {
640         if (needlock) {
641             UNLOCK_UBIK_CLIENT(aclient);
642         }
643         return UNOSERVERS;
644     }
645
646     code =
647         (*aproc) (tc, p1, p2, p3, p4, p5, p6, p7, p8, p9, p10, p11, p12, p13,
648                   p14, p15, p16);
649     if (aclient->initializationState != origLevel) {
650         if (needlock) {
651             UNLOCK_UBIK_CLIENT(aclient);
652         }
653         return code;            /* somebody did a ubik_ClientInit */
654     }
655
656     /* what should I do in case of UNOQUORUM ? */
657     if (code < 0) {
658         aclient->states[*apos] |= CFLastFailed; /* network errors */
659     } else {
660         /* either misc ubik code, or misc application code or success. */
661         aclient->states[*apos] &= ~CFLastFailed;        /* operation worked */
662     }
663
664     (*apos)++;
665     if (needlock) {
666         UNLOCK_UBIK_CLIENT(aclient);
667     }
668     return code;
669 }
670
671 /* 
672  * call this instead of stub and we'll guarantee to find a host that's up.
673  * in the future, we should also put in a protocol to find the sync site
674  */
675 afs_int32
676 ubik_Call_New(aproc, aclient, aflags, p1, p2, p3, p4, p5, p6, p7, p8, p9, p10,
677               p11, p12, p13, p14, p15, p16)
678      int (*aproc) ();
679      register struct ubik_client *aclient;
680      afs_int32 aflags;
681      long p1;
682      long p2;
683      long p3;
684      long p4;
685      long p5;
686      long p6;
687      long p7;
688      long p8;
689      long p9;
690      long p10;
691      long p11;
692      long p12;
693      long p13;
694      long p14;
695      long p15;
696      long p16;
697 {
698     afs_int32 code, rcode;
699     afs_int32 count;
700     afs_int32 temp;
701     int pass;
702     int stepBack;
703     short origLevel;
704
705     LOCK_UBIK_CLIENT(aclient);
706   restart:
707     rcode = UNOSERVERS;
708     origLevel = aclient->initializationState;
709
710     /* Do two passes. First pass only checks servers known running */
711     for (aflags |= UPUBIKONLY, pass = 0; pass < 2;
712          pass++, aflags &= ~UPUBIKONLY) {
713         stepBack = 0;
714         count = 0;
715         while (1) {
716             code =
717                 CallIter(aproc, aclient, aflags, &count, p1, p2, p3, p4, p5,
718                          p6, p7, p8, p9, p10, p11, p12, p13, p14, p15, p16,
719                          NO_LOCK);
720             if (code && (aclient->initializationState != origLevel)) {
721                 goto restart;
722             }
723             if (code == UNOSERVERS) {
724                 break;
725             }
726             rcode = code;       /* remember code from last good call */
727
728             if (code == UNOTSYNC) {     /* means this requires a sync site */
729                 if (aclient->conns[3]) {        /* don't bother unless 4 or more srv */
730                     temp = try_GetSyncSite(aclient, count);
731                     if (aclient->initializationState != origLevel) {
732                         goto restart;   /* somebody did a ubik_ClientInit */
733                     }
734                     if ((temp >= 0) && ((temp > count) || (stepBack++ <= 2))) {
735                         count = temp;   /* generally try to make progress */
736                     }
737                 }
738             } else if ((code >= 0) && (code != UNOQUORUM)) {
739                 UNLOCK_UBIK_CLIENT(aclient);
740                 return code;    /* success or global error condition */
741             }
742         }
743     }
744     UNLOCK_UBIK_CLIENT(aclient);
745     return rcode;
746 }
747
748 /* 
749  * This is part of an iterator.  It doesn't handle finding sync sites
750  */
751 afs_int32
752 ubik_CallIter(int (*aproc) (), struct ubik_client *aclient,
753                                afs_int32 aflags, int *apos, long p1, long p2,
754                                long p3, long p4, long p5, long p6, long p7,
755                                long p8, long p9, long p10, long p11, long p12,
756                                long p13, long p14, long p15, long p16)
757 {
758     return CallIter(aproc, aclient, aflags, apos, p1, p2, p3, p4, p5, p6, p7,
759                     p8, p9, p10, p11, p12, p13, p14, p15, p16, NEED_LOCK);
760 }