ubik: Introduce IndexOf()
[openafs.git] / src / ubik / ubikclient.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  *
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afsconfig.h>
11 #include <afs/param.h>
12 #include <afs/stds.h>
13
14 #include <roken.h>
15 #include <afs/opr.h>
16 #ifdef AFS_PTHREAD_ENV
17 # include <opr/lock.h>
18 #endif
19
20 #ifdef IGNORE_SOME_GCC_WARNINGS
21 # pragma GCC diagnostic warning "-Wstrict-prototypes"
22 #endif
23
24 #include <afs/pthread_glock.h>
25 #include <rx/xdr.h>
26 #include <rx/rx.h>
27 #include <lock.h>
28 #include <afs/rxgen_consts.h>
29 #define UBIK_LEGACY_CALLITER
30 #include "ubik.h"
31
32 short ubik_initializationState; /*!< initial state is zero */
33
34
35 /*!
36  * \brief Parse list for clients.
37  */
38 int
39 ubik_ParseClientList(int argc, char **argv, afs_uint32 * aothers)
40 {
41     afs_int32 i;
42     char *tp;
43     struct hostent *th;
44     afs_uint32 temp;
45     afs_int32 counter;
46     int inServer;
47
48     inServer = 0;               /* haven't seen -servers yet */
49     counter = 0;
50     for (i = 1; i < argc; i++) {
51         /* look for -servers argument */
52         tp = argv[i];
53
54         if (inServer) {
55             if (*tp == '-')
56                 break;          /* done */
57             /* otherwise this is a new host name */
58             LOCK_GLOBAL_MUTEX;
59             th = gethostbyname(tp);
60             if (!th) {
61                 UNLOCK_GLOBAL_MUTEX;
62                 return UBADHOST;
63             }
64             memmove((void *)&temp, (const void *)th->h_addr,
65                     sizeof(afs_int32));
66             UNLOCK_GLOBAL_MUTEX;
67             if (counter++ >= MAXSERVERS)
68                 return UNHOSTS;
69             *aothers++ = temp;
70         } else {
71             /* haven't seen a -server yet */
72             if (!strcmp(tp, "-servers")) {
73                 inServer = 1;
74             }
75         }
76     }
77     if (!inServer) {
78         /* never saw a -server */
79         return UNOENT;
80     }
81     if (counter < MAXSERVERS)
82         *aothers++ = 0;         /* null terminate if room */
83     return 0;
84 }
85
86 #ifdef AFS_PTHREAD_ENV
87 #include <pthread.h>
88
89 static pthread_once_t random_once = PTHREAD_ONCE_INIT;
90 static int called_afs_random_once;
91 static pthread_key_t random_number_key;
92
93 static void
94 afs_random_once(void)
95 {
96     opr_Verify(pthread_key_create(&random_number_key, NULL) == 0);
97     called_afs_random_once = 1;
98 }
99
100 #endif
101
102 /*!
103  * \brief use time and pid to try to get some initial randomness.
104  */
105 #define ranstage(x)     (x)= (afs_uint32) (3141592621U*((afs_uint32)x)+1)
106
107 /*!
108  * \brief Random number generator and constants from KnuthV2 2d ed, p170
109  *
110  * Rules: \n
111  * X = (aX + c) % m \n
112  * m is a power of two \n
113  * a % 8 is 5 \n
114  * a is 0.73m  should be 0.01m .. 0.99m \n
115  * c is more or less immaterial.  1 or a is suggested. \n
116  *
117  * NB:  LOW ORDER BITS are not very random.  To get small random numbers,
118  *      treat result as <1, with implied binary point, and multiply by
119  *      desired modulus.
120  *
121  * NB:  Has to be unsigned, since shifts on signed quantities may preserve
122  *      the sign bit.
123  *
124  * In this case, m == 2^32, the mod operation is implicit. a == pi, which
125  * is used because it has some interesting characteristics (lacks any
126  * interesting bit-patterns).
127  */
128 unsigned int
129 afs_random(void)
130 {
131 #ifdef AFS_PTHREAD_ENV
132     afs_uint32 state;
133
134     if (!called_afs_random_once)
135         pthread_once(&random_once, afs_random_once);
136
137     state = (uintptr_t) pthread_getspecific(random_number_key);
138 #else
139     static afs_uint32 state = 0;
140 #endif
141
142     if (!state) {
143         int i;
144         state = time(0) + getpid();
145         for (i = 0; i < 15; i++) {
146             ranstage(state);
147         }
148     }
149
150     ranstage(state);
151 #ifdef AFS_PTHREAD_ENV
152     pthread_setspecific(random_number_key, (const void *)(uintptr_t)state);
153 #endif
154     return (state);
155
156 }
157
158 /*!
159  * \brief Returns int 0..14 using the high bits of a pseudo-random number instead of
160  * the low bits, as the low bits are "less random" than the high ones...
161  *
162  * \todo Slight roundoff error exists, an excercise for the reader.
163  *
164  * Need to multiply by something with lots of ones in it, so multiply by
165  * 8 or 16 is right out.
166  */
167 static unsigned int
168 afs_randomMod15(void)
169 {
170     afs_uint32 temp;
171
172     temp = afs_random() >> 4;
173     temp = (temp * 15) >> 28;
174
175     return temp;
176 }
177
178 #ifdef abs
179 #undef abs
180 #endif /* abs */
181 #define abs(a) ((a) < 0 ? -1*(a) : (a))
182 int
183 ubik_ClientInit(struct rx_connection **serverconns,
184                 struct ubik_client **aclient)
185 {
186     int i, j;
187     int count;
188     int offset;
189     struct ubik_client *tc;
190
191     initialize_U_error_table();
192
193     if (*aclient) {             /* the application is doing a re-initialization */
194         LOCK_UBIK_CLIENT((*aclient));
195         /* this is an important defensive check */
196         if (!((*aclient)->initializationState)) {
197             UNLOCK_UBIK_CLIENT((*aclient));
198             return UREINITIALIZE;
199         }
200
201         /* release all existing connections */
202         for (tc = *aclient, i = 0; i < MAXSERVERS; i++) {
203             struct rx_connection *rxConn = ubik_GetRPCConn(tc, i);
204             if (rxConn == 0)
205                 break;
206 #ifdef AFS_PTHREAD_ENV
207             rx_ReleaseCachedConnection(rxConn);
208 #else
209             rx_DestroyConnection(rxConn);
210 #endif
211         }
212         UNLOCK_UBIK_CLIENT((*aclient));
213 #ifdef AFS_PTHREAD_ENV
214         if (pthread_mutex_destroy(&((*aclient)->cm)))
215             return UMUTEXDESTROY;
216 #endif
217     } else {
218         tc = malloc(sizeof(struct ubik_client));
219     }
220     if (tc == NULL)
221         return UNOMEM;
222     memset((void *)tc, 0, sizeof(*tc));
223 #ifdef AFS_PTHREAD_ENV
224     if (pthread_mutex_init(&(tc->cm), (const pthread_mutexattr_t *)0)) {
225         free(tc);
226         return UMUTEXINIT;
227     }
228 #endif
229     tc->initializationState = ++ubik_initializationState;
230
231     /* first count the # of server conns so we can randomize properly */
232     count = 0;
233     for (i = 0; i < MAXSERVERS; i++) {
234         if (serverconns[i] == (struct rx_connection *)0)
235             break;
236         count++;
237     }
238
239     /* here count is the # of servers we're actually passed in.  Compute
240      * offset, a number between 0..count-1, where we'll start copying from the
241      * client-provided array. */
242     for (i = 0; i < count; i++) {
243         offset = afs_randomMod15() % count;
244         for (j = abs(offset); j < 2 * count; j++) {
245             if (!tc->conns[abs(j % count)]) {
246                 tc->conns[abs(j % count)] = serverconns[i];
247                 break;
248             }
249         }
250     }
251
252     *aclient = tc;
253     return 0;
254 }
255
256 /*!
257  * \brief Destroy an ubik connection.
258  *
259  * It calls rx to destroy the component rx connections, then frees the ubik
260  * connection structure.
261  */
262 afs_int32
263 ubik_ClientDestroy(struct ubik_client * aclient)
264 {
265     int c;
266
267     if (aclient == 0)
268         return 0;
269     LOCK_UBIK_CLIENT(aclient);
270     for (c = 0; c < MAXSERVERS; c++) {
271         struct rx_connection *rxConn = ubik_GetRPCConn(aclient, c);
272         if (rxConn == 0)
273             break;
274 #ifdef AFS_PTHREAD_ENV
275         rx_ReleaseCachedConnection(rxConn);
276 #else
277         rx_DestroyConnection(rxConn);
278 #endif
279     }
280     aclient->initializationState = 0;   /* client in not initialized */
281     UNLOCK_UBIK_CLIENT(aclient);
282 #ifdef AFS_PTHREAD_ENV
283     pthread_mutex_destroy(&(aclient->cm));      /* ignore failure */
284 #endif
285     free(aclient);
286     return 0;
287 }
288
289 /*!
290  * \brief So that intermittent failures that cause connections to die
291  *     don't kill whole ubik connection, refresh them when the connection is in
292  *     error.
293  */
294 struct rx_connection *
295 ubik_RefreshConn(struct rx_connection *tc)
296 {
297     afs_uint32 host;
298     u_short port;
299     u_short service;
300     struct rx_securityClass *sc;
301     int si;
302     struct rx_connection *newTc;
303
304     host = rx_HostOf(rx_PeerOf(tc));
305     port = rx_PortOf(rx_PeerOf(tc));
306     service = rx_ServiceIdOf(tc);
307     sc = rx_SecurityObjectOf(tc);
308     si = rx_SecurityClassOf(tc);
309
310     /*
311      * destroy old one after creating new one so that refCount on security
312      * object cannot reach zero.
313      */
314     newTc = rx_NewConnection(host, port, service, sc, si);
315     rx_DestroyConnection(tc);
316     return newTc;
317 }
318
319 #ifdef AFS_PTHREAD_ENV
320
321 pthread_once_t ubik_client_once = PTHREAD_ONCE_INIT;
322 pthread_mutex_t ubik_client_mutex;
323 #define LOCK_UCLNT_CACHE do { \
324     opr_Verify(pthread_once(&ubik_client_once, ubik_client_init_mutex) == 0); \
325     MUTEX_ENTER(&ubik_client_mutex); \
326   } while (0)
327 #define UNLOCK_UCLNT_CACHE MUTEX_EXIT(&ubik_client_mutex)
328
329 void
330 ubik_client_init_mutex(void)
331 {
332     MUTEX_INIT(&ubik_client_mutex, "client init", MUTEX_DEFAULT, 0);
333 }
334
335 #else
336
337 #define LOCK_UCLNT_CACHE
338 #define UNLOCK_UCLNT_CACHE
339
340 #endif
341
342 #define SYNCCOUNT 10
343 static int *calls_needsync[SYNCCOUNT];  /* proc calls that need the sync site */
344 static int synccount = 0;
345
346
347
348
349 #define NEED_LOCK 1
350 #define NO_LOCK 0
351
352 /*!
353  * \brief Create an internal version of ubik_CallIter that takes an additional
354  * parameter - to indicate whether the ubik client handle has already
355  * been locked.
356  */
357 static afs_int32
358 CallIter(int (*aproc) (), struct ubik_client *aclient,
359          afs_int32 aflags, int *apos, long p1, long p2, long p3, long p4,
360          long p5, long p6, long p7, long p8, long p9, long p10, long p11,
361          long p12, long p13, long p14, long p15, long p16, int needlock)
362 {
363     afs_int32 code;
364     struct rx_connection *tc;
365     short origLevel;
366
367     if (needlock) {
368         LOCK_UBIK_CLIENT(aclient);
369     }
370     origLevel = aclient->initializationState;
371
372     code = UNOSERVERS;
373
374     while (*apos < MAXSERVERS) {
375         /* tc is the next conn to try */
376         tc = aclient->conns[*apos];
377         if (!tc)
378             goto errout;
379
380         if (rx_ConnError(tc)) {
381             tc = ubik_RefreshConn(tc);
382             aclient->conns[*apos] = tc;
383         }
384
385         if ((aflags & UPUBIKONLY) && (aclient->states[*apos] & CFLastFailed)) {
386             (*apos)++;          /* try another one if this server is down */
387         } else {
388             break;              /* this is the desired path */
389         }
390     }
391     if (*apos >= MAXSERVERS)
392         goto errout;
393
394     code =
395         (*aproc) (tc, p1, p2, p3, p4, p5, p6, p7, p8, p9, p10, p11, p12, p13,
396                   p14, p15, p16);
397     if (aclient->initializationState != origLevel)
398         /* somebody did a ubik_ClientInit */
399         goto errout;
400
401     /* what should I do in case of UNOQUORUM ? */
402     if (code < 0) {
403         aclient->states[*apos] |= CFLastFailed; /* network errors */
404     } else {
405         /* either misc ubik code, or misc application code or success. */
406         aclient->states[*apos] &= ~CFLastFailed;        /* operation worked */
407     }
408
409     (*apos)++;
410 errout:
411     if (needlock) {
412         UNLOCK_UBIK_CLIENT(aclient);
413     }
414     return code;
415 }
416
417 /*!
418  * \brief This is part of an iterator.  It doesn't handle finding sync sites.
419  */
420 afs_int32
421 ubik_CallIter(int (*aproc) (), struct ubik_client *aclient,
422                                afs_int32 aflags, int *apos, long p1, long p2,
423                                long p3, long p4, long p5, long p6, long p7,
424                                long p8, long p9, long p10, long p11, long p12,
425                                long p13, long p14, long p15, long p16)
426 {
427     return CallIter(aproc, aclient, aflags, apos, p1, p2, p3, p4, p5, p6, p7,
428                     p8, p9, p10, p11, p12, p13, p14, p15, p16, NEED_LOCK);
429 }
430
431 /*!
432  * \brief Call this instead of stub and we'll guarantee to find a host that's up.
433  *
434  * \todo In the future, we should also put in a protocol to find the sync site.
435  */
436 afs_int32
437 ubik_Call_New(int (*aproc) (), struct ubik_client *aclient,
438               afs_int32 aflags, long p1, long p2, long p3, long p4, long p5,
439               long p6, long p7, long p8, long p9, long p10, long p11,
440               long p12, long p13, long p14, long p15, long p16)
441 {
442     afs_int32 code, rcode;
443     afs_int32 count;
444     int pass;
445     short origLevel;
446
447     LOCK_UBIK_CLIENT(aclient);
448   restart:
449     rcode = UNOSERVERS;
450     origLevel = aclient->initializationState;
451
452     /* Do two passes. First pass only checks servers known running */
453     for (aflags |= UPUBIKONLY, pass = 0; pass < 2;
454          pass++, aflags &= ~UPUBIKONLY) {
455         count = 0;
456         while (1) {
457             code =
458                 CallIter(aproc, aclient, aflags, &count, p1, p2, p3, p4, p5,
459                          p6, p7, p8, p9, p10, p11, p12, p13, p14, p15, p16,
460                          NO_LOCK);
461             if (code && (aclient->initializationState != origLevel)) {
462                 goto restart;
463             }
464             if (code == UNOSERVERS) {
465                 break;
466             }
467             rcode = code;       /* remember code from last good call */
468
469             if ((code >= 0) && (code != UNOQUORUM) && (code != UNOTSYNC)) {
470                 UNLOCK_UBIK_CLIENT(aclient);
471                 return code;    /* success or global error condition */
472             }
473         }
474     }
475     UNLOCK_UBIK_CLIENT(aclient);
476     return rcode;
477 }
478
479 /**
480  * Find index of connection to ahost.
481  *
482  * @param[in]     aclient  client structure
483  * @param[in]     ahost    host to be found (IP address in net-byte order)
484  *
485  * @return index on success; -1 otherwise.
486  */
487 static int
488 IndexOf(struct ubik_client *aclient, int ahost)
489 {
490     int i, thisHost, index = -1;
491     struct rx_peer *rxp;
492
493     for (i = 0; i < MAXSERVERS && aclient->conns[i]; i++) {
494         rxp = rx_PeerOf(aclient->conns[i]);
495         thisHost = rx_HostOf(rxp);
496
497         if (!thisHost)
498             break;
499         if (thisHost == ahost) {
500             index = i;
501             break;
502         }
503     }
504     return index;
505 }
506
507 /*!
508  * call this instead of stub and we'll guarantee to find a host that's up.
509  *
510  * \todo In the future, we should also put in a protocol to find the sync site.
511  */
512 afs_int32
513 ubik_Call(int (*aproc) (), struct ubik_client *aclient,
514           afs_int32 aflags, long p1, long p2, long p3, long p4,
515           long p5, long p6, long p7, long p8, long p9, long p10,
516           long p11, long p12, long p13, long p14, long p15, long p16)
517 {
518     afs_int32 rcode, code, count;
519     int pass, needsync, inlist, j;
520     struct rx_connection *tc;
521     struct rx_peer *rxp;
522     short origLevel;
523
524     if (aflags & UBIK_CALL_NEW)
525         return ubik_Call_New(aproc, aclient, aflags, p1, p2, p3, p4,
526                              p5, p6, p7, p8, p9, p10, p11, p12, p13, p14, p15,
527                              p16);
528
529     if (!aclient)
530         return UNOENT;
531     LOCK_UBIK_CLIENT(aclient);
532
533   restart:
534     origLevel = aclient->initializationState;
535     rcode = UNOSERVERS;
536     inlist = needsync = 0;
537
538     LOCK_UCLNT_CACHE;
539     for (j = 0; ((j < SYNCCOUNT) && calls_needsync[j]); j++) {
540         if (calls_needsync[j] == (int *)aproc) {
541             inlist = needsync = 1;
542             break;
543         }
544     }
545     UNLOCK_UCLNT_CACHE;
546     /*
547      * First  pass, we try all servers that are up.
548      * Second pass, we try all servers.
549      */
550     for (pass = 0; pass < 2; pass++) {  /*p */
551         /* For each entry in our servers list */
552         for (count = 0;; count++) {     /*s */
553
554             if (needsync) {
555                 /* Need a sync site. Lets try to quickly find it */
556                 if (aclient->syncSite) {
557                     /*
558                      * Position "count" at the appropriate slot in the client
559                      * structure and retry. If we can't find in slot, we'll just
560                      * continue through the whole list.
561                      */
562                     code = IndexOf(aclient, aclient->syncSite);
563                     aclient->syncSite = 0;
564                     if (code != -1)
565                         count = code;
566                 }
567             }
568             /*needsync */
569             tc = aclient->conns[count];
570             if (tc && rx_ConnError(tc)) {
571                 aclient->conns[count] = tc = ubik_RefreshConn(tc);
572             }
573             if (!tc)
574                 break;
575
576             if ((pass == 0) && (aclient->states[count] & CFLastFailed)) {
577                 continue;       /* this guy's down */
578             }
579
580             rcode =
581                 (*aproc) (tc, p1, p2, p3, p4, p5, p6, p7, p8, p9, p10, p11,
582                           p12, p13, p14, p15, p16);
583             if (aclient->initializationState != origLevel) {
584                 /* somebody did a ubik_ClientInit */
585                 if (rcode)
586                     goto restart;       /* call failed */
587                 else
588                     goto done;  /* call suceeded */
589             }
590             if (rcode < 0) {    /* network errors */
591                 aclient->states[count] |= CFLastFailed; /* Mark serer down */
592             } else if (rcode == UNOTSYNC) {
593                 needsync = 1;
594             } else if (rcode != UNOQUORUM) {
595                 /* either misc ubik code, or misc appl code, or success. */
596                 aclient->states[count] &= ~CFLastFailed;        /* mark server up */
597                 goto done;      /* all done */
598             }
599         }                       /*s */
600     }                           /*p */
601
602   done:
603     if (needsync) {
604         if (!inlist) {          /* Remember proc call that needs sync site */
605             LOCK_UCLNT_CACHE;
606             calls_needsync[synccount % SYNCCOUNT] = (int *)aproc;
607             synccount++;
608             UNLOCK_UCLNT_CACHE;
609             inlist = 1;
610         }
611         if (!rcode) {           /* Remember the sync site - cmd successful */
612             rxp = rx_PeerOf(aclient->conns[count]);
613             aclient->syncSite = rx_HostOf(rxp);
614         }
615     }
616     UNLOCK_UBIK_CLIENT(aclient);
617     return rcode;
618 }
619
620 afs_int32
621 ubik_CallRock(struct ubik_client *aclient, afs_int32 aflags,
622               ubik_callrock_func proc, void *rock)
623 {
624     afs_int32 rcode, code, _ucount;
625     int pass, needsync;
626     struct rx_connection *tc;
627     struct rx_peer *rxp;
628     short origLevel;
629
630     if (!aclient)
631         return UNOENT;
632     LOCK_UBIK_CLIENT(aclient);
633
634  restart:
635     origLevel = aclient->initializationState;
636     rcode = UNOSERVERS;
637     needsync = 0;
638
639     /*
640      * First  pass, we try all servers that are up.
641      * Second pass, we try all servers.
642      */
643     for (pass = 0; pass < 2; pass++) {
644         /* For each entry in our servers list */
645         for (_ucount = 0;; _ucount++) {
646             struct ubik_callrock_info info;
647             if (needsync) {
648                 /* Need a sync site. Lets try to quickly find it */
649                 if (aclient->syncSite) {
650                     /*
651                      * Position "_ucount" at the appropriate slot in the client
652                      * structure and retry. If we can't find in slot, we'll just
653                      * continue through the whole list.
654                      */
655                     code = IndexOf(aclient, aclient->syncSite);
656                     aclient->syncSite = 0;
657                     if (code != -1)
658                         _ucount = code;
659                 }
660             }
661             /*needsync */
662             tc = aclient->conns[_ucount];
663             if (tc && rx_ConnError(tc)) {
664                 aclient->conns[_ucount] = tc = ubik_RefreshConn(tc);
665             }
666             if (!tc)
667                 break;
668
669             if ((pass == 0) && (aclient->states[_ucount] & CFLastFailed)) {
670                 continue;       /* this guy's down */
671             }
672
673             memset(&info, 0, sizeof(info));
674             info.conn = tc;
675             rcode = (*proc)(&info, rock);
676
677             if (aclient->initializationState != origLevel) {
678                 /* somebody did a ubik_ClientInit */
679                 if (rcode)
680                     goto restart;       /* call failed */
681                 else
682                     goto done;  /* call suceeded */
683             }
684             if (rcode < 0) {    /* network errors */
685                 aclient->states[_ucount] |= CFLastFailed; /* Mark server down */
686             } else if (rcode == UNOTSYNC) {
687                 needsync = 1;
688             } else if (rcode != UNOQUORUM) {
689                 /* either misc ubik code, or misc appl code, or success. */
690                 aclient->states[_ucount] &= ~CFLastFailed;      /* mark server up*/
691                 goto done;      /* all done */
692             }
693         }
694     }
695
696  done:
697     if (needsync) {
698         if (!rcode) {           /* Remember the sync site - cmd successful */
699             rxp = rx_PeerOf(aclient->conns[_ucount]);
700             aclient->syncSite = rx_HostOf(rxp);
701         }
702     }
703     UNLOCK_UBIK_CLIENT(aclient);
704     return rcode;
705 }