ubik-prototype-fallout-20090316
[openafs.git] / src / ubik / ubikclient.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afsconfig.h>
11 #if defined(UKERNEL)
12 #include "afs/param.h"
13 #else
14 #include <afs/param.h>
15 #endif
16
17 RCSID
18     ("$Header$");
19
20 #if defined(UKERNEL)
21 #include "afs/sysincludes.h"
22 #include "afsincludes.h"
23 #include "afs/stds.h"
24 #include "rx/xdr.h"
25 #include "rx/rx.h"
26 #include "afs/lock.h"
27 #include "afs/rxgen_consts.h"
28 #define UBIK_LEGACY_CALLITER 1
29 #include "ubik.h"
30 #include "afs/pthread_glock.h"
31 #else /* defined(UKERNEL) */
32 #include <afs/stds.h>
33 #include <afs/pthread_glock.h>
34 #include <stdio.h>
35 #include <string.h>
36 #include <rx/xdr.h>
37 #include <rx/rx.h>
38 #include <lock.h>
39 #ifdef AFS_NT40_ENV
40 #include <winsock2.h>
41 #else
42 #include <unistd.h>
43 #include <netdb.h>
44 #include <netinet/in.h>
45 #endif
46 #include <afs/rxgen_consts.h>
47 #include "ubik.h"
48 #endif /* defined(UKERNEL) */
49
50
51 short ubik_initializationState; /*!< initial state is zero */
52
53
54 /*!
55  * \brief Parse list for clients.
56  */
57 int
58 ubik_ParseClientList(int argc, char **argv, afs_int32 * aothers)
59 {
60     register afs_int32 i;
61     register char *tp;
62     register struct hostent *th;
63     afs_int32 temp, counter;
64     int inServer;
65
66     inServer = 0;               /* haven't seen -servers yet */
67     counter = 0;
68     for (i = 1; i < argc; i++) {
69         /* look for -servers argument */
70         tp = argv[i];
71
72         if (inServer) {
73             if (*tp == '-')
74                 break;          /* done */
75             /* otherwise this is a new host name */
76             LOCK_GLOBAL_MUTEX;
77             th = gethostbyname(tp);
78             if (!th) {
79                 UNLOCK_GLOBAL_MUTEX;
80                 return UBADHOST;
81             }
82             memmove((void *)&temp, (const void *)th->h_addr,
83                     sizeof(afs_int32));
84             UNLOCK_GLOBAL_MUTEX;
85             if (counter++ >= MAXSERVERS)
86                 return UNHOSTS;
87             *aothers++ = temp;
88         } else {
89             /* haven't seen a -server yet */
90             if (!strcmp(tp, "-servers")) {
91                 inServer = 1;
92             }
93         }
94     }
95     if (!inServer) {
96         /* never saw a -server */
97         return UNOENT;
98     }
99     if (counter < MAXSERVERS)
100         *aothers++ = 0;         /* null terminate if room */
101     return 0;
102 }
103
104 #ifdef AFS_PTHREAD_ENV
105 #include <pthread.h>
106 #include <assert.h>
107
108 static pthread_once_t random_once = PTHREAD_ONCE_INIT;
109 static int called_afs_random_once;
110 static pthread_key_t random_number_key;
111
112 static void
113 afs_random_once(void)
114 {
115     assert(pthread_key_create(&random_number_key, NULL) == 0);
116     called_afs_random_once = 1;
117 }
118
119 #endif
120
121 #if !defined(UKERNEL)
122 /*! 
123  * \brief use time and pid to try to get some initial randomness.
124  */
125 #define ranstage(x)     (x)= (afs_uint32) (3141592621U*((afs_uint32)x)+1)
126
127 /*! 
128  * \brief Random number generator and constants from KnuthV2 2d ed, p170
129  *
130  * Rules: \n
131  * X = (aX + c) % m \n
132  * m is a power of two \n
133  * a % 8 is 5 \n
134  * a is 0.73m  should be 0.01m .. 0.99m \n
135  * c is more or less immaterial.  1 or a is suggested. \n
136  *
137  * NB:  LOW ORDER BITS are not very random.  To get small random numbers,
138  *      treat result as <1, with implied binary point, and multiply by 
139  *      desired modulus.
140  *
141  * NB:  Has to be unsigned, since shifts on signed quantities may preserve
142  *      the sign bit.
143  * 
144  * In this case, m == 2^32, the mod operation is implicit. a == pi, which
145  * is used because it has some interesting characteristics (lacks any
146  * interesting bit-patterns).   
147  */
148 unsigned int
149 afs_random(void)
150 {
151 #ifdef AFS_PTHREAD_ENV
152     afs_uint32 state;
153
154     (called_afs_random_once || pthread_once(&random_once, afs_random_once));
155     state = (afs_uint32) pthread_getspecific(random_number_key);
156 #else
157     static afs_uint32 state = 0;
158 #endif
159
160     if (!state) {
161         int i;
162         state = time(0) + getpid();
163         for (i = 0; i < 15; i++) {
164             ranstage(state);
165         }
166     }
167
168     ranstage(state);
169 #ifdef AFS_PTHREAD_ENV
170     pthread_setspecific(random_number_key, (const void *)state);
171 #endif
172     return (state);
173
174 }
175
176 /*!
177  * \brief Returns int 0..14 using the high bits of a pseudo-random number instead of
178  * the low bits, as the low bits are "less random" than the high ones...
179  * 
180  * \todo Slight roundoff error exists, an excercise for the reader.
181  *
182  * Need to multiply by something with lots of ones in it, so multiply by 
183  * 8 or 16 is right out.
184  */
185 static unsigned int
186 afs_randomMod15(void)
187 {
188     afs_uint32 temp;
189
190     temp = afs_random() >> 4;
191     temp = (temp * 15) >> 28;
192
193     return temp;
194 }
195 #endif /* !defined(UKERNEL) */
196
197 #ifdef abs
198 #undef abs
199 #endif /* abs */
200 #define abs(a) ((a) < 0 ? -1*(a) : (a))
201 int
202 ubik_ClientInit(register struct rx_connection **serverconns,
203                 struct ubik_client **aclient)
204 {
205     int i, j;
206     int count;
207     int offset;
208     register struct ubik_client *tc;
209
210     initialize_U_error_table();
211
212     if (*aclient) {             /* the application is doing a re-initialization */
213         LOCK_UBIK_CLIENT((*aclient));
214         /* this is an important defensive check */
215         if (!((*aclient)->initializationState)) {
216             UNLOCK_UBIK_CLIENT((*aclient));
217             return UREINITIALIZE;
218         }
219
220         /* release all existing connections */
221         for (tc = *aclient, i = 0; i < MAXSERVERS; i++) {
222             struct rx_connection *rxConn = ubik_GetRPCConn(tc, i);
223             if (rxConn == 0)
224                 break;
225 #ifdef AFS_PTHREAD_ENV
226             rx_ReleaseCachedConnection(rxConn);
227 #else
228             rx_DestroyConnection(rxConn);
229 #endif
230         }
231         UNLOCK_UBIK_CLIENT((*aclient));
232 #ifdef AFS_PTHREAD_ENV
233         if (pthread_mutex_destroy(&((*aclient)->cm)))
234             return UMUTEXDESTROY;
235 #endif
236     } else {
237         tc = (struct ubik_client *)malloc(sizeof(struct ubik_client));
238     }
239     if (tc == NULL)
240         return UNOMEM;
241     memset((void *)tc, 0, sizeof(*tc));
242 #ifdef AFS_PTHREAD_ENV
243     if (pthread_mutex_init(&(tc->cm), (const pthread_mutexattr_t *)0)) {
244         return UMUTEXINIT;
245     }
246 #endif
247     tc->initializationState = ++ubik_initializationState;
248
249     /* first count the # of server conns so we can randomize properly */
250     count = 0;
251     for (i = 0; i < MAXSERVERS; i++) {
252         if (serverconns[i] == (struct rx_connection *)0)
253             break;
254         count++;
255     }
256
257     /* here count is the # of servers we're actually passed in.  Compute
258      * offset, a number between 0..count-1, where we'll start copying from the
259      * client-provided array. */
260     for (i = 0; i < count; i++) {
261         offset = afs_randomMod15() % count;
262         for (j = abs(offset); j < 2 * count; j++) {
263             if (!tc->conns[abs(j % count)]) {
264                 tc->conns[abs(j % count)] = serverconns[i];
265                 break;
266             }
267         }
268     }
269
270     *aclient = tc;
271     return 0;
272 }
273
274 /*! 
275  * \brief Destroy an ubik connection.
276  *
277  * It calls rx to destroy the component rx connections, then frees the ubik
278  * connection structure.
279  */
280 afs_int32
281 ubik_ClientDestroy(struct ubik_client * aclient)
282 {
283     register int c;
284
285     if (aclient == 0)
286         return 0;
287     LOCK_UBIK_CLIENT(aclient);
288     for (c = 0; c < MAXSERVERS; c++) {
289         struct rx_connection *rxConn = ubik_GetRPCConn(aclient, c);
290         if (rxConn == 0)
291             break;
292 #ifdef AFS_PTHREAD_ENV
293         rx_ReleaseCachedConnection(rxConn);
294 #else
295         rx_DestroyConnection(rxConn);
296 #endif
297     }
298     aclient->initializationState = 0;   /* client in not initialized */
299     UNLOCK_UBIK_CLIENT(aclient);
300 #ifdef AFS_PTHREAD_ENV
301     pthread_mutex_destroy(&(aclient->cm));      /* ignore failure */
302 #endif
303     free(aclient);
304     return 0;
305 }
306
307 /*!
308  * \brief So that intermittent failures that cause connections to die
309  *     don't kill whole ubik connection, refresh them when the connection is in
310  *     error.
311  */
312 struct rx_connection *
313 ubik_RefreshConn(struct rx_connection *tc)
314 {
315     afs_uint32 host;
316     u_short port;
317     u_short service;
318     struct rx_securityClass *sc;
319     int si;
320     struct rx_connection *newTc;
321
322     host = rx_HostOf(rx_PeerOf(tc));
323     port = rx_PortOf(rx_PeerOf(tc));
324     service = rx_ServiceIdOf(tc);
325     sc = rx_SecurityObjectOf(tc);
326     si = rx_SecurityClassOf(tc);
327
328     /*
329      * destroy old one after creating new one so that refCount on security
330      * object cannot reach zero.
331      */
332     newTc = rx_NewConnection(host, port, service, sc, si);
333     rx_DestroyConnection(tc);
334     return newTc;
335 }
336
337 #ifdef AFS_PTHREAD_ENV
338
339 pthread_once_t ubik_client_once = PTHREAD_ONCE_INIT;
340 pthread_mutex_t ubik_client_mutex;
341 #define LOCK_UCLNT_CACHE \
342     assert(pthread_once(&ubik_client_once, ubik_client_init_mutex) == 0 && \
343            pthread_mutex_lock(&ubik_client_mutex)==0)
344 #define UNLOCK_UCLNT_CACHE assert(pthread_mutex_unlock(&ubik_client_mutex)==0)
345
346 void
347 ubik_client_init_mutex(void)
348 {
349     assert(pthread_mutex_init(&ubik_client_mutex, NULL) == 0);
350 }
351
352 #else
353
354 #define LOCK_UCLNT_CACHE
355 #define UNLOCK_UCLNT_CACHE
356
357 #endif
358
359 #define SYNCCOUNT 10
360 static int *calls_needsync[SYNCCOUNT];  /* proc calls that need the sync site */
361 static int synccount = 0;
362
363 /*!
364  * call this instead of stub and we'll guarantee to find a host that's up.
365  * 
366  * \todo In the future, we should also put in a protocol to find the sync site.
367  */
368 afs_int32
369 ubik_Call(int (*aproc) (), register struct ubik_client *aclient, 
370           afs_int32 aflags, long p1, long p2, long p3, long p4, 
371           long p5, long p6, long p7, long p8, long p9, long p10,
372           long p11, long p12, long p13, long p14, long p15, long p16)
373 {
374     afs_int32 rcode, code, newHost, thisHost, i, count;
375     int chaseCount, pass, needsync, inlist, j;
376     struct rx_connection *tc;
377     struct rx_peer *rxp;
378     short origLevel;
379
380     if (!aclient)
381         return UNOENT;
382     LOCK_UBIK_CLIENT(aclient);
383
384   restart:
385     origLevel = aclient->initializationState;
386     rcode = UNOSERVERS;
387     chaseCount = inlist = needsync = 0;
388
389     LOCK_UCLNT_CACHE;
390     for (j = 0; ((j < SYNCCOUNT) && calls_needsync[j]); j++) {
391         if (calls_needsync[j] == (int *)aproc) {
392             inlist = needsync = 1;
393             break;
394         }
395     }
396     UNLOCK_UCLNT_CACHE;
397     /* 
398      * First  pass, we try all servers that are up.
399      * Second pass, we try all servers.
400      */
401     for (pass = 0; pass < 2; pass++) {  /*p */
402         /* For each entry in our servers list */
403         for (count = 0;; count++) {     /*s */
404
405             if (needsync) {
406                 /* Need a sync site. Lets try to quickly find it */
407                 if (aclient->syncSite) {
408                     newHost = aclient->syncSite;        /* already in network order */
409                     aclient->syncSite = 0;      /* Will reset if it works */
410                 } else if (aclient->conns[3]) {
411                     /* If there are fewer than four db servers in a cell,
412                      * there's no point in making the GetSyncSite call.
413                      * At best, it's a wash. At worst, it results in more
414                      * RPCs than you would otherwise make.
415                      */
416                     tc = aclient->conns[count];
417                     if (tc && rx_ConnError(tc)) {
418                         aclient->conns[count] = tc = ubik_RefreshConn(tc);
419                     }
420                     if (!tc)
421                         break;
422                     code = VOTE_GetSyncSite(tc, &newHost);
423                     if (aclient->initializationState != origLevel)
424                         goto restart;   /* somebody did a ubik_ClientInit */
425                     if (code)
426                         newHost = 0;
427                     newHost = htonl(newHost);   /* convert to network order */
428                 } else {
429                     newHost = 0;
430                 }
431                 if (newHost) {
432                     /* position count at the appropriate slot in the client
433                      * structure and retry. If we can't find in slot, we'll
434                      * just continue through the whole list 
435                      */
436                     for (i = 0; i < MAXSERVERS && aclient->conns[i]; i++) {
437                         rxp = rx_PeerOf(aclient->conns[i]);
438                         thisHost = rx_HostOf(rxp);
439                         if (!thisHost)
440                             break;
441                         if (thisHost == newHost) {
442                             if (chaseCount++ > 2)
443                                 break;  /* avoid loop asking */
444                             count = i;  /* this index is the sync site */
445                             break;
446                         }
447                     }
448                 }
449             }
450             /*needsync */
451             tc = aclient->conns[count];
452             if (tc && rx_ConnError(tc)) {
453                 aclient->conns[count] = tc = ubik_RefreshConn(tc);
454             }
455             if (!tc)
456                 break;
457
458             if ((pass == 0) && (aclient->states[count] & CFLastFailed)) {
459                 continue;       /* this guy's down */
460             }
461
462             rcode =
463                 (*aproc) (tc, p1, p2, p3, p4, p5, p6, p7, p8, p9, p10, p11,
464                           p12, p13, p14, p15, p16);
465             if (aclient->initializationState != origLevel) {
466                 /* somebody did a ubik_ClientInit */
467                 if (rcode)
468                     goto restart;       /* call failed */
469                 else
470                     goto done;  /* call suceeded */
471             }
472             if (rcode < 0) {    /* network errors */
473                 aclient->states[count] |= CFLastFailed; /* Mark serer down */
474             } else if (rcode == UNOTSYNC) {
475                 needsync = 1;
476             } else if (rcode != UNOQUORUM) {
477                 /* either misc ubik code, or misc appl code, or success. */
478                 aclient->states[count] &= ~CFLastFailed;        /* mark server up */
479                 goto done;      /* all done */
480             }
481         }                       /*s */
482     }                           /*p */
483
484   done:
485     if (needsync) {
486         if (!inlist) {          /* Remember proc call that needs sync site */
487             LOCK_UCLNT_CACHE;
488             calls_needsync[synccount % SYNCCOUNT] = (int *)aproc;
489             synccount++;
490             UNLOCK_UCLNT_CACHE;
491             inlist = 1;
492         }
493         if (!rcode) {           /* Remember the sync site - cmd successful */
494             rxp = rx_PeerOf(aclient->conns[count]);
495             aclient->syncSite = rx_HostOf(rxp);
496         }
497     }
498     UNLOCK_UBIK_CLIENT(aclient);
499     return rcode;
500 }
501
502
503
504 /*!
505  * \brief Call this after getting back a #UNOTSYNC.
506  *
507  * \note Getting a #UNOTSYNC error code back does \b not guarantee
508  * that there is a sync site yet elected.  However, if there is a sync
509  * site out there somewhere, and you're trying an operation that
510  * requires a sync site, ubik will return #UNOTSYNC, indicating the
511  * operation won't work until you find a sync site
512  */
513 static int
514 try_GetSyncSite(register struct ubik_client *aclient, afs_int32 apos)
515 {
516     struct rx_peer *rxp;
517     afs_int32 code;
518     int i;
519     afs_int32 thisHost, newHost;
520     struct rx_connection *tc;
521     short origLevel;
522
523     origLevel = aclient->initializationState;
524
525     /* get this conn */
526     tc = aclient->conns[apos];
527     if (tc && rx_ConnError(tc)) {
528         aclient->conns[apos] = (tc = ubik_RefreshConn(tc));
529     }
530     if (!tc) {
531         return -1;
532     }
533
534     /* now see if we can find the sync site host */
535     code = VOTE_GetSyncSite(tc, &newHost);
536     if (aclient->initializationState != origLevel) {
537         return -1;              /* somebody did a ubik_ClientInit */
538     }
539
540     if (!code && newHost) {
541         newHost = htonl(newHost);       /* convert back to network order */
542
543         /*
544          * position count at the appropriate slot in the client
545          * structure and retry. If we can't find in slot, we'll just
546          * continue through the whole list
547          */
548         for (i = 0; i < MAXSERVERS; i++) {
549             rxp = rx_PeerOf(aclient->conns[i]);
550             thisHost = rx_HostOf(rxp);
551             if (!thisHost) {
552                 return -1;
553             } else if (thisHost == newHost) {
554                 return i;       /* we were told to use this one */
555             }
556         }
557     }
558     return -1;
559 }
560
561 #define NEED_LOCK 1
562 #define NO_LOCK 0
563
564 /*! 
565  * \brief Create an internal version of ubik_CallIter that takes an additional
566  * parameter - to indicate whether the ubik client handle has already
567  * been locked.
568  */
569 static afs_int32
570 CallIter(int (*aproc) (), register struct ubik_client *aclient, 
571          afs_int32 aflags, int *apos, long p1, long p2, long p3, long p4, 
572          long p5, long p6, long p7, long p8, long p9, long p10, long p11, 
573          long p12, long p13, long p14, long p15, long p16, int needlock)
574 {
575     register afs_int32 code;
576     struct rx_connection *tc;
577     short origLevel;
578
579     if (needlock) {
580         LOCK_UBIK_CLIENT(aclient);
581     }
582     origLevel = aclient->initializationState;
583
584     code = UNOSERVERS;
585
586     while (*apos < MAXSERVERS) {
587         /* tc is the next conn to try */
588         tc = aclient->conns[*apos];
589         if (!tc) {
590             if (needlock) {
591                 UNLOCK_UBIK_CLIENT(aclient);
592             }
593             return UNOSERVERS;
594         }
595
596         if (rx_ConnError(tc)) {
597             tc = ubik_RefreshConn(tc);
598             aclient->conns[*apos] = tc;
599         }
600
601         if ((aflags & UPUBIKONLY) && (aclient->states[*apos] & CFLastFailed)) {
602             (*apos)++;          /* try another one if this server is down */
603         } else {
604             break;              /* this is the desired path */
605         }
606     }
607     if (*apos >= MAXSERVERS) {
608         if (needlock) {
609             UNLOCK_UBIK_CLIENT(aclient);
610         }
611         return UNOSERVERS;
612     }
613
614     code =
615         (*aproc) (tc, p1, p2, p3, p4, p5, p6, p7, p8, p9, p10, p11, p12, p13,
616                   p14, p15, p16);
617     if (aclient->initializationState != origLevel) {
618         if (needlock) {
619             UNLOCK_UBIK_CLIENT(aclient);
620         }
621         return code;            /* somebody did a ubik_ClientInit */
622     }
623
624     /* what should I do in case of UNOQUORUM ? */
625     if (code < 0) {
626         aclient->states[*apos] |= CFLastFailed; /* network errors */
627     } else {
628         /* either misc ubik code, or misc application code or success. */
629         aclient->states[*apos] &= ~CFLastFailed;        /* operation worked */
630     }
631
632     (*apos)++;
633     if (needlock) {
634         UNLOCK_UBIK_CLIENT(aclient);
635     }
636     return code;
637 }
638
639 /*! 
640  * \brief Call this instead of stub and we'll guarantee to find a host that's up.
641  *
642  * \todo In the future, we should also put in a protocol to find the sync site.
643  */
644 afs_int32
645 ubik_Call_New(int (*aproc) (), register struct ubik_client *aclient, 
646               afs_int32 aflags, long p1, long p2, long p3, long p4, long p5, 
647               long p6, long p7, long p8, long p9, long p10, long p11, 
648               long p12, long p13, long p14, long p15, long p16)
649 {
650     afs_int32 code, rcode;
651     afs_int32 count;
652     afs_int32 temp;
653     int pass;
654     int stepBack;
655     short origLevel;
656
657     LOCK_UBIK_CLIENT(aclient);
658   restart:
659     rcode = UNOSERVERS;
660     origLevel = aclient->initializationState;
661
662     /* Do two passes. First pass only checks servers known running */
663     for (aflags |= UPUBIKONLY, pass = 0; pass < 2;
664          pass++, aflags &= ~UPUBIKONLY) {
665         stepBack = 0;
666         count = 0;
667         while (1) {
668             code =
669                 CallIter(aproc, aclient, aflags, &count, p1, p2, p3, p4, p5,
670                          p6, p7, p8, p9, p10, p11, p12, p13, p14, p15, p16,
671                          NO_LOCK);
672             if (code && (aclient->initializationState != origLevel)) {
673                 goto restart;
674             }
675             if (code == UNOSERVERS) {
676                 break;
677             }
678             rcode = code;       /* remember code from last good call */
679
680             if (code == UNOTSYNC) {     /* means this requires a sync site */
681                 if (aclient->conns[3]) {        /* don't bother unless 4 or more srv */
682                     temp = try_GetSyncSite(aclient, count);
683                     if (aclient->initializationState != origLevel) {
684                         goto restart;   /* somebody did a ubik_ClientInit */
685                     }
686                     if ((temp >= 0) && ((temp > count) || (stepBack++ <= 2))) {
687                         count = temp;   /* generally try to make progress */
688                     }
689                 }
690             } else if ((code >= 0) && (code != UNOQUORUM)) {
691                 UNLOCK_UBIK_CLIENT(aclient);
692                 return code;    /* success or global error condition */
693             }
694         }
695     }
696     UNLOCK_UBIK_CLIENT(aclient);
697     return rcode;
698 }
699
700 /*!
701  * \brief This is part of an iterator.  It doesn't handle finding sync sites.
702  */
703 afs_int32
704 ubik_CallIter(int (*aproc) (), struct ubik_client *aclient,
705                                afs_int32 aflags, int *apos, long p1, long p2,
706                                long p3, long p4, long p5, long p6, long p7,
707                                long p8, long p9, long p10, long p11, long p12,
708                                long p13, long p14, long p15, long p16)
709 {
710     return CallIter(aproc, aclient, aflags, apos, p1, p2, p3, p4, p5, p6, p7,
711                     p8, p9, p10, p11, p12, p13, p14, p15, p16, NEED_LOCK);
712 }