ubik: add lock initialization function
[openafs.git] / src / ubik / ubik.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  *
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afsconfig.h>
11 #include <afs/param.h>
12
13 #include <roken.h>
14
15 #include <sys/types.h>
16 #include <string.h>
17 #include <stdarg.h>
18 #include <time.h>
19
20 #ifdef AFS_NT40_ENV
21 #include <winsock2.h>
22 #else
23 #include <sys/file.h>
24 #include <netinet/in.h>
25 #include <sys/param.h>
26 #endif
27
28 #include <lock.h>
29 #include <rx/xdr.h>
30 #include <rx/rx.h>
31 #include <afs/cellconfig.h>
32
33 #define UBIK_INTERNALS
34 #include "ubik.h"
35 #include "ubik_int.h"
36
37 #include <lwp.h>   /* temporary hack by klm */
38
39 #define ERROR_EXIT(code) do { \
40     error = (code); \
41     goto error_exit; \
42 } while (0)
43
44 /*!
45  * \file
46  * This system is organized in a hierarchical set of related modules.  Modules
47  * at one level can only call modules at the same level or below.
48  *
49  * At the bottom level (0) we have R, RFTP, LWP and IOMGR, i.e. the basic
50  * operating system primitives.
51  *
52  * At the next level (1) we have
53  *
54  * \li VOTER--The module responsible for casting votes when asked.  It is also
55  * responsible for determining whether this server should try to become
56  * a synchronization site.
57  * \li BEACONER--The module responsible for sending keep-alives out when a
58  * server is actually the sync site, or trying to become a sync site.
59  * \li DISK--The module responsible for representing atomic transactions
60  * on the local disk.  It maintains a new-value only log.
61  * \li LOCK--The module responsible for locking byte ranges in the database file.
62  *
63  * At the next level (2) we have
64  *
65  * \li RECOVERY--The module responsible for ensuring that all members of a quorum
66  * have the same up-to-date database after a new synchronization site is
67  * elected.  This module runs only on the synchronization site.
68  *
69  * At the next level (3) we have
70  *
71  * \li REMOTE--The module responsible for interpreting requests from the sync
72  * site and applying them to the database, after obtaining the appropriate
73  * locks.
74  *
75  * At the next level (4) we have
76  *
77  * \li UBIK--The module users call to perform operations on the database.
78  */
79
80
81 /* some globals */
82 afs_int32 ubik_quorum = 0;
83 struct ubik_dbase *ubik_dbase = 0;
84 struct ubik_stats ubik_stats;
85 afs_uint32 ubik_host[UBIK_MAX_INTERFACE_ADDR];
86 afs_int32 ubik_epochTime = 0;
87 afs_int32 urecovery_state = 0;
88 int (*ubik_SRXSecurityProc) (void *, struct rx_securityClass **, afs_int32 *);
89 void *ubik_SRXSecurityRock;
90 int (*ubik_SyncWriterCacheProc) (void);
91 struct ubik_server *ubik_servers;
92 short ubik_callPortal;
93
94 static int BeginTrans(struct ubik_dbase *dbase, afs_int32 transMode,
95                       struct ubik_trans **transPtr, int readAny);
96
97 struct rx_securityClass *ubik_sc[3];
98
99 #define CStampVersion       1   /* meaning set ts->version */
100
101 static_inline struct rx_connection *
102 Quorum_StartIO(struct ubik_trans *atrans, struct ubik_server *as)
103 {
104     struct rx_connection *conn;
105
106     conn = as->disk_rxcid;
107
108 #ifdef AFS_PTHREAD_ENV
109     rx_GetConnection(conn);
110     DBRELE(atrans->dbase);
111 #endif /* AFS_PTHREAD_ENV */
112
113     return conn;
114 }
115
116 static_inline void
117 Quorum_EndIO(struct ubik_trans *atrans, struct rx_connection *aconn)
118 {
119 #ifdef AFS_PTHREAD_ENV
120     DBHOLD(atrans->dbase);
121     rx_PutConnection(aconn);
122 #endif /* AFS_PTHREAD_ENV */
123 }
124
125
126 /*
127  * Iterate over all servers.  Callers pass in *ts which is used to track
128  * the current server.
129  * - Returns 1 if there are no more servers
130  * - Returns 0 with conn set to the connection for the current server if
131  *   it's up and current
132  */
133 static int
134 ContactQuorum_iterate(struct ubik_trans *atrans, int aflags, struct ubik_server **ts,
135                          struct rx_connection **conn, afs_int32 *rcode,
136                          afs_int32 *okcalls, afs_int32 code)
137 {
138     if (!*ts) {
139         /* Initial call - start iterating over servers */
140         *ts = ubik_servers;
141         *conn = NULL;
142         *rcode = 0;
143         *okcalls = 0;
144     } else {
145         if (*conn) {
146             Quorum_EndIO(atrans, *conn);
147             *conn = NULL;
148             if (code) {         /* failure */
149                 *rcode = code;
150                 (*ts)->up = 0;          /* mark as down now; beacons will no longer be sent */
151                 (*ts)->beaconSinceDown = 0;
152                 (*ts)->currentDB = 0;
153                 urecovery_LostServer(*ts);      /* tell recovery to try to resend dbase later */
154             } else {            /* success */
155                 if (!(*ts)->isClone)
156                     (*okcalls)++;       /* count up how many worked */
157                 if (aflags & CStampVersion) {
158                     (*ts)->version = atrans->dbase->version;
159                 }
160             }
161         }
162         *ts = (*ts)->next;
163     }
164     if (!(*ts))
165         return 1;
166     if (!(*ts)->up || !(*ts)->currentDB) {
167         (*ts)->currentDB = 0;   /* db is no longer current; we just missed an update */
168         return 0;               /* not up-to-date, don't bother.  NULL conn will tell caller not to use */
169     }
170     *conn = Quorum_StartIO(atrans, *ts);
171     return 0;
172 }
173
174 static int
175 ContactQuorum_rcode(int okcalls, afs_int32 rcode)
176 {
177     /*
178      * return 0 if we successfully contacted a quorum, otherwise return error code.
179      * We don't have to contact ourselves (that was done locally)
180      */
181     if (okcalls + 1 >= ubik_quorum)
182         return 0;
183     else
184         return rcode;
185 }
186
187 /*!
188  * \brief Perform an operation at a quorum, handling error conditions.
189  * \return 0 if all worked and a quorum was contacted successfully
190  * \return otherwise mark failing server as down and return #UERROR
191  *
192  * \note If any server misses an update, we must wait #BIGTIME seconds before
193  * allowing the transaction to commit, to ensure that the missing and
194  * possibly still functioning server times out and stops handing out old
195  * data.  This is done in the commit code, where we wait for a server marked
196  * down to have stayed down for #BIGTIME seconds before we allow a transaction
197  * to commit.  A server that fails but comes back up won't give out old data
198  * because it is sent the sync count along with the beacon message that
199  * marks it as \b really up (\p beaconSinceDown).
200  */
201 afs_int32
202 ContactQuorum_NoArguments(afs_int32 (*proc)(struct rx_connection *, ubik_tid *),
203                           struct ubik_trans *atrans, int aflags)
204 {
205     struct ubik_server *ts = NULL;
206     afs_int32 code = 0, rcode, okcalls;
207     struct rx_connection *conn;
208     int done;
209
210     done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
211     while (!done) {
212         if (conn)
213             code = (*proc)(conn, &atrans->tid);
214         done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
215     }
216     return ContactQuorum_rcode(okcalls, rcode);
217 }
218
219
220 afs_int32
221 ContactQuorum_DISK_Lock(struct ubik_trans *atrans, int aflags,afs_int32 file,
222                         afs_int32 position, afs_int32 length, afs_int32 type)
223 {
224     struct ubik_server *ts = NULL;
225     afs_int32 code = 0, rcode, okcalls;
226     struct rx_connection *conn;
227     int done;
228
229     done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
230     while (!done) {
231         if (conn)
232             code = DISK_Lock(conn, &atrans->tid, file, position, length, type);
233         done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
234     }
235     return ContactQuorum_rcode(okcalls, rcode);
236 }
237
238
239 afs_int32
240 ContactQuorum_DISK_Write(struct ubik_trans *atrans, int aflags,
241                          afs_int32 file, afs_int32 position, bulkdata *data)
242 {
243     struct ubik_server *ts = NULL;
244     afs_int32 code = 0, rcode, okcalls;
245     struct rx_connection *conn;
246     int done;
247
248     done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
249     while (!done) {
250         if (conn)
251             code = DISK_Write(conn, &atrans->tid, file, position, data);
252         done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
253     }
254     return ContactQuorum_rcode(okcalls, rcode);
255 }
256
257
258 afs_int32
259 ContactQuorum_DISK_Truncate(struct ubik_trans *atrans, int aflags,
260                             afs_int32 file, afs_int32 length)
261 {
262     struct ubik_server *ts = NULL;
263     afs_int32 code = 0, rcode, okcalls;
264     struct rx_connection *conn;
265     int done;
266
267     done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
268     while (!done) {
269         if (conn)
270             code = DISK_Truncate(conn, &atrans->tid, file, length);
271         done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
272     }
273     return ContactQuorum_rcode(okcalls, rcode);
274 }
275
276
277 afs_int32
278 ContactQuorum_DISK_WriteV(struct ubik_trans *atrans, int aflags,
279                           iovec_wrt * io_vector, iovec_buf *io_buffer)
280 {
281     struct ubik_server *ts = NULL;
282     afs_int32 code = 0, rcode, okcalls;
283     struct rx_connection *conn;
284     int done;
285
286     done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
287     while (!done) {
288         if (conn) {
289             code = DISK_WriteV(conn, &atrans->tid, io_vector, io_buffer);
290             if ((code <= -450) && (code > -500)) {
291                 /* An RPC interface mismatch (as defined in comerr/error_msg.c).
292                  * Un-bulk the entries and do individual DISK_Write calls
293                  * instead of DISK_WriteV.
294                  */
295                 struct ubik_iovec *iovec =
296                         (struct ubik_iovec *)io_vector->iovec_wrt_val;
297                 char *iobuf = (char *)io_buffer->iovec_buf_val;
298                 bulkdata tcbs;
299                 afs_int32 i, offset;
300
301                 Quorum_EndIO(atrans, conn);
302                 conn = Quorum_StartIO(atrans, ts);
303
304                 for (i = 0, offset = 0; i < io_vector->iovec_wrt_len; i++) {
305                     /* Sanity check for going off end of buffer */
306                     if ((offset + iovec[i].length) > io_buffer->iovec_buf_len) {
307                         code = UINTERNAL;
308                         break;
309                     }
310                     tcbs.bulkdata_len = iovec[i].length;
311                     tcbs.bulkdata_val = &iobuf[offset];
312                     code = DISK_Write(conn, &atrans->tid, iovec[i].file,
313                            iovec[i].position, &tcbs);
314                     if (code)
315                         break;
316                     offset += iovec[i].length;
317                 }
318             }
319         }
320         done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
321     }
322     return ContactQuorum_rcode(okcalls, rcode);
323 }
324
325
326 afs_int32
327 ContactQuorum_DISK_SetVersion(struct ubik_trans *atrans, int aflags,
328                               ubik_version *OldVersion,
329                               ubik_version *NewVersion)
330 {
331     struct ubik_server *ts = NULL;
332     afs_int32 code = 0, rcode, okcalls;
333     struct rx_connection *conn;
334     int done;
335
336     done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
337     while (!done) {
338         if (conn)
339             code = DISK_SetVersion(conn, &atrans->tid, OldVersion, NewVersion);
340         done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
341     }
342     return ContactQuorum_rcode(okcalls, rcode);
343 }
344
345
346 /*!
347  * \brief This routine initializes the ubik system for a set of servers.
348  * \return 0 for success, or an error code on failure.
349  * \param serverList set of servers specified; nServers gives the number of entries in this array.
350  * \param pathName provides an initial prefix used for naming storage files used by this system.
351  * \param dbase the returned structure representing this instance of an ubik; it is passed to various calls below.
352  *
353  * \todo This routine should perhaps be generalized to a low-level disk interface providing read, write, file enumeration and sync operations.
354  *
355  * \warning The host named by myHost should not also be listed in serverList.
356  *
357  * \see ubik_ServerInit(), ubik_ServerInitByInfo()
358  */
359 int
360 ubik_ServerInitCommon(afs_uint32 myHost, short myPort,
361                       struct afsconf_cell *info, char clones[],
362                       afs_uint32 serverList[], const char *pathName,
363                       struct ubik_dbase **dbase)
364 {
365     struct ubik_dbase *tdb;
366     afs_int32 code;
367 #ifdef AFS_PTHREAD_ENV
368     pthread_t rxServerThread;        /* pthread variables */
369     pthread_t ubeacon_InteractThread;
370     pthread_t urecovery_InteractThread;
371     pthread_attr_t rxServer_tattr;
372     pthread_attr_t ubeacon_Interact_tattr;
373     pthread_attr_t urecovery_Interact_tattr;
374 #else
375     PROCESS junk;
376     extern int rx_stackSize;
377 #endif
378
379     afs_int32 secIndex;
380     struct rx_securityClass *secClass;
381
382     struct rx_service *tservice;
383
384     initialize_U_error_table();
385
386     tdb = (struct ubik_dbase *)malloc(sizeof(struct ubik_dbase));
387     tdb->pathName = (char *)malloc(strlen(pathName) + 1);
388     strcpy(tdb->pathName, pathName);
389     tdb->activeTrans = (struct ubik_trans *)0;
390     memset(&tdb->version, 0, sizeof(struct ubik_version));
391     memset(&tdb->cachedVersion, 0, sizeof(struct ubik_version));
392 #ifdef AFS_PTHREAD_ENV
393     MUTEX_INIT(&tdb->versionLock, "version lock", MUTEX_DEFAULT, 0);
394 #else
395     Lock_Init(&tdb->versionLock);
396 #endif
397     Lock_Init(&tdb->cache_lock);
398     tdb->flags = 0;
399     tdb->read = uphys_read;
400     tdb->write = uphys_write;
401     tdb->truncate = uphys_truncate;
402     tdb->open = uphys_invalidate;       /* this function isn't used any more */
403     tdb->sync = uphys_sync;
404     tdb->stat = uphys_stat;
405     tdb->getlabel = uphys_getlabel;
406     tdb->setlabel = uphys_setlabel;
407     tdb->getnfiles = uphys_getnfiles;
408     tdb->readers = 0;
409     tdb->tidCounter = tdb->writeTidCounter = 0;
410     *dbase = tdb;
411     ubik_dbase = tdb;           /* for now, only one db per server; can fix later when we have names for the other dbases */
412
413 #ifdef AFS_PTHREAD_ENV
414     CV_INIT(&tdb->version_cond, "version", CV_DEFAULT, 0);
415     CV_INIT(&tdb->flags_cond, "flags", CV_DEFAULT, 0);
416 #endif /* AFS_PTHREAD_ENV */
417
418     /* initialize RX */
419
420     /* the following call is idempotent so when/if it got called earlier,
421      * by whatever called us, it doesn't really matter -- klm */
422     code = rx_Init(myPort);
423     if (code < 0)
424         return code;
425
426     udisk_Init(ubik_nBuffers);
427     ulock_Init();
428
429     ubik_callPortal = myPort;
430     /* try to get an additional security object */
431     ubik_sc[0] = rxnull_NewServerSecurityObject();
432     ubik_sc[1] = 0;
433     ubik_sc[2] = 0;
434     if (ubik_SRXSecurityProc) {
435         code =
436             (*ubik_SRXSecurityProc) (ubik_SRXSecurityRock, &secClass,
437                                      &secIndex);
438         if (code == 0) {
439             ubik_sc[secIndex] = secClass;
440         }
441     }
442     /* for backwards compat this should keep working as it does now
443        and not host bind */
444 #if 0
445     /* This really needs to be up above, where I have put it.  It works
446      * here when we're non-pthreaded, but the code above, when using
447      * pthreads may (and almost certainly does) end up calling on a
448      * pthread resource which gets initialized by rx_Init.  The end
449      * result is that an assert fails and the program dies. -- klm
450      */
451     code = rx_Init(myPort);
452     if (code < 0)
453         return code;
454 #endif
455
456     tservice =
457         rx_NewService(0, VOTE_SERVICE_ID, "VOTE", ubik_sc, 3,
458                       VOTE_ExecuteRequest);
459     if (tservice == (struct rx_service *)0) {
460         ubik_dprint("Could not create VOTE rx service!\n");
461         return -1;
462     }
463     rx_SetMinProcs(tservice, 2);
464     rx_SetMaxProcs(tservice, 3);
465
466     tservice =
467         rx_NewService(0, DISK_SERVICE_ID, "DISK", ubik_sc, 3,
468                       DISK_ExecuteRequest);
469     if (tservice == (struct rx_service *)0) {
470         ubik_dprint("Could not create DISK rx service!\n");
471         return -1;
472     }
473     rx_SetMinProcs(tservice, 2);
474     rx_SetMaxProcs(tservice, 3);
475
476     /* start an rx_ServerProc to handle incoming RPC's in particular the
477      * UpdateInterfaceAddr RPC that occurs in ubeacon_InitServerList. This avoids
478      * the "steplock" problem in ubik initialization. Defect 11037.
479      */
480 #ifdef AFS_PTHREAD_ENV
481 /* do assert stuff */
482     osi_Assert(pthread_attr_init(&rxServer_tattr) == 0);
483     osi_Assert(pthread_attr_setdetachstate(&rxServer_tattr, PTHREAD_CREATE_DETACHED) == 0);
484 /*    osi_Assert(pthread_attr_setstacksize(&rxServer_tattr, rx_stackSize) == 0); */
485
486     osi_Assert(pthread_create(&rxServerThread, &rxServer_tattr, (void *)rx_ServerProc, NULL) == 0);
487 #else
488     LWP_CreateProcess(rx_ServerProc, rx_stackSize, RX_PROCESS_PRIORITY,
489               NULL, "rx_ServerProc", &junk);
490 #endif
491
492     /* do basic initialization */
493     code = uvote_Init();
494     if (code)
495         return code;
496     code = urecovery_Initialize(tdb);
497     if (code)
498         return code;
499     if (info)
500         code = ubeacon_InitServerListByInfo(myHost, info, clones);
501     else
502         code = ubeacon_InitServerList(myHost, serverList);
503     if (code)
504         return code;
505
506     /* now start up async processes */
507 #ifdef AFS_PTHREAD_ENV
508 /* do assert stuff */
509     osi_Assert(pthread_attr_init(&ubeacon_Interact_tattr) == 0);
510     osi_Assert(pthread_attr_setdetachstate(&ubeacon_Interact_tattr, PTHREAD_CREATE_DETACHED) == 0);
511 /*    osi_Assert(pthread_attr_setstacksize(&ubeacon_Interact_tattr, 16384) == 0); */
512     /*  need another attr set here for priority???  - klm */
513
514     osi_Assert(pthread_create(&ubeacon_InteractThread, &ubeacon_Interact_tattr,
515            (void *)ubeacon_Interact, NULL) == 0);
516 #else
517     code = LWP_CreateProcess(ubeacon_Interact, 16384 /*8192 */ ,
518                              LWP_MAX_PRIORITY - 1, (void *)0, "beacon",
519                              &junk);
520     if (code)
521         return code;
522 #endif
523
524 #ifdef AFS_PTHREAD_ENV
525 /* do assert stuff */
526     osi_Assert(pthread_attr_init(&urecovery_Interact_tattr) == 0);
527     osi_Assert(pthread_attr_setdetachstate(&urecovery_Interact_tattr, PTHREAD_CREATE_DETACHED) == 0);
528 /*    osi_Assert(pthread_attr_setstacksize(&urecovery_Interact_tattr, 16384) == 0); */
529     /*  need another attr set here for priority???  - klm */
530
531     osi_Assert(pthread_create(&urecovery_InteractThread, &urecovery_Interact_tattr,
532            (void *)urecovery_Interact, NULL) == 0);
533
534     return 0;  /* is this correct?  - klm */
535 #else
536     code = LWP_CreateProcess(urecovery_Interact, 16384 /*8192 */ ,
537                              LWP_MAX_PRIORITY - 1, (void *)0, "recovery",
538                              &junk);
539     return code;
540 #endif
541
542 }
543
544 /*!
545  * \see ubik_ServerInitCommon()
546  */
547 int
548 ubik_ServerInitByInfo(afs_uint32 myHost, short myPort,
549                       struct afsconf_cell *info, char clones[],
550                       const char *pathName, struct ubik_dbase **dbase)
551 {
552     afs_int32 code;
553
554     code =
555         ubik_ServerInitCommon(myHost, myPort, info, clones, 0, pathName,
556                               dbase);
557     return code;
558 }
559
560 /*!
561  * \see ubik_ServerInitCommon()
562  */
563 int
564 ubik_ServerInit(afs_uint32 myHost, short myPort, afs_uint32 serverList[],
565                 const char *pathName, struct ubik_dbase **dbase)
566 {
567     afs_int32 code;
568
569     code =
570         ubik_ServerInitCommon(myHost, myPort, (struct afsconf_cell *)0, 0,
571                               serverList, pathName, dbase);
572     return code;
573 }
574
575 /*!
576  * \brief This routine begins a read or write transaction on the transaction
577  * identified by transPtr, in the dbase named by dbase.
578  *
579  * An open mode of ubik_READTRANS identifies this as a read transaction,
580  * while a mode of ubik_WRITETRANS identifies this as a write transaction.
581  * transPtr is set to the returned transaction control block.
582  * The readAny flag is set to 0 or 1 or 2 by the wrapper functions
583  * ubik_BeginTrans() or ubik_BeginTransReadAny() or
584  * ubik_BeginTransReadAnyWrite() below.
585  *
586  * \note We can only begin transaction when we have an up-to-date database.
587  */
588 static int
589 BeginTrans(struct ubik_dbase *dbase, afs_int32 transMode,
590            struct ubik_trans **transPtr, int readAny)
591 {
592     struct ubik_trans *jt;
593     struct ubik_trans *tt;
594     afs_int32 code;
595
596     if (readAny > 1 && ubik_SyncWriterCacheProc == NULL) {
597         /* it's not safe to use ubik_BeginTransReadAnyWrite without a
598          * cache-syncing function; fall back to ubik_BeginTransReadAny,
599          * which is safe but slower */
600         ubik_print("ubik_BeginTransReadAnyWrite called, but "
601                    "ubik_SyncWriterCacheProc not set; pretending "
602                    "ubik_BeginTransReadAny was called instead\n");
603         readAny = 1;
604     }
605
606     if ((transMode != UBIK_READTRANS) && readAny)
607         return UBADTYPE;
608     DBHOLD(dbase);
609     if (urecovery_AllBetter(dbase, readAny) == 0) {
610         DBRELE(dbase);
611         return UNOQUORUM;
612     }
613     /* otherwise we have a quorum, use it */
614
615     /* make sure that at most one write transaction occurs at any one time.  This
616      * has nothing to do with transaction locking; that's enforced by the lock package.  However,
617      * we can't even handle two non-conflicting writes, since our log and recovery modules
618      * don't know how to restore one without possibly picking up some data from the other. */
619     if (transMode == UBIK_WRITETRANS) {
620         /* if we're writing already, wait */
621         while (dbase->flags & DBWRITING) {
622 #ifdef AFS_PTHREAD_ENV
623             CV_WAIT(&dbase->flags_cond, &dbase->versionLock);
624 #else
625             DBRELE(dbase);
626             LWP_WaitProcess(&dbase->flags);
627             DBHOLD(dbase);
628 #endif
629         }
630
631         if (!ubeacon_AmSyncSite()) {
632             DBRELE(dbase);
633             return UNOTSYNC;
634         }
635     }
636
637     /* create the transaction */
638     code = udisk_begin(dbase, transMode, &jt);  /* can't take address of register var */
639     tt = jt;                    /* move to a register */
640     if (code || tt == (struct ubik_trans *)NULL) {
641         DBRELE(dbase);
642         return code;
643     }
644     if (readAny) {
645         tt->flags |= TRREADANY;
646         if (readAny > 1) {
647             tt->flags |= TRREADWRITE;
648         }
649     }
650     /* label trans and dbase with new tid */
651     tt->tid.epoch = ubik_epochTime;
652     /* bump by two, since tidCounter+1 means trans id'd by tidCounter has finished */
653     tt->tid.counter = (dbase->tidCounter += 2);
654
655     if (transMode == UBIK_WRITETRANS) {
656         /* for a write trans, we have to keep track of the write tid counter too */
657         dbase->writeTidCounter = tt->tid.counter;
658
659         /* next try to start transaction on appropriate number of machines */
660         code = ContactQuorum_NoArguments(DISK_Begin, tt, 0);
661         if (code) {
662             /* we must abort the operation */
663             udisk_abort(tt);
664             ContactQuorum_NoArguments(DISK_Abort, tt, 0); /* force aborts to the others */
665             udisk_end(tt);
666             DBRELE(dbase);
667             return code;
668         }
669     }
670
671     *transPtr = tt;
672     DBRELE(dbase);
673     return 0;
674 }
675
676 /*!
677  * \see BeginTrans()
678  */
679 int
680 ubik_BeginTrans(struct ubik_dbase *dbase, afs_int32 transMode,
681                 struct ubik_trans **transPtr)
682 {
683     return BeginTrans(dbase, transMode, transPtr, 0);
684 }
685
686 /*!
687  * \see BeginTrans()
688  */
689 int
690 ubik_BeginTransReadAny(struct ubik_dbase *dbase, afs_int32 transMode,
691                        struct ubik_trans **transPtr)
692 {
693     return BeginTrans(dbase, transMode, transPtr, 1);
694 }
695
696 /*!
697  * \see BeginTrans()
698  */
699 int
700 ubik_BeginTransReadAnyWrite(struct ubik_dbase *dbase, afs_int32 transMode,
701                             struct ubik_trans **transPtr)
702 {
703     return BeginTrans(dbase, transMode, transPtr, 2);
704 }
705
706 /*!
707  * \brief This routine ends a read or write transaction by aborting it.
708  */
709 int
710 ubik_AbortTrans(struct ubik_trans *transPtr)
711 {
712     afs_int32 code;
713     afs_int32 code2;
714     struct ubik_dbase *dbase;
715
716     dbase = transPtr->dbase;
717
718     if (transPtr->flags & TRCACHELOCKED) {
719         ReleaseReadLock(&dbase->cache_lock);
720         transPtr->flags &= ~TRCACHELOCKED;
721     }
722
723     ObtainWriteLock(&dbase->cache_lock);
724
725     DBHOLD(dbase);
726     memset(&dbase->cachedVersion, 0, sizeof(struct ubik_version));
727
728     ReleaseWriteLock(&dbase->cache_lock);
729
730     /* see if we're still up-to-date */
731     if (!urecovery_AllBetter(dbase, transPtr->flags & TRREADANY)) {
732         udisk_abort(transPtr);
733         udisk_end(transPtr);
734         DBRELE(dbase);
735         return UNOQUORUM;
736     }
737
738     if (transPtr->type == UBIK_READTRANS) {
739         code = udisk_abort(transPtr);
740         udisk_end(transPtr);
741         DBRELE(dbase);
742         return code;
743     }
744
745     /* below here, we know we're doing a write transaction */
746     if (!ubeacon_AmSyncSite()) {
747         udisk_abort(transPtr);
748         udisk_end(transPtr);
749         DBRELE(dbase);
750         return UNOTSYNC;
751     }
752
753     /* now it is safe to try remote abort */
754     code = ContactQuorum_NoArguments(DISK_Abort, transPtr, 0);
755     code2 = udisk_abort(transPtr);
756     udisk_end(transPtr);
757     DBRELE(dbase);
758     return (code ? code : code2);
759 }
760
761 static void
762 WritebackApplicationCache(struct ubik_dbase *dbase)
763 {
764     int code = 0;
765     if (ubik_SyncWriterCacheProc) {
766         code = ubik_SyncWriterCacheProc();
767     }
768     if (code) {
769         /* we failed to sync the local cache, so just invalidate the cache;
770          * we'll try to read the cache in again on the next read */
771         memset(&dbase->cachedVersion, 0, sizeof(dbase->cachedVersion));
772     } else {
773         memcpy(&dbase->cachedVersion, &dbase->version,
774                sizeof(dbase->cachedVersion));
775     }
776 }
777
778 /*!
779  * \brief This routine ends a read or write transaction on the open transaction identified by transPtr.
780  * \return an error code.
781  */
782 int
783 ubik_EndTrans(struct ubik_trans *transPtr)
784 {
785     afs_int32 code;
786     struct timeval tv;
787     afs_int32 realStart;
788     struct ubik_server *ts;
789     afs_int32 now;
790     int cachelocked = 0;
791     struct ubik_dbase *dbase;
792
793     if (transPtr->type == UBIK_WRITETRANS) {
794         code = ubik_Flush(transPtr);
795         if (code) {
796             ubik_AbortTrans(transPtr);
797             return (code);
798         }
799     }
800
801     dbase = transPtr->dbase;
802
803     if (transPtr->flags & TRCACHELOCKED) {
804         ReleaseReadLock(&dbase->cache_lock);
805         transPtr->flags &= ~TRCACHELOCKED;
806     }
807
808     if (transPtr->type != UBIK_READTRANS) {
809         /* must hold cache_lock before DBHOLD'ing */
810         ObtainWriteLock(&dbase->cache_lock);
811         cachelocked = 1;
812     }
813
814     DBHOLD(dbase);
815
816     /* give up if no longer current */
817     if (!urecovery_AllBetter(dbase, transPtr->flags & TRREADANY)) {
818         udisk_abort(transPtr);
819         udisk_end(transPtr);
820         DBRELE(dbase);
821         code = UNOQUORUM;
822         goto error;
823     }
824
825     if (transPtr->type == UBIK_READTRANS) {     /* reads are easy */
826         code = udisk_commit(transPtr);
827         if (code == 0)
828             goto success;       /* update cachedVersion correctly */
829         udisk_end(transPtr);
830         DBRELE(dbase);
831         goto error;
832     }
833
834     if (!ubeacon_AmSyncSite()) {        /* no longer sync site */
835         udisk_abort(transPtr);
836         udisk_end(transPtr);
837         DBRELE(dbase);
838         code = UNOTSYNC;
839         goto error;
840     }
841
842     /* now it is safe to do commit */
843     code = udisk_commit(transPtr);
844     if (code == 0) {
845         /* db data has been committed locally; update the local cache so
846          * readers can get at it */
847         WritebackApplicationCache(dbase);
848
849         ReleaseWriteLock(&dbase->cache_lock);
850
851         code = ContactQuorum_NoArguments(DISK_Commit, transPtr, CStampVersion);
852
853     } else {
854         memset(&dbase->cachedVersion, 0, sizeof(struct ubik_version));
855         ReleaseWriteLock(&dbase->cache_lock);
856     }
857     cachelocked = 0;
858     if (code) {
859         /* failed to commit, so must return failure.  Try to clear locks first, just for fun
860          * Note that we don't know if this transaction will eventually commit at this point.
861          * If it made it to a site that will be present in the next quorum, we win, otherwise
862          * we lose.  If we contact a majority of sites, then we won't be here: contacting
863          * a majority guarantees commit, since it guarantees that one dude will be a
864          * member of the next quorum. */
865         ContactQuorum_NoArguments(DISK_ReleaseLocks, transPtr, 0);
866         udisk_end(transPtr);
867         DBRELE(dbase);
868         goto error;
869     }
870     /* before we can start sending unlock messages, we must wait until all servers
871      * that are possibly still functioning on the other side of a network partition
872      * have timed out.  Check the server structures, compute how long to wait, then
873      * start the unlocks */
874     realStart = FT_ApproxTime();
875     while (1) {
876         /* wait for all servers to time out */
877         code = 0;
878         now = FT_ApproxTime();
879         /* check if we're still sync site, the guy should either come up
880          * to us, or timeout.  Put safety check in anyway */
881         if (now - realStart > 10 * BIGTIME) {
882             ubik_stats.escapes++;
883             ubik_print("ubik escaping from commit wait\n");
884             break;
885         }
886         for (ts = ubik_servers; ts; ts = ts->next) {
887             if (!ts->beaconSinceDown && now <= ts->lastBeaconSent + BIGTIME) {
888
889                 /* this guy could have some damaged data, wait for him */
890                 code = 1;
891                 tv.tv_sec = 1;  /* try again after a while (ha ha) */
892                 tv.tv_usec = 0;
893
894 #ifdef AFS_PTHREAD_ENV
895                 /* we could release the dbase outside of the loop, but we do
896                  * it here, in the loop, to avoid an unnecessary RELE/HOLD
897                  * if all sites are up */
898                 DBRELE(dbase);
899                 select(0, 0, 0, 0, &tv);
900                 DBHOLD(dbase);
901 #else
902                 IOMGR_Select(0, 0, 0, 0, &tv);  /* poll, should we wait on something? */
903 #endif
904
905                 break;
906             }
907         }
908         if (code == 0)
909             break;              /* no down ones still pseudo-active */
910     }
911
912     /* finally, unlock all the dudes.  We can return success independent of the number of servers
913      * that really unlock the dbase; the others will do it if/when they elect a new sync site.
914      * The transaction is committed anyway, since we succeeded in contacting a quorum
915      * at the start (when invoking the DiskCommit function).
916      */
917     ContactQuorum_NoArguments(DISK_ReleaseLocks, transPtr, 0);
918
919   success:
920     udisk_end(transPtr);
921     /* don't update cachedVersion here; it should have been updated way back
922      * in ubik_CheckCache, and earlier in this function for writes */
923     DBRELE(dbase);
924     if (cachelocked) {
925         ReleaseWriteLock(&dbase->cache_lock);
926     }
927     return 0;
928
929   error:
930     if (!cachelocked) {
931         ObtainWriteLock(&dbase->cache_lock);
932     }
933     memset(&dbase->cachedVersion, 0, sizeof(struct ubik_version));
934     ReleaseWriteLock(&dbase->cache_lock);
935     return code;
936 }
937
938 /*!
939  * \brief This routine reads length bytes into buffer from the current position in the database.
940  *
941  * The file pointer is updated appropriately (by adding the number of bytes actually transferred), and the length actually transferred is stored in the long integer pointed to by length.  A short read returns zero for an error code.
942  *
943  * \note *length is an INOUT parameter: at the start it represents the size of the buffer, and when done, it contains the number of bytes actually transferred.
944  */
945 int
946 ubik_Read(struct ubik_trans *transPtr, void *buffer,
947           afs_int32 length)
948 {
949     afs_int32 code;
950
951     /* reads are easy to do: handle locally */
952     DBHOLD(transPtr->dbase);
953     if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY)) {
954         DBRELE(transPtr->dbase);
955         return UNOQUORUM;
956     }
957
958     code =
959         udisk_read(transPtr, transPtr->seekFile, buffer, transPtr->seekPos,
960                    length);
961     if (code == 0) {
962         transPtr->seekPos += length;
963     }
964     DBRELE(transPtr->dbase);
965     return code;
966 }
967
968 /*!
969  * \brief This routine will flush the io data in the iovec structures.
970  *
971  * It first flushes to the local disk and then uses ContactQuorum to write it
972  * to the other servers.
973  */
974 int
975 ubik_Flush(struct ubik_trans *transPtr)
976 {
977     afs_int32 code, error = 0;
978
979     if (transPtr->type != UBIK_WRITETRANS)
980         return UBADTYPE;
981     if (!transPtr->iovec_info.iovec_wrt_len
982         || !transPtr->iovec_info.iovec_wrt_val)
983         return 0;
984
985     DBHOLD(transPtr->dbase);
986     if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY))
987         ERROR_EXIT(UNOQUORUM);
988     if (!ubeacon_AmSyncSite())  /* only sync site can write */
989         ERROR_EXIT(UNOTSYNC);
990
991     /* Update the rest of the servers in the quorum */
992     code =
993         ContactQuorum_DISK_WriteV(transPtr, 0, &transPtr->iovec_info,
994                                   &transPtr->iovec_data);
995     if (code) {
996         udisk_abort(transPtr);
997         ContactQuorum_NoArguments(DISK_Abort, transPtr, 0); /* force aborts to the others */
998         transPtr->iovec_info.iovec_wrt_len = 0;
999         transPtr->iovec_data.iovec_buf_len = 0;
1000         ERROR_EXIT(code);
1001     }
1002
1003     /* Wrote the buffers out, so start at scratch again */
1004     transPtr->iovec_info.iovec_wrt_len = 0;
1005     transPtr->iovec_data.iovec_buf_len = 0;
1006
1007   error_exit:
1008     DBRELE(transPtr->dbase);
1009     return error;
1010 }
1011
1012 int
1013 ubik_Write(struct ubik_trans *transPtr, void *vbuffer,
1014            afs_int32 length)
1015 {
1016     struct ubik_iovec *iovec;
1017     afs_int32 code, error = 0;
1018     afs_int32 pos, len, size;
1019     char * buffer = (char *)vbuffer;
1020
1021     if (transPtr->type != UBIK_WRITETRANS)
1022         return UBADTYPE;
1023     if (!length)
1024         return 0;
1025
1026     if (length > IOVEC_MAXBUF) {
1027         for (pos = 0, len = length; len > 0; len -= size, pos += size) {
1028             size = ((len < IOVEC_MAXBUF) ? len : IOVEC_MAXBUF);
1029             code = ubik_Write(transPtr, buffer+pos, size);
1030             if (code)
1031                 return (code);
1032         }
1033         return 0;
1034     }
1035
1036     if (!transPtr->iovec_info.iovec_wrt_val) {
1037         transPtr->iovec_info.iovec_wrt_len = 0;
1038         transPtr->iovec_info.iovec_wrt_val =
1039             (struct ubik_iovec *)malloc(IOVEC_MAXWRT *
1040                                         sizeof(struct ubik_iovec));
1041         transPtr->iovec_data.iovec_buf_len = 0;
1042         transPtr->iovec_data.iovec_buf_val = (char *)malloc(IOVEC_MAXBUF);
1043         if (!transPtr->iovec_info.iovec_wrt_val
1044             || !transPtr->iovec_data.iovec_buf_val) {
1045             if (transPtr->iovec_info.iovec_wrt_val)
1046                 free(transPtr->iovec_info.iovec_wrt_val);
1047             transPtr->iovec_info.iovec_wrt_val = 0;
1048             if (transPtr->iovec_data.iovec_buf_val)
1049                 free(transPtr->iovec_data.iovec_buf_val);
1050             transPtr->iovec_data.iovec_buf_val = 0;
1051             return UNOMEM;
1052         }
1053     }
1054
1055     /* If this write won't fit in the structure, then flush it out and start anew */
1056     if ((transPtr->iovec_info.iovec_wrt_len >= IOVEC_MAXWRT)
1057         || ((length + transPtr->iovec_data.iovec_buf_len) > IOVEC_MAXBUF)) {
1058         code = ubik_Flush(transPtr);
1059         if (code)
1060             return (code);
1061     }
1062
1063     DBHOLD(transPtr->dbase);
1064     if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY))
1065         ERROR_EXIT(UNOQUORUM);
1066     if (!ubeacon_AmSyncSite())  /* only sync site can write */
1067         ERROR_EXIT(UNOTSYNC);
1068
1069     /* Write to the local disk */
1070     code =
1071         udisk_write(transPtr, transPtr->seekFile, buffer, transPtr->seekPos,
1072                     length);
1073     if (code) {
1074         udisk_abort(transPtr);
1075         transPtr->iovec_info.iovec_wrt_len = 0;
1076         transPtr->iovec_data.iovec_buf_len = 0;
1077         DBRELE(transPtr->dbase);
1078         return (code);
1079     }
1080
1081     /* Collect writes for the other ubik servers (to be done in bulk) */
1082     iovec = (struct ubik_iovec *)transPtr->iovec_info.iovec_wrt_val;
1083     iovec[transPtr->iovec_info.iovec_wrt_len].file = transPtr->seekFile;
1084     iovec[transPtr->iovec_info.iovec_wrt_len].position = transPtr->seekPos;
1085     iovec[transPtr->iovec_info.iovec_wrt_len].length = length;
1086
1087     memcpy(&transPtr->iovec_data.
1088            iovec_buf_val[transPtr->iovec_data.iovec_buf_len], buffer, length);
1089
1090     transPtr->iovec_info.iovec_wrt_len++;
1091     transPtr->iovec_data.iovec_buf_len += length;
1092     transPtr->seekPos += length;
1093
1094   error_exit:
1095     DBRELE(transPtr->dbase);
1096     return error;
1097 }
1098
1099 /*!
1100  * \brief This sets the file pointer associated with the current transaction
1101  * to the appropriate file and byte position.
1102  *
1103  * Unlike Unix files, a transaction is labelled by both a file number \p fileid
1104  * and a byte position relative to the specified file \p position.
1105  */
1106 int
1107 ubik_Seek(struct ubik_trans *transPtr, afs_int32 fileid,
1108           afs_int32 position)
1109 {
1110     afs_int32 code;
1111
1112     DBHOLD(transPtr->dbase);
1113     if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY)) {
1114         code = UNOQUORUM;
1115     } else {
1116         transPtr->seekFile = fileid;
1117         transPtr->seekPos = position;
1118         code = 0;
1119     }
1120     DBRELE(transPtr->dbase);
1121     return code;
1122 }
1123
1124 /*!
1125  * \brief This call returns the file pointer associated with the specified
1126  * transaction in \p fileid and \p position.
1127  */
1128 int
1129 ubik_Tell(struct ubik_trans *transPtr, afs_int32 * fileid,
1130           afs_int32 * position)
1131 {
1132     DBHOLD(transPtr->dbase);
1133     *fileid = transPtr->seekFile;
1134     *position = transPtr->seekPos;
1135     DBRELE(transPtr->dbase);
1136     return 0;
1137 }
1138
1139 /*!
1140  * \brief This sets the file size for the currently-selected file to \p length
1141  * bytes, if length is less than the file's current size.
1142  */
1143 int
1144 ubik_Truncate(struct ubik_trans *transPtr, afs_int32 length)
1145 {
1146     afs_int32 code, error = 0;
1147
1148     /* Will also catch if not UBIK_WRITETRANS */
1149     code = ubik_Flush(transPtr);
1150     if (code)
1151         return (code);
1152
1153     DBHOLD(transPtr->dbase);
1154     /* first, check that quorum is still good, and that dbase is up-to-date */
1155     if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY))
1156         ERROR_EXIT(UNOQUORUM);
1157     if (!ubeacon_AmSyncSite())
1158         ERROR_EXIT(UNOTSYNC);
1159
1160     /* now do the operation locally, and propagate it out */
1161     code = udisk_truncate(transPtr, transPtr->seekFile, length);
1162     if (!code) {
1163         code =
1164             ContactQuorum_DISK_Truncate(transPtr, 0, transPtr->seekFile,
1165                                         length);
1166     }
1167     if (code) {
1168         /* we must abort the operation */
1169         udisk_abort(transPtr);
1170         ContactQuorum_NoArguments(DISK_Abort, transPtr, 0); /* force aborts to the others */
1171         ERROR_EXIT(code);
1172     }
1173
1174   error_exit:
1175     DBRELE(transPtr->dbase);
1176     return error;
1177 }
1178
1179 /*!
1180  * \brief set a lock; all locks are released on transaction end (commit/abort)
1181  */
1182 int
1183 ubik_SetLock(struct ubik_trans *atrans, afs_int32 apos, afs_int32 alen,
1184              int atype)
1185 {
1186     afs_int32 code = 0, error = 0;
1187
1188     if (atype == LOCKWRITE) {
1189         if (atrans->type == UBIK_READTRANS)
1190             return UBADTYPE;
1191         code = ubik_Flush(atrans);
1192         if (code)
1193             return (code);
1194     }
1195
1196     DBHOLD(atrans->dbase);
1197     if (atype == LOCKREAD) {
1198         code = ulock_getLock(atrans, atype, 1);
1199         if (code)
1200             ERROR_EXIT(code);
1201     } else {
1202         /* first, check that quorum is still good, and that dbase is up-to-date */
1203         if (!urecovery_AllBetter(atrans->dbase, atrans->flags & TRREADANY))
1204             ERROR_EXIT(UNOQUORUM);
1205         if (!ubeacon_AmSyncSite())
1206             ERROR_EXIT(UNOTSYNC);
1207
1208         /* now do the operation locally, and propagate it out */
1209         code = ulock_getLock(atrans, atype, 1);
1210         if (code == 0) {
1211             code = ContactQuorum_DISK_Lock(atrans, 0, 0, 1 /*unused */ ,
1212                                            1 /*unused */ , LOCKWRITE);
1213         }
1214         if (code) {
1215             /* we must abort the operation */
1216             udisk_abort(atrans);
1217             ContactQuorum_NoArguments(DISK_Abort, atrans, 0); /* force aborts to the others */
1218             ERROR_EXIT(code);
1219         }
1220     }
1221
1222   error_exit:
1223     DBRELE(atrans->dbase);
1224     return error;
1225 }
1226
1227 /*!
1228  * \brief utility to wait for a version # to change
1229  */
1230 int
1231 ubik_WaitVersion(struct ubik_dbase *adatabase,
1232                  struct ubik_version *aversion)
1233 {
1234     DBHOLD(adatabase);
1235     while (1) {
1236         /* wait until version # changes, and then return */
1237         if (vcmp(*aversion, adatabase->version) != 0) {
1238             DBRELE(adatabase);
1239             return 0;
1240         }
1241 #ifdef AFS_PTHREAD_ENV
1242         CV_WAIT(&adatabase->version_cond, &adatabase->versionLock);
1243 #else
1244         DBRELE(adatabase);
1245         LWP_WaitProcess(&adatabase->version);   /* same vers, just wait */
1246         DBHOLD(adatabase);
1247 #endif
1248     }
1249 }
1250
1251 /*!
1252  * \brief utility to get the version of the dbase a transaction is dealing with
1253  */
1254 int
1255 ubik_GetVersion(struct ubik_trans *atrans,
1256                 struct ubik_version *avers)
1257 {
1258     *avers = atrans->dbase->version;
1259     return 0;
1260 }
1261
1262 /*!
1263  * \brief Facility to simplify database caching.
1264  * \return zero if last trans was done on the local server and was successful.
1265  * \return -1 means bad (NULL) argument.
1266  *
1267  * If return value is non-zero and the caller is a server caching part of the
1268  * Ubik database, it should invalidate that cache.
1269  */
1270 static int
1271 ubik_CacheUpdate(struct ubik_trans *atrans)
1272 {
1273     if (!(atrans && atrans->dbase))
1274         return -1;
1275     return vcmp(atrans->dbase->cachedVersion, atrans->dbase->version) != 0;
1276 }
1277
1278 /**
1279  * check and possibly update cache of ubik db.
1280  *
1281  * If the version of the cached db data is out of date, this calls (*check) to
1282  * update the cache. If (*check) returns success, we update the version of the
1283  * cached db data.
1284  *
1285  * Checking the version of the cached db data is done under a read lock;
1286  * updating the cache (and thus calling (*check)) is done under a write lock
1287  * so is guaranteed not to interfere with another thread's (*check). On
1288  * successful return, a read lock on the cached db data is obtained, which
1289  * will be released by ubik_EndTrans or ubik_AbortTrans.
1290  *
1291  * @param[in] atrans ubik transaction
1292  * @param[in] check  function to call to check/update cache
1293  * @param[in] rock   rock to pass to *check
1294  *
1295  * @return operation status
1296  *   @retval 0       success
1297  *   @retval nonzero error; cachedVersion not updated
1298  *
1299  * @post On success, application cache is read-locked, and cache data is
1300  *       up-to-date
1301  */
1302 int
1303 ubik_CheckCache(struct ubik_trans *atrans, ubik_updatecache_func cbf, void *rock)
1304 {
1305     int ret = 0;
1306
1307     if (!(atrans && atrans->dbase))
1308         return -1;
1309
1310     ObtainReadLock(&atrans->dbase->cache_lock);
1311
1312     while (ubik_CacheUpdate(atrans) != 0) {
1313
1314         ReleaseReadLock(&atrans->dbase->cache_lock);
1315         ObtainSharedLock(&atrans->dbase->cache_lock);
1316
1317         if (ubik_CacheUpdate(atrans) != 0) {
1318
1319             BoostSharedLock(&atrans->dbase->cache_lock);
1320
1321             ret = (*cbf) (atrans, rock);
1322             if (ret == 0) {
1323                 memcpy(&atrans->dbase->cachedVersion, &atrans->dbase->version,
1324                        sizeof(atrans->dbase->cachedVersion));
1325             }
1326         }
1327
1328         /* It would be nice if we could convert from a shared lock to a read
1329          * lock... instead, just release the shared and acquire the read */
1330         ReleaseSharedLock(&atrans->dbase->cache_lock);
1331
1332         if (ret) {
1333             /* if we have an error, don't retry, and don't hold any locks */
1334             return ret;
1335         }
1336
1337         ObtainReadLock(&atrans->dbase->cache_lock);
1338     }
1339
1340     atrans->flags |= TRCACHELOCKED;
1341
1342     return 0;
1343 }
1344
1345 /*!
1346  * "Who said anything about panicking?" snapped Arthur.
1347  * "This is still just the culture shock. You wait till I've settled down
1348  * into the situation and found my bearings. \em Then I'll start panicking!"
1349  * --Authur Dent
1350  *
1351  * \returns There is no return from panic.
1352  */
1353 void
1354 panic(char *format, ...)
1355 {
1356     va_list ap;
1357
1358     va_start(ap, format);
1359     ubik_print("Ubik PANIC: ");
1360     ubik_vprint(format, ap);
1361     va_end(ap);
1362
1363     abort();
1364     ubik_print("BACK FROM ABORT\n");    /* shouldn't come back */
1365     exit(1);                    /* never know, though  */
1366 }
1367
1368 /*!
1369  * This function takes an IP addresses as its parameter. It returns the
1370  * the primary IP address that is on the host passed in, or 0 if not found.
1371  */
1372 afs_uint32
1373 ubikGetPrimaryInterfaceAddr(afs_uint32 addr)
1374 {
1375     struct ubik_server *ts;
1376     int j;
1377
1378     for (ts = ubik_servers; ts; ts = ts->next)
1379         for (j = 0; j < UBIK_MAX_INTERFACE_ADDR; j++)
1380             if (ts->addr[j] == addr)
1381                 return ts->addr[0];     /* net byte order */
1382     return 0;                   /* if not in server database, return error */
1383 }