c73e78142de53ed2113469b0cc98cb056ce169fe
[openafs.git] / src / ubik / ubik.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  *
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afsconfig.h>
11 #include <afs/param.h>
12
13 #include <roken.h>
14
15 #include <lock.h>
16 #include <rx/xdr.h>
17 #include <rx/rx.h>
18 #include <afs/cellconfig.h>
19
20 #define UBIK_INTERNALS
21 #include "ubik.h"
22 #include "ubik_int.h"
23
24 #include <lwp.h>   /* temporary hack by klm */
25
26 #define ERROR_EXIT(code) do { \
27     error = (code); \
28     goto error_exit; \
29 } while (0)
30
31 /*!
32  * \file
33  * This system is organized in a hierarchical set of related modules.  Modules
34  * at one level can only call modules at the same level or below.
35  *
36  * At the bottom level (0) we have R, RFTP, LWP and IOMGR, i.e. the basic
37  * operating system primitives.
38  *
39  * At the next level (1) we have
40  *
41  * \li VOTER--The module responsible for casting votes when asked.  It is also
42  * responsible for determining whether this server should try to become
43  * a synchronization site.
44  * \li BEACONER--The module responsible for sending keep-alives out when a
45  * server is actually the sync site, or trying to become a sync site.
46  * \li DISK--The module responsible for representing atomic transactions
47  * on the local disk.  It maintains a new-value only log.
48  * \li LOCK--The module responsible for locking byte ranges in the database file.
49  *
50  * At the next level (2) we have
51  *
52  * \li RECOVERY--The module responsible for ensuring that all members of a quorum
53  * have the same up-to-date database after a new synchronization site is
54  * elected.  This module runs only on the synchronization site.
55  *
56  * At the next level (3) we have
57  *
58  * \li REMOTE--The module responsible for interpreting requests from the sync
59  * site and applying them to the database, after obtaining the appropriate
60  * locks.
61  *
62  * At the next level (4) we have
63  *
64  * \li UBIK--The module users call to perform operations on the database.
65  */
66
67
68 /* some globals */
69 afs_int32 ubik_quorum = 0;
70 struct ubik_dbase *ubik_dbase = 0;
71 struct ubik_stats ubik_stats;
72 afs_uint32 ubik_host[UBIK_MAX_INTERFACE_ADDR];
73 afs_int32 ubik_epochTime = 0;
74 afs_int32 urecovery_state = 0;
75 int (*ubik_SyncWriterCacheProc) (void);
76 struct ubik_server *ubik_servers;
77 short ubik_callPortal;
78
79 /* These global variables were used to control the server security layers.
80  * They are retained for backwards compatibility with legacy callers.
81  *
82  * The ubik_SetServerSecurityProcs() interface should be used instead.
83  */
84
85 int (*ubik_SRXSecurityProc) (void *, struct rx_securityClass **, afs_int32 *);
86 void *ubik_SRXSecurityRock;
87 int (*ubik_CheckRXSecurityProc) (void *, struct rx_call *);
88 void *ubik_CheckRXSecurityRock;
89
90
91
92 static int BeginTrans(struct ubik_dbase *dbase, afs_int32 transMode,
93                       struct ubik_trans **transPtr, int readAny);
94
95 static struct rx_securityClass **ubik_sc = NULL;
96 static void (*buildSecClassesProc)(void *, struct rx_securityClass ***,
97                                    afs_int32 *) = NULL;
98 static int (*checkSecurityProc)(void *, struct rx_call *) = NULL;
99 static void *securityRock = NULL;
100
101 #define CStampVersion       1   /* meaning set ts->version */
102
103 static_inline struct rx_connection *
104 Quorum_StartIO(struct ubik_trans *atrans, struct ubik_server *as)
105 {
106     struct rx_connection *conn;
107
108     UBIK_ADDR_LOCK;
109     conn = as->disk_rxcid;
110
111 #ifdef AFS_PTHREAD_ENV
112     rx_GetConnection(conn);
113     UBIK_ADDR_UNLOCK;
114     DBRELE(atrans->dbase);
115 #else
116     UBIK_ADDR_UNLOCK;
117 #endif /* AFS_PTHREAD_ENV */
118
119     return conn;
120 }
121
122 static_inline void
123 Quorum_EndIO(struct ubik_trans *atrans, struct rx_connection *aconn)
124 {
125 #ifdef AFS_PTHREAD_ENV
126     DBHOLD(atrans->dbase);
127     rx_PutConnection(aconn);
128 #endif /* AFS_PTHREAD_ENV */
129 }
130
131
132 /*
133  * Iterate over all servers.  Callers pass in *ts which is used to track
134  * the current server.
135  * - Returns 1 if there are no more servers
136  * - Returns 0 with conn set to the connection for the current server if
137  *   it's up and current
138  */
139 static int
140 ContactQuorum_iterate(struct ubik_trans *atrans, int aflags, struct ubik_server **ts,
141                          struct rx_connection **conn, afs_int32 *rcode,
142                          afs_int32 *okcalls, afs_int32 code)
143 {
144     if (!*ts) {
145         /* Initial call - start iterating over servers */
146         *ts = ubik_servers;
147         *conn = NULL;
148         *rcode = 0;
149         *okcalls = 0;
150     } else {
151         if (*conn) {
152             Quorum_EndIO(atrans, *conn);
153             *conn = NULL;
154             if (code) {         /* failure */
155                 *rcode = code;
156                 UBIK_BEACON_LOCK;
157                 (*ts)->up = 0;          /* mark as down now; beacons will no longer be sent */
158                 (*ts)->beaconSinceDown = 0;
159                 UBIK_BEACON_UNLOCK;
160                 (*ts)->currentDB = 0;
161                 urecovery_LostServer(*ts);      /* tell recovery to try to resend dbase later */
162             } else {            /* success */
163                 if (!(*ts)->isClone)
164                     (*okcalls)++;       /* count up how many worked */
165                 if (aflags & CStampVersion) {
166                     (*ts)->version = atrans->dbase->version;
167                 }
168             }
169         }
170         *ts = (*ts)->next;
171     }
172     if (!(*ts))
173         return 1;
174     UBIK_BEACON_LOCK;
175     if (!(*ts)->up || !(*ts)->currentDB) {
176         UBIK_BEACON_UNLOCK;
177         (*ts)->currentDB = 0;   /* db is no longer current; we just missed an update */
178         return 0;               /* not up-to-date, don't bother.  NULL conn will tell caller not to use */
179     }
180     UBIK_BEACON_UNLOCK;
181     *conn = Quorum_StartIO(atrans, *ts);
182     return 0;
183 }
184
185 static int
186 ContactQuorum_rcode(int okcalls, afs_int32 rcode)
187 {
188     /*
189      * return 0 if we successfully contacted a quorum, otherwise return error code.
190      * We don't have to contact ourselves (that was done locally)
191      */
192     if (okcalls + 1 >= ubik_quorum)
193         return 0;
194     else
195         return rcode;
196 }
197
198 /*!
199  * \brief Perform an operation at a quorum, handling error conditions.
200  * \return 0 if all worked and a quorum was contacted successfully
201  * \return otherwise mark failing server as down and return #UERROR
202  *
203  * \note If any server misses an update, we must wait #BIGTIME seconds before
204  * allowing the transaction to commit, to ensure that the missing and
205  * possibly still functioning server times out and stops handing out old
206  * data.  This is done in the commit code, where we wait for a server marked
207  * down to have stayed down for #BIGTIME seconds before we allow a transaction
208  * to commit.  A server that fails but comes back up won't give out old data
209  * because it is sent the sync count along with the beacon message that
210  * marks it as \b really up (\p beaconSinceDown).
211  */
212 afs_int32
213 ContactQuorum_NoArguments(afs_int32 (*proc)(struct rx_connection *, ubik_tid *),
214                           struct ubik_trans *atrans, int aflags)
215 {
216     struct ubik_server *ts = NULL;
217     afs_int32 code = 0, rcode, okcalls;
218     struct rx_connection *conn;
219     int done;
220
221     done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
222     while (!done) {
223         if (conn)
224             code = (*proc)(conn, &atrans->tid);
225         done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
226     }
227     return ContactQuorum_rcode(okcalls, rcode);
228 }
229
230
231 afs_int32
232 ContactQuorum_DISK_Lock(struct ubik_trans *atrans, int aflags,afs_int32 file,
233                         afs_int32 position, afs_int32 length, afs_int32 type)
234 {
235     struct ubik_server *ts = NULL;
236     afs_int32 code = 0, rcode, okcalls;
237     struct rx_connection *conn;
238     int done;
239
240     done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
241     while (!done) {
242         if (conn)
243             code = DISK_Lock(conn, &atrans->tid, file, position, length, type);
244         done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
245     }
246     return ContactQuorum_rcode(okcalls, rcode);
247 }
248
249
250 afs_int32
251 ContactQuorum_DISK_Write(struct ubik_trans *atrans, int aflags,
252                          afs_int32 file, afs_int32 position, bulkdata *data)
253 {
254     struct ubik_server *ts = NULL;
255     afs_int32 code = 0, rcode, okcalls;
256     struct rx_connection *conn;
257     int done;
258
259     done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
260     while (!done) {
261         if (conn)
262             code = DISK_Write(conn, &atrans->tid, file, position, data);
263         done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
264     }
265     return ContactQuorum_rcode(okcalls, rcode);
266 }
267
268
269 afs_int32
270 ContactQuorum_DISK_Truncate(struct ubik_trans *atrans, int aflags,
271                             afs_int32 file, afs_int32 length)
272 {
273     struct ubik_server *ts = NULL;
274     afs_int32 code = 0, rcode, okcalls;
275     struct rx_connection *conn;
276     int done;
277
278     done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
279     while (!done) {
280         if (conn)
281             code = DISK_Truncate(conn, &atrans->tid, file, length);
282         done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
283     }
284     return ContactQuorum_rcode(okcalls, rcode);
285 }
286
287
288 afs_int32
289 ContactQuorum_DISK_WriteV(struct ubik_trans *atrans, int aflags,
290                           iovec_wrt * io_vector, iovec_buf *io_buffer)
291 {
292     struct ubik_server *ts = NULL;
293     afs_int32 code = 0, rcode, okcalls;
294     struct rx_connection *conn;
295     int done;
296
297     done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
298     while (!done) {
299         if (conn) {
300             code = DISK_WriteV(conn, &atrans->tid, io_vector, io_buffer);
301             if ((code <= -450) && (code > -500)) {
302                 /* An RPC interface mismatch (as defined in comerr/error_msg.c).
303                  * Un-bulk the entries and do individual DISK_Write calls
304                  * instead of DISK_WriteV.
305                  */
306                 struct ubik_iovec *iovec =
307                         (struct ubik_iovec *)io_vector->iovec_wrt_val;
308                 char *iobuf = (char *)io_buffer->iovec_buf_val;
309                 bulkdata tcbs;
310                 afs_int32 i, offset;
311
312                 for (i = 0, offset = 0; i < io_vector->iovec_wrt_len; i++) {
313                     /* Sanity check for going off end of buffer */
314                     if ((offset + iovec[i].length) > io_buffer->iovec_buf_len) {
315                         code = UINTERNAL;
316                         break;
317                     }
318                     tcbs.bulkdata_len = iovec[i].length;
319                     tcbs.bulkdata_val = &iobuf[offset];
320                     code = DISK_Write(conn, &atrans->tid, iovec[i].file,
321                            iovec[i].position, &tcbs);
322                     if (code)
323                         break;
324                     offset += iovec[i].length;
325                 }
326             }
327         }
328         done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
329     }
330     return ContactQuorum_rcode(okcalls, rcode);
331 }
332
333
334 afs_int32
335 ContactQuorum_DISK_SetVersion(struct ubik_trans *atrans, int aflags,
336                               ubik_version *OldVersion,
337                               ubik_version *NewVersion)
338 {
339     struct ubik_server *ts = NULL;
340     afs_int32 code = 0, rcode, okcalls;
341     struct rx_connection *conn;
342     int done;
343
344     done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
345     while (!done) {
346         if (conn)
347             code = DISK_SetVersion(conn, &atrans->tid, OldVersion, NewVersion);
348         done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
349     }
350     return ContactQuorum_rcode(okcalls, rcode);
351 }
352
353 #if defined(AFS_PTHREAD_ENV)
354 static int
355 ubik_thread_create(pthread_attr_t *tattr, pthread_t *thread, void *proc) {
356     osi_Assert(pthread_attr_init(tattr) == 0);
357     osi_Assert(pthread_attr_setdetachstate(tattr, PTHREAD_CREATE_DETACHED) == 0);
358     osi_Assert(pthread_create(thread, tattr, proc, NULL) == 0);
359     return 0;
360 }
361 #endif
362
363 /*!
364  * \brief This routine initializes the ubik system for a set of servers.
365  * \return 0 for success, or an error code on failure.
366  * \param serverList set of servers specified; nServers gives the number of entries in this array.
367  * \param pathName provides an initial prefix used for naming storage files used by this system.
368  * \param dbase the returned structure representing this instance of an ubik; it is passed to various calls below.
369  *
370  * \todo This routine should perhaps be generalized to a low-level disk interface providing read, write, file enumeration and sync operations.
371  *
372  * \warning The host named by myHost should not also be listed in serverList.
373  *
374  * \see ubik_ServerInit(), ubik_ServerInitByInfo()
375  */
376 int
377 ubik_ServerInitCommon(afs_uint32 myHost, short myPort,
378                       struct afsconf_cell *info, char clones[],
379                       afs_uint32 serverList[], const char *pathName,
380                       struct ubik_dbase **dbase)
381 {
382     struct ubik_dbase *tdb;
383     afs_int32 code;
384 #ifdef AFS_PTHREAD_ENV
385     pthread_t rxServerThread;        /* pthread variables */
386     pthread_t ubeacon_InteractThread;
387     pthread_t urecovery_InteractThread;
388     pthread_attr_t rxServer_tattr;
389     pthread_attr_t ubeacon_Interact_tattr;
390     pthread_attr_t urecovery_Interact_tattr;
391 #else
392     PROCESS junk;
393     extern int rx_stackSize;
394 #endif
395
396     afs_int32 secIndex;
397     struct rx_securityClass *secClass;
398     int numClasses;
399
400     struct rx_service *tservice;
401
402     initialize_U_error_table();
403
404     tdb = (struct ubik_dbase *)malloc(sizeof(struct ubik_dbase));
405     tdb->pathName = (char *)malloc(strlen(pathName) + 1);
406     strcpy(tdb->pathName, pathName);
407     tdb->activeTrans = (struct ubik_trans *)0;
408     memset(&tdb->version, 0, sizeof(struct ubik_version));
409     memset(&tdb->cachedVersion, 0, sizeof(struct ubik_version));
410 #ifdef AFS_PTHREAD_ENV
411     MUTEX_INIT(&tdb->versionLock, "version lock", MUTEX_DEFAULT, 0);
412     MUTEX_INIT(&beacon_globals.beacon_lock, "beacon lock", MUTEX_DEFAULT, 0);
413     MUTEX_INIT(&vote_globals.vote_lock, "vote lock", MUTEX_DEFAULT, 0);
414     MUTEX_INIT(&addr_globals.addr_lock, "address lock", MUTEX_DEFAULT, 0);
415 #else
416     Lock_Init(&tdb->versionLock);
417 #endif
418     Lock_Init(&tdb->cache_lock);
419     tdb->flags = 0;
420     tdb->read = uphys_read;
421     tdb->write = uphys_write;
422     tdb->truncate = uphys_truncate;
423     tdb->open = uphys_invalidate;       /* this function isn't used any more */
424     tdb->sync = uphys_sync;
425     tdb->stat = uphys_stat;
426     tdb->getlabel = uphys_getlabel;
427     tdb->setlabel = uphys_setlabel;
428     tdb->getnfiles = uphys_getnfiles;
429     tdb->readers = 0;
430     tdb->tidCounter = tdb->writeTidCounter = 0;
431     *dbase = tdb;
432     ubik_dbase = tdb;           /* for now, only one db per server; can fix later when we have names for the other dbases */
433
434 #ifdef AFS_PTHREAD_ENV
435     CV_INIT(&tdb->version_cond, "version", CV_DEFAULT, 0);
436     CV_INIT(&tdb->flags_cond, "flags", CV_DEFAULT, 0);
437 #endif /* AFS_PTHREAD_ENV */
438
439     /* initialize RX */
440
441     /* the following call is idempotent so when/if it got called earlier,
442      * by whatever called us, it doesn't really matter -- klm */
443     code = rx_Init(myPort);
444     if (code < 0)
445         return code;
446
447     udisk_Init(ubik_nBuffers);
448     ulock_Init();
449
450     code = uvote_Init();
451     if (code)
452         return code;
453     code = urecovery_Initialize(tdb);
454     if (code)
455         return code;
456     if (info)
457         code = ubeacon_InitServerListByInfo(myHost, info, clones);
458     else
459         code = ubeacon_InitServerList(myHost, serverList);
460     if (code)
461         return code;
462
463     ubik_callPortal = myPort;
464     /* try to get an additional security object */
465     if (buildSecClassesProc == NULL) {
466         numClasses = 3;
467         ubik_sc = calloc(numClasses, sizeof(struct rx_securityClass *));
468         ubik_sc[0] = rxnull_NewServerSecurityObject();
469         if (ubik_SRXSecurityProc) {
470             code = (*ubik_SRXSecurityProc) (ubik_SRXSecurityRock,
471                                             &secClass,
472                                             &secIndex);
473             if (code == 0) {
474                  ubik_sc[secIndex] = secClass;
475             }
476         }
477     } else {
478         (*buildSecClassesProc) (securityRock, &ubik_sc, &numClasses);
479     }
480     /* for backwards compat this should keep working as it does now
481        and not host bind */
482
483     tservice =
484         rx_NewService(0, VOTE_SERVICE_ID, "VOTE", ubik_sc, numClasses,
485                       VOTE_ExecuteRequest);
486     if (tservice == (struct rx_service *)0) {
487         ubik_dprint("Could not create VOTE rx service!\n");
488         return -1;
489     }
490     rx_SetMinProcs(tservice, 2);
491     rx_SetMaxProcs(tservice, 3);
492
493     tservice =
494         rx_NewService(0, DISK_SERVICE_ID, "DISK", ubik_sc, numClasses,
495                       DISK_ExecuteRequest);
496     if (tservice == (struct rx_service *)0) {
497         ubik_dprint("Could not create DISK rx service!\n");
498         return -1;
499     }
500     rx_SetMinProcs(tservice, 2);
501     rx_SetMaxProcs(tservice, 3);
502
503     /* start an rx_ServerProc to handle incoming RPC's in particular the
504      * UpdateInterfaceAddr RPC that occurs in ubeacon_InitServerList. This avoids
505      * the "steplock" problem in ubik initialization. Defect 11037.
506      */
507 #ifdef AFS_PTHREAD_ENV
508     ubik_thread_create(&rxServer_tattr, &rxServerThread, (void *)rx_ServerProc);
509 #else
510     LWP_CreateProcess(rx_ServerProc, rx_stackSize, RX_PROCESS_PRIORITY,
511               NULL, "rx_ServerProc", &junk);
512 #endif
513
514     /* now start up async processes */
515 #ifdef AFS_PTHREAD_ENV
516     ubik_thread_create(&ubeacon_Interact_tattr, &ubeacon_InteractThread,
517                 (void *)ubeacon_Interact);
518 #else
519     code = LWP_CreateProcess(ubeacon_Interact, 16384 /*8192 */ ,
520                              LWP_MAX_PRIORITY - 1, (void *)0, "beacon",
521                              &junk);
522     if (code)
523         return code;
524 #endif
525
526 #ifdef AFS_PTHREAD_ENV
527     ubik_thread_create(&urecovery_Interact_tattr, &urecovery_InteractThread,
528                 (void *)urecovery_Interact);
529     return 0;  /* is this correct?  - klm */
530 #else
531     code = LWP_CreateProcess(urecovery_Interact, 16384 /*8192 */ ,
532                              LWP_MAX_PRIORITY - 1, (void *)0, "recovery",
533                              &junk);
534     return code;
535 #endif
536
537 }
538
539 /*!
540  * \see ubik_ServerInitCommon()
541  */
542 int
543 ubik_ServerInitByInfo(afs_uint32 myHost, short myPort,
544                       struct afsconf_cell *info, char clones[],
545                       const char *pathName, struct ubik_dbase **dbase)
546 {
547     afs_int32 code;
548
549     code =
550         ubik_ServerInitCommon(myHost, myPort, info, clones, 0, pathName,
551                               dbase);
552     return code;
553 }
554
555 /*!
556  * \see ubik_ServerInitCommon()
557  */
558 int
559 ubik_ServerInit(afs_uint32 myHost, short myPort, afs_uint32 serverList[],
560                 const char *pathName, struct ubik_dbase **dbase)
561 {
562     afs_int32 code;
563
564     code =
565         ubik_ServerInitCommon(myHost, myPort, (struct afsconf_cell *)0, 0,
566                               serverList, pathName, dbase);
567     return code;
568 }
569
570 /*!
571  * \brief This routine begins a read or write transaction on the transaction
572  * identified by transPtr, in the dbase named by dbase.
573  *
574  * An open mode of ubik_READTRANS identifies this as a read transaction,
575  * while a mode of ubik_WRITETRANS identifies this as a write transaction.
576  * transPtr is set to the returned transaction control block.
577  * The readAny flag is set to 0 or 1 or 2 by the wrapper functions
578  * ubik_BeginTrans() or ubik_BeginTransReadAny() or
579  * ubik_BeginTransReadAnyWrite() below.
580  *
581  * \note We can only begin transaction when we have an up-to-date database.
582  */
583 static int
584 BeginTrans(struct ubik_dbase *dbase, afs_int32 transMode,
585            struct ubik_trans **transPtr, int readAny)
586 {
587     struct ubik_trans *jt;
588     struct ubik_trans *tt;
589     afs_int32 code;
590
591     if (readAny > 1 && ubik_SyncWriterCacheProc == NULL) {
592         /* it's not safe to use ubik_BeginTransReadAnyWrite without a
593          * cache-syncing function; fall back to ubik_BeginTransReadAny,
594          * which is safe but slower */
595         ubik_print("ubik_BeginTransReadAnyWrite called, but "
596                    "ubik_SyncWriterCacheProc not set; pretending "
597                    "ubik_BeginTransReadAny was called instead\n");
598         readAny = 1;
599     }
600
601     if ((transMode != UBIK_READTRANS) && readAny)
602         return UBADTYPE;
603     DBHOLD(dbase);
604     if (urecovery_AllBetter(dbase, readAny) == 0) {
605         DBRELE(dbase);
606         return UNOQUORUM;
607     }
608     /* otherwise we have a quorum, use it */
609
610     /* make sure that at most one write transaction occurs at any one time.  This
611      * has nothing to do with transaction locking; that's enforced by the lock package.  However,
612      * we can't even handle two non-conflicting writes, since our log and recovery modules
613      * don't know how to restore one without possibly picking up some data from the other. */
614     if (transMode == UBIK_WRITETRANS) {
615         /* if we're writing already, wait */
616         while (dbase->flags & DBWRITING) {
617 #ifdef AFS_PTHREAD_ENV
618             CV_WAIT(&dbase->flags_cond, &dbase->versionLock);
619 #else
620             DBRELE(dbase);
621             LWP_WaitProcess(&dbase->flags);
622             DBHOLD(dbase);
623 #endif
624         }
625
626         if (!ubeacon_AmSyncSite()) {
627             DBRELE(dbase);
628             return UNOTSYNC;
629         }
630     }
631
632     /* create the transaction */
633     code = udisk_begin(dbase, transMode, &jt);  /* can't take address of register var */
634     tt = jt;                    /* move to a register */
635     if (code || tt == (struct ubik_trans *)NULL) {
636         DBRELE(dbase);
637         return code;
638     }
639     if (readAny) {
640         tt->flags |= TRREADANY;
641         if (readAny > 1) {
642             tt->flags |= TRREADWRITE;
643         }
644     }
645     /* label trans and dbase with new tid */
646     tt->tid.epoch = ubik_epochTime;
647     /* bump by two, since tidCounter+1 means trans id'd by tidCounter has finished */
648     tt->tid.counter = (dbase->tidCounter += 2);
649
650     if (transMode == UBIK_WRITETRANS) {
651         /* for a write trans, we have to keep track of the write tid counter too */
652         dbase->writeTidCounter = tt->tid.counter;
653
654         /* next try to start transaction on appropriate number of machines */
655         code = ContactQuorum_NoArguments(DISK_Begin, tt, 0);
656         if (code) {
657             /* we must abort the operation */
658             udisk_abort(tt);
659             ContactQuorum_NoArguments(DISK_Abort, tt, 0); /* force aborts to the others */
660             udisk_end(tt);
661             DBRELE(dbase);
662             return code;
663         }
664     }
665
666     *transPtr = tt;
667     DBRELE(dbase);
668     return 0;
669 }
670
671 /*!
672  * \see BeginTrans()
673  */
674 int
675 ubik_BeginTrans(struct ubik_dbase *dbase, afs_int32 transMode,
676                 struct ubik_trans **transPtr)
677 {
678     return BeginTrans(dbase, transMode, transPtr, 0);
679 }
680
681 /*!
682  * \see BeginTrans()
683  */
684 int
685 ubik_BeginTransReadAny(struct ubik_dbase *dbase, afs_int32 transMode,
686                        struct ubik_trans **transPtr)
687 {
688     return BeginTrans(dbase, transMode, transPtr, 1);
689 }
690
691 /*!
692  * \see BeginTrans()
693  */
694 int
695 ubik_BeginTransReadAnyWrite(struct ubik_dbase *dbase, afs_int32 transMode,
696                             struct ubik_trans **transPtr)
697 {
698     return BeginTrans(dbase, transMode, transPtr, 2);
699 }
700
701 /*!
702  * \brief This routine ends a read or write transaction by aborting it.
703  */
704 int
705 ubik_AbortTrans(struct ubik_trans *transPtr)
706 {
707     afs_int32 code;
708     afs_int32 code2;
709     struct ubik_dbase *dbase;
710
711     dbase = transPtr->dbase;
712
713     if (transPtr->flags & TRCACHELOCKED) {
714         ReleaseReadLock(&dbase->cache_lock);
715         transPtr->flags &= ~TRCACHELOCKED;
716     }
717
718     ObtainWriteLock(&dbase->cache_lock);
719
720     DBHOLD(dbase);
721     memset(&dbase->cachedVersion, 0, sizeof(struct ubik_version));
722
723     ReleaseWriteLock(&dbase->cache_lock);
724
725     /* see if we're still up-to-date */
726     if (!urecovery_AllBetter(dbase, transPtr->flags & TRREADANY)) {
727         udisk_abort(transPtr);
728         udisk_end(transPtr);
729         DBRELE(dbase);
730         return UNOQUORUM;
731     }
732
733     if (transPtr->type == UBIK_READTRANS) {
734         code = udisk_abort(transPtr);
735         udisk_end(transPtr);
736         DBRELE(dbase);
737         return code;
738     }
739
740     /* below here, we know we're doing a write transaction */
741     if (!ubeacon_AmSyncSite()) {
742         udisk_abort(transPtr);
743         udisk_end(transPtr);
744         DBRELE(dbase);
745         return UNOTSYNC;
746     }
747
748     /* now it is safe to try remote abort */
749     code = ContactQuorum_NoArguments(DISK_Abort, transPtr, 0);
750     code2 = udisk_abort(transPtr);
751     udisk_end(transPtr);
752     DBRELE(dbase);
753     return (code ? code : code2);
754 }
755
756 static void
757 WritebackApplicationCache(struct ubik_dbase *dbase)
758 {
759     int code = 0;
760     if (ubik_SyncWriterCacheProc) {
761         code = ubik_SyncWriterCacheProc();
762     }
763     if (code) {
764         /* we failed to sync the local cache, so just invalidate the cache;
765          * we'll try to read the cache in again on the next read */
766         memset(&dbase->cachedVersion, 0, sizeof(dbase->cachedVersion));
767     } else {
768         memcpy(&dbase->cachedVersion, &dbase->version,
769                sizeof(dbase->cachedVersion));
770     }
771 }
772
773 /*!
774  * \brief This routine ends a read or write transaction on the open transaction identified by transPtr.
775  * \return an error code.
776  */
777 int
778 ubik_EndTrans(struct ubik_trans *transPtr)
779 {
780     afs_int32 code;
781     struct timeval tv;
782     afs_int32 realStart;
783     struct ubik_server *ts;
784     afs_int32 now;
785     int cachelocked = 0;
786     struct ubik_dbase *dbase;
787
788     if (transPtr->type == UBIK_WRITETRANS) {
789         code = ubik_Flush(transPtr);
790         if (code) {
791             ubik_AbortTrans(transPtr);
792             return (code);
793         }
794     }
795
796     dbase = transPtr->dbase;
797
798     if (transPtr->flags & TRCACHELOCKED) {
799         ReleaseReadLock(&dbase->cache_lock);
800         transPtr->flags &= ~TRCACHELOCKED;
801     }
802
803     if (transPtr->type != UBIK_READTRANS) {
804         /* must hold cache_lock before DBHOLD'ing */
805         ObtainWriteLock(&dbase->cache_lock);
806         cachelocked = 1;
807     }
808
809     DBHOLD(dbase);
810
811     /* give up if no longer current */
812     if (!urecovery_AllBetter(dbase, transPtr->flags & TRREADANY)) {
813         udisk_abort(transPtr);
814         udisk_end(transPtr);
815         DBRELE(dbase);
816         code = UNOQUORUM;
817         goto error;
818     }
819
820     if (transPtr->type == UBIK_READTRANS) {     /* reads are easy */
821         code = udisk_commit(transPtr);
822         if (code == 0)
823             goto success;       /* update cachedVersion correctly */
824         udisk_end(transPtr);
825         DBRELE(dbase);
826         goto error;
827     }
828
829     if (!ubeacon_AmSyncSite()) {        /* no longer sync site */
830         udisk_abort(transPtr);
831         udisk_end(transPtr);
832         DBRELE(dbase);
833         code = UNOTSYNC;
834         goto error;
835     }
836
837     /* now it is safe to do commit */
838     code = udisk_commit(transPtr);
839     if (code == 0) {
840         /* db data has been committed locally; update the local cache so
841          * readers can get at it */
842         WritebackApplicationCache(dbase);
843
844         ReleaseWriteLock(&dbase->cache_lock);
845
846         code = ContactQuorum_NoArguments(DISK_Commit, transPtr, CStampVersion);
847
848     } else {
849         memset(&dbase->cachedVersion, 0, sizeof(struct ubik_version));
850         ReleaseWriteLock(&dbase->cache_lock);
851     }
852     cachelocked = 0;
853     if (code) {
854         /* failed to commit, so must return failure.  Try to clear locks first, just for fun
855          * Note that we don't know if this transaction will eventually commit at this point.
856          * If it made it to a site that will be present in the next quorum, we win, otherwise
857          * we lose.  If we contact a majority of sites, then we won't be here: contacting
858          * a majority guarantees commit, since it guarantees that one dude will be a
859          * member of the next quorum. */
860         ContactQuorum_NoArguments(DISK_ReleaseLocks, transPtr, 0);
861         udisk_end(transPtr);
862         DBRELE(dbase);
863         goto error;
864     }
865     /* before we can start sending unlock messages, we must wait until all servers
866      * that are possibly still functioning on the other side of a network partition
867      * have timed out.  Check the server structures, compute how long to wait, then
868      * start the unlocks */
869     realStart = FT_ApproxTime();
870     while (1) {
871         /* wait for all servers to time out */
872         code = 0;
873         now = FT_ApproxTime();
874         /* check if we're still sync site, the guy should either come up
875          * to us, or timeout.  Put safety check in anyway */
876         if (now - realStart > 10 * BIGTIME) {
877             ubik_stats.escapes++;
878             ubik_print("ubik escaping from commit wait\n");
879             break;
880         }
881         for (ts = ubik_servers; ts; ts = ts->next) {
882             UBIK_BEACON_LOCK;
883             if (!ts->beaconSinceDown && now <= ts->lastBeaconSent + BIGTIME) {
884                 UBIK_BEACON_UNLOCK;
885
886                 /* this guy could have some damaged data, wait for him */
887                 code = 1;
888                 tv.tv_sec = 1;  /* try again after a while (ha ha) */
889                 tv.tv_usec = 0;
890
891 #ifdef AFS_PTHREAD_ENV
892                 /* we could release the dbase outside of the loop, but we do
893                  * it here, in the loop, to avoid an unnecessary RELE/HOLD
894                  * if all sites are up */
895                 DBRELE(dbase);
896                 select(0, 0, 0, 0, &tv);
897                 DBHOLD(dbase);
898 #else
899                 IOMGR_Select(0, 0, 0, 0, &tv);  /* poll, should we wait on something? */
900 #endif
901
902                 break;
903             }
904             UBIK_BEACON_UNLOCK;
905         }
906         if (code == 0)
907             break;              /* no down ones still pseudo-active */
908     }
909
910     /* finally, unlock all the dudes.  We can return success independent of the number of servers
911      * that really unlock the dbase; the others will do it if/when they elect a new sync site.
912      * The transaction is committed anyway, since we succeeded in contacting a quorum
913      * at the start (when invoking the DiskCommit function).
914      */
915     ContactQuorum_NoArguments(DISK_ReleaseLocks, transPtr, 0);
916
917   success:
918     udisk_end(transPtr);
919     /* don't update cachedVersion here; it should have been updated way back
920      * in ubik_CheckCache, and earlier in this function for writes */
921     DBRELE(dbase);
922     if (cachelocked) {
923         ReleaseWriteLock(&dbase->cache_lock);
924     }
925     return 0;
926
927   error:
928     if (!cachelocked) {
929         ObtainWriteLock(&dbase->cache_lock);
930     }
931     memset(&dbase->cachedVersion, 0, sizeof(struct ubik_version));
932     ReleaseWriteLock(&dbase->cache_lock);
933     return code;
934 }
935
936 /*!
937  * \brief This routine reads length bytes into buffer from the current position in the database.
938  *
939  * The file pointer is updated appropriately (by adding the number of bytes actually transferred), and the length actually transferred is stored in the long integer pointed to by length.  A short read returns zero for an error code.
940  *
941  * \note *length is an INOUT parameter: at the start it represents the size of the buffer, and when done, it contains the number of bytes actually transferred.
942  */
943 int
944 ubik_Read(struct ubik_trans *transPtr, void *buffer,
945           afs_int32 length)
946 {
947     afs_int32 code;
948
949     /* reads are easy to do: handle locally */
950     DBHOLD(transPtr->dbase);
951     if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY)) {
952         DBRELE(transPtr->dbase);
953         return UNOQUORUM;
954     }
955
956     code =
957         udisk_read(transPtr, transPtr->seekFile, buffer, transPtr->seekPos,
958                    length);
959     if (code == 0) {
960         transPtr->seekPos += length;
961     }
962     DBRELE(transPtr->dbase);
963     return code;
964 }
965
966 /*!
967  * \brief This routine will flush the io data in the iovec structures.
968  *
969  * It first flushes to the local disk and then uses ContactQuorum to write it
970  * to the other servers.
971  */
972 int
973 ubik_Flush(struct ubik_trans *transPtr)
974 {
975     afs_int32 code, error = 0;
976
977     if (transPtr->type != UBIK_WRITETRANS)
978         return UBADTYPE;
979     if (!transPtr->iovec_info.iovec_wrt_len
980         || !transPtr->iovec_info.iovec_wrt_val)
981         return 0;
982
983     DBHOLD(transPtr->dbase);
984     if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY))
985         ERROR_EXIT(UNOQUORUM);
986     if (!ubeacon_AmSyncSite())  /* only sync site can write */
987         ERROR_EXIT(UNOTSYNC);
988
989     /* Update the rest of the servers in the quorum */
990     code =
991         ContactQuorum_DISK_WriteV(transPtr, 0, &transPtr->iovec_info,
992                                   &transPtr->iovec_data);
993     if (code) {
994         udisk_abort(transPtr);
995         ContactQuorum_NoArguments(DISK_Abort, transPtr, 0); /* force aborts to the others */
996         transPtr->iovec_info.iovec_wrt_len = 0;
997         transPtr->iovec_data.iovec_buf_len = 0;
998         ERROR_EXIT(code);
999     }
1000
1001     /* Wrote the buffers out, so start at scratch again */
1002     transPtr->iovec_info.iovec_wrt_len = 0;
1003     transPtr->iovec_data.iovec_buf_len = 0;
1004
1005   error_exit:
1006     DBRELE(transPtr->dbase);
1007     return error;
1008 }
1009
1010 int
1011 ubik_Write(struct ubik_trans *transPtr, void *vbuffer,
1012            afs_int32 length)
1013 {
1014     struct ubik_iovec *iovec;
1015     afs_int32 code, error = 0;
1016     afs_int32 pos, len, size;
1017     char * buffer = (char *)vbuffer;
1018
1019     if (transPtr->type != UBIK_WRITETRANS)
1020         return UBADTYPE;
1021     if (!length)
1022         return 0;
1023
1024     if (length > IOVEC_MAXBUF) {
1025         for (pos = 0, len = length; len > 0; len -= size, pos += size) {
1026             size = ((len < IOVEC_MAXBUF) ? len : IOVEC_MAXBUF);
1027             code = ubik_Write(transPtr, buffer+pos, size);
1028             if (code)
1029                 return (code);
1030         }
1031         return 0;
1032     }
1033
1034     if (!transPtr->iovec_info.iovec_wrt_val) {
1035         transPtr->iovec_info.iovec_wrt_len = 0;
1036         transPtr->iovec_info.iovec_wrt_val =
1037             (struct ubik_iovec *)malloc(IOVEC_MAXWRT *
1038                                         sizeof(struct ubik_iovec));
1039         transPtr->iovec_data.iovec_buf_len = 0;
1040         transPtr->iovec_data.iovec_buf_val = (char *)malloc(IOVEC_MAXBUF);
1041         if (!transPtr->iovec_info.iovec_wrt_val
1042             || !transPtr->iovec_data.iovec_buf_val) {
1043             if (transPtr->iovec_info.iovec_wrt_val)
1044                 free(transPtr->iovec_info.iovec_wrt_val);
1045             transPtr->iovec_info.iovec_wrt_val = 0;
1046             if (transPtr->iovec_data.iovec_buf_val)
1047                 free(transPtr->iovec_data.iovec_buf_val);
1048             transPtr->iovec_data.iovec_buf_val = 0;
1049             return UNOMEM;
1050         }
1051     }
1052
1053     /* If this write won't fit in the structure, then flush it out and start anew */
1054     if ((transPtr->iovec_info.iovec_wrt_len >= IOVEC_MAXWRT)
1055         || ((length + transPtr->iovec_data.iovec_buf_len) > IOVEC_MAXBUF)) {
1056         code = ubik_Flush(transPtr);
1057         if (code)
1058             return (code);
1059     }
1060
1061     DBHOLD(transPtr->dbase);
1062     if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY))
1063         ERROR_EXIT(UNOQUORUM);
1064     if (!ubeacon_AmSyncSite())  /* only sync site can write */
1065         ERROR_EXIT(UNOTSYNC);
1066
1067     /* Write to the local disk */
1068     code =
1069         udisk_write(transPtr, transPtr->seekFile, buffer, transPtr->seekPos,
1070                     length);
1071     if (code) {
1072         udisk_abort(transPtr);
1073         transPtr->iovec_info.iovec_wrt_len = 0;
1074         transPtr->iovec_data.iovec_buf_len = 0;
1075         DBRELE(transPtr->dbase);
1076         return (code);
1077     }
1078
1079     /* Collect writes for the other ubik servers (to be done in bulk) */
1080     iovec = (struct ubik_iovec *)transPtr->iovec_info.iovec_wrt_val;
1081     iovec[transPtr->iovec_info.iovec_wrt_len].file = transPtr->seekFile;
1082     iovec[transPtr->iovec_info.iovec_wrt_len].position = transPtr->seekPos;
1083     iovec[transPtr->iovec_info.iovec_wrt_len].length = length;
1084
1085     memcpy(&transPtr->iovec_data.
1086            iovec_buf_val[transPtr->iovec_data.iovec_buf_len], buffer, length);
1087
1088     transPtr->iovec_info.iovec_wrt_len++;
1089     transPtr->iovec_data.iovec_buf_len += length;
1090     transPtr->seekPos += length;
1091
1092   error_exit:
1093     DBRELE(transPtr->dbase);
1094     return error;
1095 }
1096
1097 /*!
1098  * \brief This sets the file pointer associated with the current transaction
1099  * to the appropriate file and byte position.
1100  *
1101  * Unlike Unix files, a transaction is labelled by both a file number \p fileid
1102  * and a byte position relative to the specified file \p position.
1103  */
1104 int
1105 ubik_Seek(struct ubik_trans *transPtr, afs_int32 fileid,
1106           afs_int32 position)
1107 {
1108     afs_int32 code;
1109
1110     DBHOLD(transPtr->dbase);
1111     if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY)) {
1112         code = UNOQUORUM;
1113     } else {
1114         transPtr->seekFile = fileid;
1115         transPtr->seekPos = position;
1116         code = 0;
1117     }
1118     DBRELE(transPtr->dbase);
1119     return code;
1120 }
1121
1122 /*!
1123  * \brief This call returns the file pointer associated with the specified
1124  * transaction in \p fileid and \p position.
1125  */
1126 int
1127 ubik_Tell(struct ubik_trans *transPtr, afs_int32 * fileid,
1128           afs_int32 * position)
1129 {
1130     DBHOLD(transPtr->dbase);
1131     *fileid = transPtr->seekFile;
1132     *position = transPtr->seekPos;
1133     DBRELE(transPtr->dbase);
1134     return 0;
1135 }
1136
1137 /*!
1138  * \brief This sets the file size for the currently-selected file to \p length
1139  * bytes, if length is less than the file's current size.
1140  */
1141 int
1142 ubik_Truncate(struct ubik_trans *transPtr, afs_int32 length)
1143 {
1144     afs_int32 code, error = 0;
1145
1146     /* Will also catch if not UBIK_WRITETRANS */
1147     code = ubik_Flush(transPtr);
1148     if (code)
1149         return (code);
1150
1151     DBHOLD(transPtr->dbase);
1152     /* first, check that quorum is still good, and that dbase is up-to-date */
1153     if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY))
1154         ERROR_EXIT(UNOQUORUM);
1155     if (!ubeacon_AmSyncSite())
1156         ERROR_EXIT(UNOTSYNC);
1157
1158     /* now do the operation locally, and propagate it out */
1159     code = udisk_truncate(transPtr, transPtr->seekFile, length);
1160     if (!code) {
1161         code =
1162             ContactQuorum_DISK_Truncate(transPtr, 0, transPtr->seekFile,
1163                                         length);
1164     }
1165     if (code) {
1166         /* we must abort the operation */
1167         udisk_abort(transPtr);
1168         ContactQuorum_NoArguments(DISK_Abort, transPtr, 0); /* force aborts to the others */
1169         ERROR_EXIT(code);
1170     }
1171
1172   error_exit:
1173     DBRELE(transPtr->dbase);
1174     return error;
1175 }
1176
1177 /*!
1178  * \brief set a lock; all locks are released on transaction end (commit/abort)
1179  */
1180 int
1181 ubik_SetLock(struct ubik_trans *atrans, afs_int32 apos, afs_int32 alen,
1182              int atype)
1183 {
1184     afs_int32 code = 0, error = 0;
1185
1186     if (atype == LOCKWRITE) {
1187         if (atrans->type == UBIK_READTRANS)
1188             return UBADTYPE;
1189         code = ubik_Flush(atrans);
1190         if (code)
1191             return (code);
1192     }
1193
1194     DBHOLD(atrans->dbase);
1195     if (atype == LOCKREAD) {
1196         code = ulock_getLock(atrans, atype, 1);
1197         if (code)
1198             ERROR_EXIT(code);
1199     } else {
1200         /* first, check that quorum is still good, and that dbase is up-to-date */
1201         if (!urecovery_AllBetter(atrans->dbase, atrans->flags & TRREADANY))
1202             ERROR_EXIT(UNOQUORUM);
1203         if (!ubeacon_AmSyncSite())
1204             ERROR_EXIT(UNOTSYNC);
1205
1206         /* now do the operation locally, and propagate it out */
1207         code = ulock_getLock(atrans, atype, 1);
1208         if (code == 0) {
1209             code = ContactQuorum_DISK_Lock(atrans, 0, 0, 1 /*unused */ ,
1210                                            1 /*unused */ , LOCKWRITE);
1211         }
1212         if (code) {
1213             /* we must abort the operation */
1214             udisk_abort(atrans);
1215             ContactQuorum_NoArguments(DISK_Abort, atrans, 0); /* force aborts to the others */
1216             ERROR_EXIT(code);
1217         }
1218     }
1219
1220   error_exit:
1221     DBRELE(atrans->dbase);
1222     return error;
1223 }
1224
1225 /*!
1226  * \brief utility to wait for a version # to change
1227  */
1228 int
1229 ubik_WaitVersion(struct ubik_dbase *adatabase,
1230                  struct ubik_version *aversion)
1231 {
1232     DBHOLD(adatabase);
1233     while (1) {
1234         /* wait until version # changes, and then return */
1235         if (vcmp(*aversion, adatabase->version) != 0) {
1236             DBRELE(adatabase);
1237             return 0;
1238         }
1239 #ifdef AFS_PTHREAD_ENV
1240         CV_WAIT(&adatabase->version_cond, &adatabase->versionLock);
1241 #else
1242         DBRELE(adatabase);
1243         LWP_WaitProcess(&adatabase->version);   /* same vers, just wait */
1244         DBHOLD(adatabase);
1245 #endif
1246     }
1247 }
1248
1249 /*!
1250  * \brief utility to get the version of the dbase a transaction is dealing with
1251  */
1252 int
1253 ubik_GetVersion(struct ubik_trans *atrans,
1254                 struct ubik_version *avers)
1255 {
1256     *avers = atrans->dbase->version;
1257     return 0;
1258 }
1259
1260 /*!
1261  * \brief Facility to simplify database caching.
1262  * \return zero if last trans was done on the local server and was successful.
1263  * \return -1 means bad (NULL) argument.
1264  *
1265  * If return value is non-zero and the caller is a server caching part of the
1266  * Ubik database, it should invalidate that cache.
1267  */
1268 static int
1269 ubik_CacheUpdate(struct ubik_trans *atrans)
1270 {
1271     if (!(atrans && atrans->dbase))
1272         return -1;
1273     return vcmp(atrans->dbase->cachedVersion, atrans->dbase->version) != 0;
1274 }
1275
1276 /**
1277  * check and possibly update cache of ubik db.
1278  *
1279  * If the version of the cached db data is out of date, this calls (*check) to
1280  * update the cache. If (*check) returns success, we update the version of the
1281  * cached db data.
1282  *
1283  * Checking the version of the cached db data is done under a read lock;
1284  * updating the cache (and thus calling (*check)) is done under a write lock
1285  * so is guaranteed not to interfere with another thread's (*check). On
1286  * successful return, a read lock on the cached db data is obtained, which
1287  * will be released by ubik_EndTrans or ubik_AbortTrans.
1288  *
1289  * @param[in] atrans ubik transaction
1290  * @param[in] check  function to call to check/update cache
1291  * @param[in] rock   rock to pass to *check
1292  *
1293  * @return operation status
1294  *   @retval 0       success
1295  *   @retval nonzero error; cachedVersion not updated
1296  *
1297  * @post On success, application cache is read-locked, and cache data is
1298  *       up-to-date
1299  */
1300 int
1301 ubik_CheckCache(struct ubik_trans *atrans, ubik_updatecache_func cbf, void *rock)
1302 {
1303     int ret = 0;
1304
1305     if (!(atrans && atrans->dbase))
1306         return -1;
1307
1308     ObtainReadLock(&atrans->dbase->cache_lock);
1309
1310     while (ubik_CacheUpdate(atrans) != 0) {
1311
1312         ReleaseReadLock(&atrans->dbase->cache_lock);
1313         ObtainSharedLock(&atrans->dbase->cache_lock);
1314
1315         if (ubik_CacheUpdate(atrans) != 0) {
1316
1317             BoostSharedLock(&atrans->dbase->cache_lock);
1318
1319             ret = (*cbf) (atrans, rock);
1320             if (ret == 0) {
1321                 memcpy(&atrans->dbase->cachedVersion, &atrans->dbase->version,
1322                        sizeof(atrans->dbase->cachedVersion));
1323             }
1324         }
1325
1326         /* It would be nice if we could convert from a shared lock to a read
1327          * lock... instead, just release the shared and acquire the read */
1328         ReleaseSharedLock(&atrans->dbase->cache_lock);
1329
1330         if (ret) {
1331             /* if we have an error, don't retry, and don't hold any locks */
1332             return ret;
1333         }
1334
1335         ObtainReadLock(&atrans->dbase->cache_lock);
1336     }
1337
1338     atrans->flags |= TRCACHELOCKED;
1339
1340     return 0;
1341 }
1342
1343 /*!
1344  * "Who said anything about panicking?" snapped Arthur.
1345  * "This is still just the culture shock. You wait till I've settled down
1346  * into the situation and found my bearings. \em Then I'll start panicking!"
1347  * --Authur Dent
1348  *
1349  * \returns There is no return from panic.
1350  */
1351 void
1352 panic(char *format, ...)
1353 {
1354     va_list ap;
1355
1356     va_start(ap, format);
1357     ubik_print("Ubik PANIC: ");
1358     ubik_vprint(format, ap);
1359     va_end(ap);
1360
1361     abort();
1362     ubik_print("BACK FROM ABORT\n");    /* shouldn't come back */
1363     exit(1);                    /* never know, though  */
1364 }
1365
1366 /*!
1367  * This function takes an IP addresses as its parameter. It returns the
1368  * the primary IP address that is on the host passed in, or 0 if not found.
1369  */
1370 afs_uint32
1371 ubikGetPrimaryInterfaceAddr(afs_uint32 addr)
1372 {
1373     struct ubik_server *ts;
1374     int j;
1375
1376     UBIK_ADDR_LOCK;
1377     for (ts = ubik_servers; ts; ts = ts->next)
1378         for (j = 0; j < UBIK_MAX_INTERFACE_ADDR; j++)
1379             if (ts->addr[j] == addr) {
1380                 UBIK_ADDR_UNLOCK;
1381                 return ts->addr[0];     /* net byte order */
1382             }
1383     UBIK_ADDR_UNLOCK;
1384     return 0;                   /* if not in server database, return error */
1385 }
1386
1387 int
1388 ubik_CheckAuth(struct rx_call *acall)
1389 {
1390     if (checkSecurityProc)
1391         return (*checkSecurityProc) (securityRock, acall);
1392     else if (ubik_CheckRXSecurityProc) {
1393         return (*ubik_CheckRXSecurityProc) (ubik_CheckRXSecurityRock, acall);
1394     } else
1395         return 0;
1396 }
1397
1398 void
1399 ubik_SetServerSecurityProcs(void (*buildproc) (void *,
1400                                                struct rx_securityClass ***,
1401                                                afs_int32 *),
1402                             int (*checkproc) (void *, struct rx_call *),
1403                             void *rock)
1404 {
1405     buildSecClassesProc = buildproc;
1406     checkSecurityProc = checkproc;
1407     securityRock = rock;
1408 }