30499db1d692e17641512ad21631773ddba8c3b1
[openafs.git] / src / ubik / ubikclient.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #if defined(UKERNEL)
11 #include "../afs/param.h"
12 #include "../afs/sysincludes.h"
13 #include "../afs/afsincludes.h"
14 #include "../afs/stds.h"
15 #include "../rx/xdr.h"
16 #include "../rx/rx.h"
17 #include "../afs/lock.h"
18 #include "../afs/rxgen_consts.h"
19 #include "../afs/ubik.h"
20 #include "../afs/pthread_glock.h"
21 #else /* defined(UKERNEL) */
22 #include <afs/param.h>
23 #include <afs/stds.h>
24 #include <afs/pthread_glock.h>
25 #include <stdio.h>
26 #include <string.h>
27 #include <rx/xdr.h>
28 #include <rx/rx.h>
29 #include <lock.h>
30 #ifdef AFS_NT40_ENV
31 #include <winsock2.h>
32 #else
33 #include <netdb.h>
34 #include <netinet/in.h>
35 #endif
36 #include <afs/rxgen_consts.h>
37 #include "ubik.h"
38 #endif /* defined(UKERNEL) */
39
40
41 afs_int32 ubik_CallIter();
42 short ubik_initializationState; /* initial state is zero */
43
44
45 /*
46  * parse list for clients
47  */
48 int ubik_ParseClientList(
49   int argc,
50   char **argv,
51   afs_int32 *aothers)
52 {
53     register afs_int32 i;
54     register char *tp;
55     register struct hostent *th;
56     afs_int32 temp, counter;
57     int inServer;
58     
59     inServer = 0;       /* haven't seen -servers yet */
60     counter = 0;
61     for(i=1; i<argc; i++) {
62         /* look for -servers argument */
63         tp = argv[i];
64         
65         if (inServer) {
66             if (*tp == '-') break;  /* done */
67             /* otherwise this is a new host name */
68             LOCK_GLOBAL_MUTEX
69             th = gethostbyname(tp);
70             if (!th) {
71                 UNLOCK_GLOBAL_MUTEX
72                 return UBADHOST;
73             }
74             memmove((void *) &temp, (const void *) th->h_addr, sizeof(afs_int32));
75             UNLOCK_GLOBAL_MUTEX
76             if (counter++ >= MAXSERVERS) return UNHOSTS;
77             *aothers++ = temp;
78         }
79         else {
80             /* haven't seen a -server yet */
81             if (!strcmp(tp, "-servers")) {
82                 inServer = 1;
83             }
84         }
85     }
86     if (!inServer) {
87         /* never saw a -server */
88         return UNOENT;
89     }
90     if (counter < MAXSERVERS) *aothers++ = 0;    /* null terminate if room */
91     return 0;
92 }
93
94 #ifdef AFS_PTHREAD_ENV
95 #include <pthread.h>
96 #include <assert.h>
97
98 static pthread_once_t random_once = PTHREAD_ONCE_INIT;
99 static int called_afs_random_once;
100 static pthread_key_t random_number_key; 
101
102 static void afs_random_once(void)
103 {
104     assert(pthread_key_create(&random_number_key, NULL)==0);
105     called_afs_random_once = 1;
106 }
107
108 #endif
109 /* 
110  * Random number generator and constants from KnuthV2 2d ed, p170
111  *
112  * Rules:
113  * X = (aX + c) % m
114  * m is a power of two 
115  * a % 8 is 5
116  * a is 0.73m  should be 0.01m .. 0.99m
117  * c is more or less immaterial.  1 or a is suggested.
118  *
119  * NB:  LOW ORDER BITS are not very random.  To get small random numbers,
120  *      treat result as <1, with implied binary point, and multiply by 
121  *      desired modulus.
122  * NB:  Has to be unsigned, since shifts on signed quantities may preserve
123  *      the sign bit.
124  * 
125  * In this case, m == 2^32, the mod operation is implicit. a == pi, which
126  * is used because it has some interesting characteristics (lacks any
127  * interesting bit-patterns).   
128  * 
129  */
130
131 /* 
132  * use time and pid to try to get some initial randomness.
133  */
134 #if !defined(UKERNEL)
135 #define ranstage(x)     (x)= (afs_uint32) (3141592621*((afs_uint32)x)+1)
136
137 unsigned int afs_random(void)
138 {
139 #ifdef AFS_PTHREAD_ENV
140     afs_uint32 state;
141
142     (called_afs_random_once || pthread_once(&random_once, afs_random_once));
143     state = (afs_uint32) pthread_getspecific(random_number_key);
144 #else
145     static afs_uint32 state = 0;
146 #endif
147
148     if (!state) {
149         int i;
150         state = time(0) + getpid();
151         for (i=0;i<15;i++) {
152             ranstage(state);
153         }
154     }
155
156     ranstage(state);
157 #ifdef AFS_PTHREAD_ENV
158     pthread_setspecific(random_number_key, (const void *) state);
159 #endif
160     return (state);
161
162 }
163
164 /*
165  * returns int 0..14 using the high bits of a pseudo-random number instead of
166  * the low bits, as the low bits are "less random" than the high ones...
167  * slight roundoff error exists, an excercise for the reader.
168  * need to multiply by something with lots of ones in it, so multiply by 
169  * 8 or 16 is right out.
170  */
171
172 static unsigned int afs_randomMod15(void)
173 {
174     afs_uint32 temp;
175
176     temp = afs_random() >> 4;
177     temp = (temp *15) >> 28;
178
179     return temp;
180 }
181 #endif /* !defined(UKERNEL) */
182
183 #ifdef abs
184 #undef abs
185 #endif /* abs */
186 #define abs(a) ((a) < 0 ? -1*(a) : (a))
187 int ubik_ClientInit(
188   register struct rx_connection **serverconns,
189   struct ubik_client **aclient)
190 {
191     int i,j;
192     int count;
193     int offset;
194     register struct ubik_client *tc;
195
196     initialize_u_error_table();
197
198     if ( *aclient ) {   /* the application is doing a re-initialization*/
199         LOCK_UBIK_CLIENT((*aclient))
200         /* this is an important defensive check */
201         if ( ! ((*aclient)->initializationState) ) {
202             UNLOCK_UBIK_CLIENT((*aclient))
203             return UREINITIALIZE;
204         }
205
206         /* release all existing connections */
207         for (tc = *aclient, i=0; i<MAXSERVERS; i++) 
208         {
209             struct rx_connection *rxConn = ubik_GetRPCConn(tc,i);
210             if (rxConn == 0) break;
211 #ifdef AFS_PTHREAD_ENV
212             rx_ReleaseCachedConnection(rxConn);
213 #else
214             rx_DestroyConnection (rxConn);
215 #endif
216         }
217         UNLOCK_UBIK_CLIENT((*aclient))
218 #ifdef AFS_PTHREAD_ENV
219         if (pthread_mutex_destroy(&((*aclient)->cm))) return UMUTEXDESTROY;
220 #endif
221     } else {
222         tc = (struct ubik_client *) malloc(sizeof(struct ubik_client));
223     }
224     if (tc == NULL) return UNOMEM;
225     memset((void *) tc, 0, sizeof(*tc));
226 #ifdef AFS_PTHREAD_ENV
227     if (pthread_mutex_init(&(tc->cm), (const pthread_mutexattr_t*)0)) {
228         return UMUTEXINIT;
229     }
230 #endif
231     tc->initializationState = ++ubik_initializationState;
232
233     /* first count the # of server conns so we can randomize properly */
234     count = 0;
235     for(i=0;i<MAXSERVERS;i++) {
236         if (serverconns[i] == (struct rx_connection *) 0) break;
237         count++;
238     }
239
240     /* here count is the # of servers we're actually passed in.  Compute
241      * offset, a number between 0..count-1, where we'll start copying from the
242      * client-provided array. */
243     for (i=0; i< count; i++) {
244         offset = afs_randomMod15() % count;
245         for (j=abs(offset); j<2*count; j++) {
246             if (!tc->conns[abs(j%count)]) {
247                 tc->conns[abs(j%count)] = serverconns[i];
248                 break;
249             }
250         }
251     }
252
253     *aclient = tc;
254     return 0;
255 }
256
257 /* 
258  * ubik_ClientDestroy - destroys a ubik connection.  It calls rx to destroy the
259  * component rx connections, then frees the ubik connection structure.
260  */
261
262 afs_int32 ubik_ClientDestroy(struct ubik_client *aclient)
263 {
264     register int c;
265
266     if (aclient == 0) return 0;
267     LOCK_UBIK_CLIENT(aclient);
268     for (c=0; c<MAXSERVERS; c++) {
269         struct rx_connection *rxConn = ubik_GetRPCConn(aclient,c);
270         if (rxConn == 0) break;
271 #ifdef AFS_PTHREAD_ENV
272         rx_ReleaseCachedConnection(rxConn);
273 #else
274         rx_DestroyConnection (rxConn);
275 #endif
276     }
277     aclient->initializationState = 0; /* client in not initialized*/
278     UNLOCK_UBIK_CLIENT(aclient);
279 #ifdef AFS_PTHREAD_ENV
280     pthread_mutex_destroy(&(aclient->cm)); /* ignore failure */
281 #endif
282     free (aclient);
283     return 0;
284 }
285
286 /*
287  * RefreshConn -- So that intermittent failures that cause connections to die
288  *     don't kill whole ubik connection, refresh them when the connection is in
289  *     error.
290  */
291
292 static struct rx_connection *RefreshConn(struct rx_connection *tc)
293 {
294     afs_uint32 host;
295     u_short port;
296     u_short service;
297     struct rx_securityClass *sc;
298     int si;
299     struct rx_connection *newTc;
300
301     host = rx_HostOf(rx_PeerOf(tc));
302     port = rx_PortOf(rx_PeerOf(tc));
303     service = rx_ServiceIdOf(tc);
304     sc = rx_SecurityObjectOf(tc);
305     si = rx_SecurityClassOf(tc);
306
307     /*
308      * destroy old one after creating new one so that refCount on security
309      * object cannot reach zero.
310      */
311     newTc = rx_NewConnection (host, port, service, sc, si);
312     rx_DestroyConnection (tc);
313     return newTc;
314 }
315
316 #ifdef AFS_PTHREAD_ENV
317
318 pthread_once_t ubik_client_once = PTHREAD_ONCE_INIT;
319 pthread_mutex_t ubik_client_mutex;
320 #define LOCK_UCLNT_CACHE \
321     assert(pthread_once(&ubik_client_once, ubik_client_init_mutex) == 0 && \
322            pthread_mutex_lock(&ubik_client_mutex)==0);
323 #define UNLOCK_UCLNT_CACHE assert(pthread_mutex_unlock(&ubik_client_mutex)==0);
324
325 void ubik_client_init_mutex() {
326     assert(pthread_mutex_init(&ubik_client_mutex, NULL) == 0);
327 }
328
329 #else
330
331 #define LOCK_UCLNT_CACHE
332 #define UNLOCK_UCLNT_CACHE
333
334 #endif
335
336 #define SYNCCOUNT 10
337 static int *calls_needsync[SYNCCOUNT]; /* proc calls that need the sync site */
338 static int synccount=0;
339
340 /*
341  * call this instead of stub and we'll guarantee to find a host that's up.
342  * in the future, we should also put in a protocol to find the sync site
343  */
344 afs_int32 ubik_Call(aproc, aclient, aflags, p1, p2, p3, p4, p5, p6, p7, p8, p9,
345                 p10, p11, p12, p13, p14, p15, p16)
346   int (*aproc)();
347   register struct ubik_client *aclient;
348   afs_int32 aflags;
349   long p1;
350   long p2;
351   long p3;
352   long p4;
353   long p5;
354   long p6;
355   long p7;
356   long p8;
357   long p9;
358   long p10;
359   long p11;
360   long p12;
361   long p13;
362   long p14;
363   long p15;
364   long p16;
365 {
366     afs_int32 rcode, code, newHost, thisHost, i, count;
367     int chaseCount, pass, needsync, inlist, j;
368     struct rx_connection *tc;
369     struct rx_peer *rxp;
370     short       origLevel ;
371
372     if (!aclient) return UNOENT;
373     LOCK_UBIK_CLIENT(aclient);
374
375 restart:
376     origLevel = aclient->initializationState;
377     rcode = UNOSERVERS;
378     chaseCount = inlist = needsync = 0;
379
380     LOCK_UCLNT_CACHE
381     for (j=0; ((j<SYNCCOUNT) && calls_needsync[j]); j++) {
382         if (calls_needsync[j] == (int *)aproc) {
383             inlist = needsync = 1;
384             break;
385         }
386     }
387     UNLOCK_UCLNT_CACHE
388
389     /* 
390      * First  pass, we try all servers that are up.
391      * Second pass, we try all servers.
392      */
393     for (pass=0; pass<2; pass++) { /*p*/
394         /* For each entry in our servers list */
395         for (count=0; ;count++) { /*s*/
396
397             if (needsync) {
398                 /* Need a sync site. Lets try to quickly find it */
399                 if (aclient->syncSite) {
400                     newHost = aclient->syncSite; /* already in network order */
401                     aclient->syncSite = 0;       /* Will reset if it works */
402                 } else if (aclient->conns[3]) {
403                     /* If there are fewer than four db servers in a cell,
404                     * there's no point in making the GetSyncSite call.
405                     * At best, it's a wash. At worst, it results in more
406                     * RPCs than you would otherwise make.
407                     */
408                     tc = aclient->conns[count];
409                     if (tc && rx_ConnError(tc)) {
410                       aclient->conns[count] = tc = RefreshConn(tc);
411                     }
412                     if (!tc) break;
413                     code = VOTE_GetSyncSite(tc, &newHost);
414                     if ( aclient->initializationState != origLevel)
415                         goto restart; /* somebody did a ubik_ClientInit */
416                     if (code) newHost = 0;
417                     newHost = htonl(newHost);    /* convert to network order */
418                 } else {
419                     newHost = 0;
420                 }
421                 if (newHost) {
422                     /* position count at the appropriate slot in the client
423                     * structure and retry. If we can't find in slot, we'll
424                     * just continue through the whole list 
425                     */
426                     for (i=0; i<MAXSERVERS && aclient->conns[i]; i++) {
427                         rxp = rx_PeerOf(aclient->conns[i]);
428                         thisHost = rx_HostOf(rxp);
429                         if (!thisHost) break;
430                         if (thisHost == newHost) {
431                             if (chaseCount++ > 2) break; /* avoid loop asking */
432                             count = i;         /* this index is the sync site */
433                             break;
434                         }
435                     }
436                 }
437             } /*needsync*/
438
439             tc = aclient->conns[count];
440             if (tc && rx_ConnError(tc)) {
441               aclient->conns[count] = tc = RefreshConn(tc);
442             }
443             if (!tc) break;
444
445             if ((pass == 0) && (aclient->states[count] & CFLastFailed)) {
446                 continue; /* this guy's down */
447             }
448
449             rcode = (*aproc)(tc, p1,  p2,  p3,  p4,  p5,  p6,  p7,  p8,  p9,
450                              p10, p11, p12, p13, p14, p15, p16);
451             if ( aclient->initializationState != origLevel) {   
452                 /* somebody did a ubik_ClientInit */
453                 if ( rcode ) goto restart; /* call failed */
454                 else    goto done;    /* call suceeded */
455             }
456             if (rcode < 0) {                            /* network errors */
457                 aclient->states[count] |= CFLastFailed;  /* Mark serer down */
458             }
459             else if (rcode == UNOTSYNC) {
460                 needsync = 1;
461             }
462             else if (rcode != UNOQUORUM) {
463                 /* either misc ubik code, or misc appl code, or success. */
464                 aclient->states[count] &= ~CFLastFailed;    /* mark server up */
465                 goto done;                                  /* all done */
466             }
467         } /*s*/
468     } /*p*/
469
470   done:
471     if (needsync) {
472         if (!inlist) {   /* Remember proc call that needs sync site */
473             LOCK_UCLNT_CACHE
474             calls_needsync[synccount % SYNCCOUNT] = (int *)aproc;
475             synccount++;
476             UNLOCK_UCLNT_CACHE
477             inlist = 1;
478         }
479         if (!rcode) {    /* Remember the sync site - cmd successful */
480             rxp = rx_PeerOf(aclient->conns[count]);
481             aclient->syncSite = rx_HostOf(rxp);
482         }
483     }
484     UNLOCK_UBIK_CLIENT(aclient);
485     return rcode;
486 }
487
488
489
490 /*
491  * call this after getting back a UNOTSYNC 
492  * note that getting a UNOTSYNC error code back does *not* guarantee
493  * that there is a sync site yet elected.  However, if there is a sync
494  * site out there somewhere, and you're trying an operation that
495  * requires a sync site, ubik will return UNOTSYNC, indicating the
496  * operation won't work until you find a sync site
497  */
498 static int try_GetSyncSite(register struct ubik_client *aclient, afs_int32 apos) {
499     struct rx_peer *rxp;
500     afs_int32 code;
501     int i;
502     afs_int32 thisHost, newHost;
503     struct rx_connection *tc;
504     short origLevel;
505
506     origLevel = aclient->initializationState;
507
508     /* get this conn */
509     tc = aclient->conns[apos];
510     if (tc && rx_ConnError (tc)) {
511         aclient->conns[apos] = (tc = RefreshConn (tc));
512     }
513     if (!tc) {
514         return -1;
515     }
516
517     /* now see if we can find the sync site host */
518     code = VOTE_GetSyncSite(tc, &newHost);
519     if ( aclient->initializationState != origLevel) {
520         return -1; /* somebody did a ubik_ClientInit */
521     }
522
523     if ( !code && newHost ) {
524         newHost = htonl(newHost); /* convert back to network order */
525
526         /*
527          * position count at the appropriate slot in the client
528          * structure and retry. If we can't find in slot, we'll just
529          * continue through the whole list
530          */
531         for(i=0;i<MAXSERVERS;i++) {
532             rxp = rx_PeerOf(aclient->conns[i]);
533             thisHost = rx_HostOf(rxp);
534             if (!thisHost) {
535                 return -1;
536             }
537             else if (thisHost == newHost) {
538                 return i;       /* we were told to use this one */
539             }
540         }
541     }
542     return -1;
543 }
544
545 /* 
546  * Create an internal version of ubik_CallIter that takes an additional
547  * parameter - to indicate whether the ubik client handle has already
548  * been locked.
549  */
550
551 #define NEED_LOCK 1
552 #define NO_LOCK 0
553
554 static afs_int32 CallIter(aproc, aclient, aflags, apos, p1, p2, p3, p4, p5, p6,
555                       p7, p8, p9, p10, p11, p12, p13, p14, p15, p16, needlock)
556   int (*aproc)();
557   register struct ubik_client *aclient;
558   afs_int32 aflags;
559   int *apos;
560   long p1;
561   long p2;
562   long p3;
563   long p4;
564   long p5;
565   long p6;
566   long p7;
567   long p8;
568   long p9;
569   long p10;
570   long p11;
571   long p12;
572   long p13;
573   long p14;
574   long p15;
575   long p16;
576   int needlock;
577 {
578     register afs_int32 code;
579     struct rx_connection *tc;
580     short origLevel;
581
582     if (needlock) {
583         LOCK_UBIK_CLIENT(aclient)
584     }
585     origLevel = aclient->initializationState;
586
587     code = UNOSERVERS;
588
589     while (*apos < MAXSERVERS)
590     {
591         /* tc is the next conn to try */
592         tc = aclient->conns[*apos];
593         if (!tc) {
594             if (needlock) {
595                 UNLOCK_UBIK_CLIENT(aclient)
596             }
597             return UNOSERVERS;
598         }
599
600         if (rx_ConnError (tc)) {
601             tc = RefreshConn (tc);
602             aclient->conns[*apos] = tc;
603         }
604
605         if ((aflags & UPUBIKONLY) && (aclient->states[*apos] & CFLastFailed)) {
606             (*apos)++;  /* try another one if this server is down */
607         }
608         else {
609             break;      /* this is the desired path */
610         }
611     }
612     if (*apos >= MAXSERVERS) {
613         if (needlock) {
614             UNLOCK_UBIK_CLIENT(aclient)
615         }
616         return UNOSERVERS;
617     }
618
619     code = (*aproc)(tc,p1,p2,p3,p4,p5,p6,p7,p8,p9,p10,p11,p12,p13,p14,p15,p16);
620     if ( aclient->initializationState != origLevel) {
621         if (needlock) {
622             UNLOCK_UBIK_CLIENT(aclient)
623         }
624         return code; /* somebody did a ubik_ClientInit */
625     }
626
627     /* what should I do in case of UNOQUORUM ? */
628     if (code < 0) {
629         aclient->states[*apos] |= CFLastFailed;           /* network errors */
630     }
631     else {
632         /* either misc ubik code, or misc application code or success. */
633         aclient->states[*apos] &= ~CFLastFailed;          /* operation worked */
634     }
635
636     (*apos)++;
637     if (needlock) {
638         UNLOCK_UBIK_CLIENT(aclient)
639     }
640     return code;
641 }
642
643 /* 
644  * call this instead of stub and we'll guarantee to find a host that's up.
645  * in the future, we should also put in a protocol to find the sync site
646  */
647 afs_int32 ubik_Call_New(aproc, aclient, aflags, p1, p2, p3, p4, p5, p6, p7, p8,
648                     p9, p10, p11, p12, p13, p14, p15, p16)
649   int (*aproc)();
650   register struct ubik_client *aclient;
651   afs_int32 aflags;
652   long p1;
653   long p2;
654   long p3;
655   long p4;
656   long p5;
657   long p6;
658   long p7;
659   long p8;
660   long p9;
661   long p10;
662   long p11;
663   long p12;
664   long p13;
665   long p14;
666   long p15;
667   long p16;
668
669     afs_int32 code, rcode;
670     afs_int32 count;
671     afs_int32 temp;
672     int pass;
673     int stepBack;
674     short origLevel;
675
676     LOCK_UBIK_CLIENT(aclient)
677 restart:
678     rcode = UNOSERVERS;
679     origLevel = aclient->initializationState;
680
681     /* Do two passes. First pass only checks servers known running */
682     for (aflags |= UPUBIKONLY, pass=0; pass<2; pass++, aflags &= ~UPUBIKONLY) {
683         stepBack = 0;
684         count = 0;
685         while (1) {
686             code = CallIter(aproc, aclient, aflags, &count, p1,p2,p3,p4,
687                             p5,p6,p7,p8,p9,p10,p11,p12,p13,p14,p15,p16,NO_LOCK);
688             if ( code && ( aclient->initializationState != origLevel)) {
689                 goto restart;
690             }
691             if (code == UNOSERVERS) {
692                 break;
693             }
694             rcode = code;        /* remember code from last good call */
695
696             if (code == UNOTSYNC) {     /* means this requires a sync site */
697                 if (aclient->conns[3]) { /* don't bother unless 4 or more srv */
698                     temp = try_GetSyncSite(aclient, count);
699                     if ( aclient->initializationState != origLevel) {
700                         goto restart; /* somebody did a ubik_ClientInit */
701                     }
702                     if ((temp >= 0) && ((temp > count) || (stepBack++ <= 2))) {
703                         count = temp;       /* generally try to make progress */
704                     }
705                 }
706             }
707             else if ((code >= 0) && (code != UNOQUORUM)) {
708                 UNLOCK_UBIK_CLIENT(aclient)
709                 return code;  /* success or global error condition */
710             }
711         }
712     }
713     UNLOCK_UBIK_CLIENT(aclient)
714     return rcode;
715 }
716
717 /* 
718  * This is part of an iterator.  It doesn't handle finding sync sites
719  */
720 afs_int32 ubik_CallIter(aproc, aclient, aflags, apos, p1, p2, p3, p4, p5, p6, p7,
721                     p8, p9, p10, p11, p12, p13, p14, p15, p16)
722   int (*aproc)();
723   register struct ubik_client *aclient;
724   afs_int32 aflags;
725   int *apos;
726   long p1;
727   long p2;
728   long p3;
729   long p4;
730   long p5;
731   long p6;
732   long p7;
733   long p8;
734   long p9;
735   long p10;
736   long p11;
737   long p12;
738   long p13;
739   long p14;
740   long p15;
741   long p16;
742 {
743     return CallIter(aproc, aclient, aflags, apos, p1, p2, p3, p4, p5,
744                     p6, p7, p8, p9, p10, p11, p12, p13, p14, p15, p16,
745                     NEED_LOCK);
746 }