855e2f282b8aa30f6f94435aae14e4f0607bc289
[openafs.git] / src / ubik / ubik.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  *
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afsconfig.h>
11 #include <afs/param.h>
12
13 #include <roken.h>
14
15 #include <lock.h>
16 #include <rx/xdr.h>
17 #include <rx/rx.h>
18 #include <afs/cellconfig.h>
19
20 #define UBIK_INTERNALS
21 #include "ubik.h"
22 #include "ubik_int.h"
23
24 #include <lwp.h>   /* temporary hack by klm */
25
26 #define ERROR_EXIT(code) do { \
27     error = (code); \
28     goto error_exit; \
29 } while (0)
30
31 /*!
32  * \file
33  * This system is organized in a hierarchical set of related modules.  Modules
34  * at one level can only call modules at the same level or below.
35  *
36  * At the bottom level (0) we have R, RFTP, LWP and IOMGR, i.e. the basic
37  * operating system primitives.
38  *
39  * At the next level (1) we have
40  *
41  * \li VOTER--The module responsible for casting votes when asked.  It is also
42  * responsible for determining whether this server should try to become
43  * a synchronization site.
44  * \li BEACONER--The module responsible for sending keep-alives out when a
45  * server is actually the sync site, or trying to become a sync site.
46  * \li DISK--The module responsible for representing atomic transactions
47  * on the local disk.  It maintains a new-value only log.
48  * \li LOCK--The module responsible for locking byte ranges in the database file.
49  *
50  * At the next level (2) we have
51  *
52  * \li RECOVERY--The module responsible for ensuring that all members of a quorum
53  * have the same up-to-date database after a new synchronization site is
54  * elected.  This module runs only on the synchronization site.
55  *
56  * At the next level (3) we have
57  *
58  * \li REMOTE--The module responsible for interpreting requests from the sync
59  * site and applying them to the database, after obtaining the appropriate
60  * locks.
61  *
62  * At the next level (4) we have
63  *
64  * \li UBIK--The module users call to perform operations on the database.
65  */
66
67
68 /* some globals */
69 afs_int32 ubik_quorum = 0;
70 struct ubik_dbase *ubik_dbase = 0;
71 struct ubik_stats ubik_stats;
72 afs_uint32 ubik_host[UBIK_MAX_INTERFACE_ADDR];
73 afs_int32 ubik_epochTime = 0;
74 afs_int32 urecovery_state = 0;
75 int (*ubik_SyncWriterCacheProc) (void);
76 struct ubik_server *ubik_servers;
77 short ubik_callPortal;
78
79 /* These global variables were used to control the server security layers.
80  * They are retained for backwards compatibility with legacy callers.
81  *
82  * The ubik_SetServerSecurityProcs() interface should be used instead.
83  */
84
85 int (*ubik_SRXSecurityProc) (void *, struct rx_securityClass **, afs_int32 *);
86 void *ubik_SRXSecurityRock;
87 int (*ubik_CheckRXSecurityProc) (void *, struct rx_call *);
88 void *ubik_CheckRXSecurityRock;
89
90
91
92 static int BeginTrans(struct ubik_dbase *dbase, afs_int32 transMode,
93                       struct ubik_trans **transPtr, int readAny);
94
95 static struct rx_securityClass **ubik_sc = NULL;
96 static void (*buildSecClassesProc)(void *, struct rx_securityClass ***,
97                                    afs_int32 *) = NULL;
98 static int (*checkSecurityProc)(void *, struct rx_call *) = NULL;
99 static void *securityRock = NULL;
100
101 #define CStampVersion       1   /* meaning set ts->version */
102
103 static_inline struct rx_connection *
104 Quorum_StartIO(struct ubik_trans *atrans, struct ubik_server *as)
105 {
106     struct rx_connection *conn;
107
108     conn = as->disk_rxcid;
109
110 #ifdef AFS_PTHREAD_ENV
111     rx_GetConnection(conn);
112     DBRELE(atrans->dbase);
113 #endif /* AFS_PTHREAD_ENV */
114
115     return conn;
116 }
117
118 static_inline void
119 Quorum_EndIO(struct ubik_trans *atrans, struct rx_connection *aconn)
120 {
121 #ifdef AFS_PTHREAD_ENV
122     DBHOLD(atrans->dbase);
123     rx_PutConnection(aconn);
124 #endif /* AFS_PTHREAD_ENV */
125 }
126
127
128 /*
129  * Iterate over all servers.  Callers pass in *ts which is used to track
130  * the current server.
131  * - Returns 1 if there are no more servers
132  * - Returns 0 with conn set to the connection for the current server if
133  *   it's up and current
134  */
135 static int
136 ContactQuorum_iterate(struct ubik_trans *atrans, int aflags, struct ubik_server **ts,
137                          struct rx_connection **conn, afs_int32 *rcode,
138                          afs_int32 *okcalls, afs_int32 code)
139 {
140     if (!*ts) {
141         /* Initial call - start iterating over servers */
142         *ts = ubik_servers;
143         *conn = NULL;
144         *rcode = 0;
145         *okcalls = 0;
146     } else {
147         if (*conn) {
148             Quorum_EndIO(atrans, *conn);
149             *conn = NULL;
150             if (code) {         /* failure */
151                 *rcode = code;
152                 (*ts)->up = 0;          /* mark as down now; beacons will no longer be sent */
153                 (*ts)->beaconSinceDown = 0;
154                 (*ts)->currentDB = 0;
155                 urecovery_LostServer(*ts);      /* tell recovery to try to resend dbase later */
156             } else {            /* success */
157                 if (!(*ts)->isClone)
158                     (*okcalls)++;       /* count up how many worked */
159                 if (aflags & CStampVersion) {
160                     (*ts)->version = atrans->dbase->version;
161                 }
162             }
163         }
164         *ts = (*ts)->next;
165     }
166     if (!(*ts))
167         return 1;
168     if (!(*ts)->up || !(*ts)->currentDB) {
169         (*ts)->currentDB = 0;   /* db is no longer current; we just missed an update */
170         return 0;               /* not up-to-date, don't bother.  NULL conn will tell caller not to use */
171     }
172     *conn = Quorum_StartIO(atrans, *ts);
173     return 0;
174 }
175
176 static int
177 ContactQuorum_rcode(int okcalls, afs_int32 rcode)
178 {
179     /*
180      * return 0 if we successfully contacted a quorum, otherwise return error code.
181      * We don't have to contact ourselves (that was done locally)
182      */
183     if (okcalls + 1 >= ubik_quorum)
184         return 0;
185     else
186         return rcode;
187 }
188
189 /*!
190  * \brief Perform an operation at a quorum, handling error conditions.
191  * \return 0 if all worked and a quorum was contacted successfully
192  * \return otherwise mark failing server as down and return #UERROR
193  *
194  * \note If any server misses an update, we must wait #BIGTIME seconds before
195  * allowing the transaction to commit, to ensure that the missing and
196  * possibly still functioning server times out and stops handing out old
197  * data.  This is done in the commit code, where we wait for a server marked
198  * down to have stayed down for #BIGTIME seconds before we allow a transaction
199  * to commit.  A server that fails but comes back up won't give out old data
200  * because it is sent the sync count along with the beacon message that
201  * marks it as \b really up (\p beaconSinceDown).
202  */
203 afs_int32
204 ContactQuorum_NoArguments(afs_int32 (*proc)(struct rx_connection *, ubik_tid *),
205                           struct ubik_trans *atrans, int aflags)
206 {
207     struct ubik_server *ts = NULL;
208     afs_int32 code = 0, rcode, okcalls;
209     struct rx_connection *conn;
210     int done;
211
212     done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
213     while (!done) {
214         if (conn)
215             code = (*proc)(conn, &atrans->tid);
216         done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
217     }
218     return ContactQuorum_rcode(okcalls, rcode);
219 }
220
221
222 afs_int32
223 ContactQuorum_DISK_Lock(struct ubik_trans *atrans, int aflags,afs_int32 file,
224                         afs_int32 position, afs_int32 length, afs_int32 type)
225 {
226     struct ubik_server *ts = NULL;
227     afs_int32 code = 0, rcode, okcalls;
228     struct rx_connection *conn;
229     int done;
230
231     done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
232     while (!done) {
233         if (conn)
234             code = DISK_Lock(conn, &atrans->tid, file, position, length, type);
235         done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
236     }
237     return ContactQuorum_rcode(okcalls, rcode);
238 }
239
240
241 afs_int32
242 ContactQuorum_DISK_Write(struct ubik_trans *atrans, int aflags,
243                          afs_int32 file, afs_int32 position, bulkdata *data)
244 {
245     struct ubik_server *ts = NULL;
246     afs_int32 code = 0, rcode, okcalls;
247     struct rx_connection *conn;
248     int done;
249
250     done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
251     while (!done) {
252         if (conn)
253             code = DISK_Write(conn, &atrans->tid, file, position, data);
254         done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
255     }
256     return ContactQuorum_rcode(okcalls, rcode);
257 }
258
259
260 afs_int32
261 ContactQuorum_DISK_Truncate(struct ubik_trans *atrans, int aflags,
262                             afs_int32 file, afs_int32 length)
263 {
264     struct ubik_server *ts = NULL;
265     afs_int32 code = 0, rcode, okcalls;
266     struct rx_connection *conn;
267     int done;
268
269     done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
270     while (!done) {
271         if (conn)
272             code = DISK_Truncate(conn, &atrans->tid, file, length);
273         done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
274     }
275     return ContactQuorum_rcode(okcalls, rcode);
276 }
277
278
279 afs_int32
280 ContactQuorum_DISK_WriteV(struct ubik_trans *atrans, int aflags,
281                           iovec_wrt * io_vector, iovec_buf *io_buffer)
282 {
283     struct ubik_server *ts = NULL;
284     afs_int32 code = 0, rcode, okcalls;
285     struct rx_connection *conn;
286     int done;
287
288     done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
289     while (!done) {
290         if (conn) {
291             code = DISK_WriteV(conn, &atrans->tid, io_vector, io_buffer);
292             if ((code <= -450) && (code > -500)) {
293                 /* An RPC interface mismatch (as defined in comerr/error_msg.c).
294                  * Un-bulk the entries and do individual DISK_Write calls
295                  * instead of DISK_WriteV.
296                  */
297                 struct ubik_iovec *iovec =
298                         (struct ubik_iovec *)io_vector->iovec_wrt_val;
299                 char *iobuf = (char *)io_buffer->iovec_buf_val;
300                 bulkdata tcbs;
301                 afs_int32 i, offset;
302
303                 for (i = 0, offset = 0; i < io_vector->iovec_wrt_len; i++) {
304                     /* Sanity check for going off end of buffer */
305                     if ((offset + iovec[i].length) > io_buffer->iovec_buf_len) {
306                         code = UINTERNAL;
307                         break;
308                     }
309                     tcbs.bulkdata_len = iovec[i].length;
310                     tcbs.bulkdata_val = &iobuf[offset];
311                     code = DISK_Write(conn, &atrans->tid, iovec[i].file,
312                            iovec[i].position, &tcbs);
313                     if (code)
314                         break;
315                     offset += iovec[i].length;
316                 }
317             }
318         }
319         done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
320     }
321     return ContactQuorum_rcode(okcalls, rcode);
322 }
323
324
325 afs_int32
326 ContactQuorum_DISK_SetVersion(struct ubik_trans *atrans, int aflags,
327                               ubik_version *OldVersion,
328                               ubik_version *NewVersion)
329 {
330     struct ubik_server *ts = NULL;
331     afs_int32 code = 0, rcode, okcalls;
332     struct rx_connection *conn;
333     int done;
334
335     done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
336     while (!done) {
337         if (conn)
338             code = DISK_SetVersion(conn, &atrans->tid, OldVersion, NewVersion);
339         done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
340     }
341     return ContactQuorum_rcode(okcalls, rcode);
342 }
343
344 #if defined(AFS_PTHREAD_ENV)
345 static int
346 ubik_thread_create(pthread_attr_t *tattr, pthread_t *thread, void *proc) {
347     osi_Assert(pthread_attr_init(tattr) == 0);
348     osi_Assert(pthread_attr_setdetachstate(tattr, PTHREAD_CREATE_DETACHED) == 0);
349     osi_Assert(pthread_create(thread, tattr, proc, NULL) == 0);
350     return 0;
351 }
352 #endif
353
354 /*!
355  * \brief This routine initializes the ubik system for a set of servers.
356  * \return 0 for success, or an error code on failure.
357  * \param serverList set of servers specified; nServers gives the number of entries in this array.
358  * \param pathName provides an initial prefix used for naming storage files used by this system.
359  * \param dbase the returned structure representing this instance of an ubik; it is passed to various calls below.
360  *
361  * \todo This routine should perhaps be generalized to a low-level disk interface providing read, write, file enumeration and sync operations.
362  *
363  * \warning The host named by myHost should not also be listed in serverList.
364  *
365  * \see ubik_ServerInit(), ubik_ServerInitByInfo()
366  */
367 int
368 ubik_ServerInitCommon(afs_uint32 myHost, short myPort,
369                       struct afsconf_cell *info, char clones[],
370                       afs_uint32 serverList[], const char *pathName,
371                       struct ubik_dbase **dbase)
372 {
373     struct ubik_dbase *tdb;
374     afs_int32 code;
375 #ifdef AFS_PTHREAD_ENV
376     pthread_t rxServerThread;        /* pthread variables */
377     pthread_t ubeacon_InteractThread;
378     pthread_t urecovery_InteractThread;
379     pthread_attr_t rxServer_tattr;
380     pthread_attr_t ubeacon_Interact_tattr;
381     pthread_attr_t urecovery_Interact_tattr;
382 #else
383     PROCESS junk;
384     extern int rx_stackSize;
385 #endif
386
387     afs_int32 secIndex;
388     struct rx_securityClass *secClass;
389     int numClasses;
390
391     struct rx_service *tservice;
392
393     initialize_U_error_table();
394
395     tdb = (struct ubik_dbase *)malloc(sizeof(struct ubik_dbase));
396     tdb->pathName = (char *)malloc(strlen(pathName) + 1);
397     strcpy(tdb->pathName, pathName);
398     tdb->activeTrans = (struct ubik_trans *)0;
399     memset(&tdb->version, 0, sizeof(struct ubik_version));
400     memset(&tdb->cachedVersion, 0, sizeof(struct ubik_version));
401 #ifdef AFS_PTHREAD_ENV
402     MUTEX_INIT(&tdb->versionLock, "version lock", MUTEX_DEFAULT, 0);
403 #else
404     Lock_Init(&tdb->versionLock);
405 #endif
406     Lock_Init(&tdb->cache_lock);
407     tdb->flags = 0;
408     tdb->read = uphys_read;
409     tdb->write = uphys_write;
410     tdb->truncate = uphys_truncate;
411     tdb->open = uphys_invalidate;       /* this function isn't used any more */
412     tdb->sync = uphys_sync;
413     tdb->stat = uphys_stat;
414     tdb->getlabel = uphys_getlabel;
415     tdb->setlabel = uphys_setlabel;
416     tdb->getnfiles = uphys_getnfiles;
417     tdb->readers = 0;
418     tdb->tidCounter = tdb->writeTidCounter = 0;
419     *dbase = tdb;
420     ubik_dbase = tdb;           /* for now, only one db per server; can fix later when we have names for the other dbases */
421
422 #ifdef AFS_PTHREAD_ENV
423     CV_INIT(&tdb->version_cond, "version", CV_DEFAULT, 0);
424     CV_INIT(&tdb->flags_cond, "flags", CV_DEFAULT, 0);
425 #endif /* AFS_PTHREAD_ENV */
426
427     /* initialize RX */
428
429     /* the following call is idempotent so when/if it got called earlier,
430      * by whatever called us, it doesn't really matter -- klm */
431     code = rx_Init(myPort);
432     if (code < 0)
433         return code;
434
435     udisk_Init(ubik_nBuffers);
436     ulock_Init();
437
438     code = uvote_Init();
439     if (code)
440         return code;
441     code = urecovery_Initialize(tdb);
442     if (code)
443         return code;
444     if (info)
445         code = ubeacon_InitServerListByInfo(myHost, info, clones);
446     else
447         code = ubeacon_InitServerList(myHost, serverList);
448     if (code)
449         return code;
450
451     ubik_callPortal = myPort;
452     /* try to get an additional security object */
453     if (buildSecClassesProc == NULL) {
454         numClasses = 3;
455         ubik_sc = calloc(numClasses, sizeof(struct rx_securityClass *));
456         ubik_sc[0] = rxnull_NewServerSecurityObject();
457         if (ubik_SRXSecurityProc) {
458             code = (*ubik_SRXSecurityProc) (ubik_SRXSecurityRock,
459                                             &secClass,
460                                             &secIndex);
461             if (code == 0) {
462                  ubik_sc[secIndex] = secClass;
463             }
464         }
465     } else {
466         (*buildSecClassesProc) (securityRock, &ubik_sc, &numClasses);
467     }
468     /* for backwards compat this should keep working as it does now
469        and not host bind */
470
471     tservice =
472         rx_NewService(0, VOTE_SERVICE_ID, "VOTE", ubik_sc, numClasses,
473                       VOTE_ExecuteRequest);
474     if (tservice == (struct rx_service *)0) {
475         ubik_dprint("Could not create VOTE rx service!\n");
476         return -1;
477     }
478     rx_SetMinProcs(tservice, 2);
479     rx_SetMaxProcs(tservice, 3);
480
481     tservice =
482         rx_NewService(0, DISK_SERVICE_ID, "DISK", ubik_sc, numClasses,
483                       DISK_ExecuteRequest);
484     if (tservice == (struct rx_service *)0) {
485         ubik_dprint("Could not create DISK rx service!\n");
486         return -1;
487     }
488     rx_SetMinProcs(tservice, 2);
489     rx_SetMaxProcs(tservice, 3);
490
491     /* start an rx_ServerProc to handle incoming RPC's in particular the
492      * UpdateInterfaceAddr RPC that occurs in ubeacon_InitServerList. This avoids
493      * the "steplock" problem in ubik initialization. Defect 11037.
494      */
495 #ifdef AFS_PTHREAD_ENV
496     ubik_thread_create(&rxServer_tattr, &rxServerThread, (void *)rx_ServerProc);
497 #else
498     LWP_CreateProcess(rx_ServerProc, rx_stackSize, RX_PROCESS_PRIORITY,
499               NULL, "rx_ServerProc", &junk);
500 #endif
501
502     /* now start up async processes */
503 #ifdef AFS_PTHREAD_ENV
504     ubik_thread_create(&ubeacon_Interact_tattr, &ubeacon_InteractThread,
505                 (void *)ubeacon_Interact);
506 #else
507     code = LWP_CreateProcess(ubeacon_Interact, 16384 /*8192 */ ,
508                              LWP_MAX_PRIORITY - 1, (void *)0, "beacon",
509                              &junk);
510     if (code)
511         return code;
512 #endif
513
514 #ifdef AFS_PTHREAD_ENV
515     ubik_thread_create(&urecovery_Interact_tattr, &urecovery_InteractThread,
516                 (void *)urecovery_Interact);
517     return 0;  /* is this correct?  - klm */
518 #else
519     code = LWP_CreateProcess(urecovery_Interact, 16384 /*8192 */ ,
520                              LWP_MAX_PRIORITY - 1, (void *)0, "recovery",
521                              &junk);
522     return code;
523 #endif
524
525 }
526
527 /*!
528  * \see ubik_ServerInitCommon()
529  */
530 int
531 ubik_ServerInitByInfo(afs_uint32 myHost, short myPort,
532                       struct afsconf_cell *info, char clones[],
533                       const char *pathName, struct ubik_dbase **dbase)
534 {
535     afs_int32 code;
536
537     code =
538         ubik_ServerInitCommon(myHost, myPort, info, clones, 0, pathName,
539                               dbase);
540     return code;
541 }
542
543 /*!
544  * \see ubik_ServerInitCommon()
545  */
546 int
547 ubik_ServerInit(afs_uint32 myHost, short myPort, afs_uint32 serverList[],
548                 const char *pathName, struct ubik_dbase **dbase)
549 {
550     afs_int32 code;
551
552     code =
553         ubik_ServerInitCommon(myHost, myPort, (struct afsconf_cell *)0, 0,
554                               serverList, pathName, dbase);
555     return code;
556 }
557
558 /*!
559  * \brief This routine begins a read or write transaction on the transaction
560  * identified by transPtr, in the dbase named by dbase.
561  *
562  * An open mode of ubik_READTRANS identifies this as a read transaction,
563  * while a mode of ubik_WRITETRANS identifies this as a write transaction.
564  * transPtr is set to the returned transaction control block.
565  * The readAny flag is set to 0 or 1 or 2 by the wrapper functions
566  * ubik_BeginTrans() or ubik_BeginTransReadAny() or
567  * ubik_BeginTransReadAnyWrite() below.
568  *
569  * \note We can only begin transaction when we have an up-to-date database.
570  */
571 static int
572 BeginTrans(struct ubik_dbase *dbase, afs_int32 transMode,
573            struct ubik_trans **transPtr, int readAny)
574 {
575     struct ubik_trans *jt;
576     struct ubik_trans *tt;
577     afs_int32 code;
578
579     if (readAny > 1 && ubik_SyncWriterCacheProc == NULL) {
580         /* it's not safe to use ubik_BeginTransReadAnyWrite without a
581          * cache-syncing function; fall back to ubik_BeginTransReadAny,
582          * which is safe but slower */
583         ubik_print("ubik_BeginTransReadAnyWrite called, but "
584                    "ubik_SyncWriterCacheProc not set; pretending "
585                    "ubik_BeginTransReadAny was called instead\n");
586         readAny = 1;
587     }
588
589     if ((transMode != UBIK_READTRANS) && readAny)
590         return UBADTYPE;
591     DBHOLD(dbase);
592     if (urecovery_AllBetter(dbase, readAny) == 0) {
593         DBRELE(dbase);
594         return UNOQUORUM;
595     }
596     /* otherwise we have a quorum, use it */
597
598     /* make sure that at most one write transaction occurs at any one time.  This
599      * has nothing to do with transaction locking; that's enforced by the lock package.  However,
600      * we can't even handle two non-conflicting writes, since our log and recovery modules
601      * don't know how to restore one without possibly picking up some data from the other. */
602     if (transMode == UBIK_WRITETRANS) {
603         /* if we're writing already, wait */
604         while (dbase->flags & DBWRITING) {
605 #ifdef AFS_PTHREAD_ENV
606             CV_WAIT(&dbase->flags_cond, &dbase->versionLock);
607 #else
608             DBRELE(dbase);
609             LWP_WaitProcess(&dbase->flags);
610             DBHOLD(dbase);
611 #endif
612         }
613
614         if (!ubeacon_AmSyncSite()) {
615             DBRELE(dbase);
616             return UNOTSYNC;
617         }
618     }
619
620     /* create the transaction */
621     code = udisk_begin(dbase, transMode, &jt);  /* can't take address of register var */
622     tt = jt;                    /* move to a register */
623     if (code || tt == (struct ubik_trans *)NULL) {
624         DBRELE(dbase);
625         return code;
626     }
627     if (readAny) {
628         tt->flags |= TRREADANY;
629         if (readAny > 1) {
630             tt->flags |= TRREADWRITE;
631         }
632     }
633     /* label trans and dbase with new tid */
634     tt->tid.epoch = ubik_epochTime;
635     /* bump by two, since tidCounter+1 means trans id'd by tidCounter has finished */
636     tt->tid.counter = (dbase->tidCounter += 2);
637
638     if (transMode == UBIK_WRITETRANS) {
639         /* for a write trans, we have to keep track of the write tid counter too */
640         dbase->writeTidCounter = tt->tid.counter;
641
642         /* next try to start transaction on appropriate number of machines */
643         code = ContactQuorum_NoArguments(DISK_Begin, tt, 0);
644         if (code) {
645             /* we must abort the operation */
646             udisk_abort(tt);
647             ContactQuorum_NoArguments(DISK_Abort, tt, 0); /* force aborts to the others */
648             udisk_end(tt);
649             DBRELE(dbase);
650             return code;
651         }
652     }
653
654     *transPtr = tt;
655     DBRELE(dbase);
656     return 0;
657 }
658
659 /*!
660  * \see BeginTrans()
661  */
662 int
663 ubik_BeginTrans(struct ubik_dbase *dbase, afs_int32 transMode,
664                 struct ubik_trans **transPtr)
665 {
666     return BeginTrans(dbase, transMode, transPtr, 0);
667 }
668
669 /*!
670  * \see BeginTrans()
671  */
672 int
673 ubik_BeginTransReadAny(struct ubik_dbase *dbase, afs_int32 transMode,
674                        struct ubik_trans **transPtr)
675 {
676     return BeginTrans(dbase, transMode, transPtr, 1);
677 }
678
679 /*!
680  * \see BeginTrans()
681  */
682 int
683 ubik_BeginTransReadAnyWrite(struct ubik_dbase *dbase, afs_int32 transMode,
684                             struct ubik_trans **transPtr)
685 {
686     return BeginTrans(dbase, transMode, transPtr, 2);
687 }
688
689 /*!
690  * \brief This routine ends a read or write transaction by aborting it.
691  */
692 int
693 ubik_AbortTrans(struct ubik_trans *transPtr)
694 {
695     afs_int32 code;
696     afs_int32 code2;
697     struct ubik_dbase *dbase;
698
699     dbase = transPtr->dbase;
700
701     if (transPtr->flags & TRCACHELOCKED) {
702         ReleaseReadLock(&dbase->cache_lock);
703         transPtr->flags &= ~TRCACHELOCKED;
704     }
705
706     ObtainWriteLock(&dbase->cache_lock);
707
708     DBHOLD(dbase);
709     memset(&dbase->cachedVersion, 0, sizeof(struct ubik_version));
710
711     ReleaseWriteLock(&dbase->cache_lock);
712
713     /* see if we're still up-to-date */
714     if (!urecovery_AllBetter(dbase, transPtr->flags & TRREADANY)) {
715         udisk_abort(transPtr);
716         udisk_end(transPtr);
717         DBRELE(dbase);
718         return UNOQUORUM;
719     }
720
721     if (transPtr->type == UBIK_READTRANS) {
722         code = udisk_abort(transPtr);
723         udisk_end(transPtr);
724         DBRELE(dbase);
725         return code;
726     }
727
728     /* below here, we know we're doing a write transaction */
729     if (!ubeacon_AmSyncSite()) {
730         udisk_abort(transPtr);
731         udisk_end(transPtr);
732         DBRELE(dbase);
733         return UNOTSYNC;
734     }
735
736     /* now it is safe to try remote abort */
737     code = ContactQuorum_NoArguments(DISK_Abort, transPtr, 0);
738     code2 = udisk_abort(transPtr);
739     udisk_end(transPtr);
740     DBRELE(dbase);
741     return (code ? code : code2);
742 }
743
744 static void
745 WritebackApplicationCache(struct ubik_dbase *dbase)
746 {
747     int code = 0;
748     if (ubik_SyncWriterCacheProc) {
749         code = ubik_SyncWriterCacheProc();
750     }
751     if (code) {
752         /* we failed to sync the local cache, so just invalidate the cache;
753          * we'll try to read the cache in again on the next read */
754         memset(&dbase->cachedVersion, 0, sizeof(dbase->cachedVersion));
755     } else {
756         memcpy(&dbase->cachedVersion, &dbase->version,
757                sizeof(dbase->cachedVersion));
758     }
759 }
760
761 /*!
762  * \brief This routine ends a read or write transaction on the open transaction identified by transPtr.
763  * \return an error code.
764  */
765 int
766 ubik_EndTrans(struct ubik_trans *transPtr)
767 {
768     afs_int32 code;
769     struct timeval tv;
770     afs_int32 realStart;
771     struct ubik_server *ts;
772     afs_int32 now;
773     int cachelocked = 0;
774     struct ubik_dbase *dbase;
775
776     if (transPtr->type == UBIK_WRITETRANS) {
777         code = ubik_Flush(transPtr);
778         if (code) {
779             ubik_AbortTrans(transPtr);
780             return (code);
781         }
782     }
783
784     dbase = transPtr->dbase;
785
786     if (transPtr->flags & TRCACHELOCKED) {
787         ReleaseReadLock(&dbase->cache_lock);
788         transPtr->flags &= ~TRCACHELOCKED;
789     }
790
791     if (transPtr->type != UBIK_READTRANS) {
792         /* must hold cache_lock before DBHOLD'ing */
793         ObtainWriteLock(&dbase->cache_lock);
794         cachelocked = 1;
795     }
796
797     DBHOLD(dbase);
798
799     /* give up if no longer current */
800     if (!urecovery_AllBetter(dbase, transPtr->flags & TRREADANY)) {
801         udisk_abort(transPtr);
802         udisk_end(transPtr);
803         DBRELE(dbase);
804         code = UNOQUORUM;
805         goto error;
806     }
807
808     if (transPtr->type == UBIK_READTRANS) {     /* reads are easy */
809         code = udisk_commit(transPtr);
810         if (code == 0)
811             goto success;       /* update cachedVersion correctly */
812         udisk_end(transPtr);
813         DBRELE(dbase);
814         goto error;
815     }
816
817     if (!ubeacon_AmSyncSite()) {        /* no longer sync site */
818         udisk_abort(transPtr);
819         udisk_end(transPtr);
820         DBRELE(dbase);
821         code = UNOTSYNC;
822         goto error;
823     }
824
825     /* now it is safe to do commit */
826     code = udisk_commit(transPtr);
827     if (code == 0) {
828         /* db data has been committed locally; update the local cache so
829          * readers can get at it */
830         WritebackApplicationCache(dbase);
831
832         ReleaseWriteLock(&dbase->cache_lock);
833
834         code = ContactQuorum_NoArguments(DISK_Commit, transPtr, CStampVersion);
835
836     } else {
837         memset(&dbase->cachedVersion, 0, sizeof(struct ubik_version));
838         ReleaseWriteLock(&dbase->cache_lock);
839     }
840     cachelocked = 0;
841     if (code) {
842         /* failed to commit, so must return failure.  Try to clear locks first, just for fun
843          * Note that we don't know if this transaction will eventually commit at this point.
844          * If it made it to a site that will be present in the next quorum, we win, otherwise
845          * we lose.  If we contact a majority of sites, then we won't be here: contacting
846          * a majority guarantees commit, since it guarantees that one dude will be a
847          * member of the next quorum. */
848         ContactQuorum_NoArguments(DISK_ReleaseLocks, transPtr, 0);
849         udisk_end(transPtr);
850         DBRELE(dbase);
851         goto error;
852     }
853     /* before we can start sending unlock messages, we must wait until all servers
854      * that are possibly still functioning on the other side of a network partition
855      * have timed out.  Check the server structures, compute how long to wait, then
856      * start the unlocks */
857     realStart = FT_ApproxTime();
858     while (1) {
859         /* wait for all servers to time out */
860         code = 0;
861         now = FT_ApproxTime();
862         /* check if we're still sync site, the guy should either come up
863          * to us, or timeout.  Put safety check in anyway */
864         if (now - realStart > 10 * BIGTIME) {
865             ubik_stats.escapes++;
866             ubik_print("ubik escaping from commit wait\n");
867             break;
868         }
869         for (ts = ubik_servers; ts; ts = ts->next) {
870             if (!ts->beaconSinceDown && now <= ts->lastBeaconSent + BIGTIME) {
871
872                 /* this guy could have some damaged data, wait for him */
873                 code = 1;
874                 tv.tv_sec = 1;  /* try again after a while (ha ha) */
875                 tv.tv_usec = 0;
876
877 #ifdef AFS_PTHREAD_ENV
878                 /* we could release the dbase outside of the loop, but we do
879                  * it here, in the loop, to avoid an unnecessary RELE/HOLD
880                  * if all sites are up */
881                 DBRELE(dbase);
882                 select(0, 0, 0, 0, &tv);
883                 DBHOLD(dbase);
884 #else
885                 IOMGR_Select(0, 0, 0, 0, &tv);  /* poll, should we wait on something? */
886 #endif
887
888                 break;
889             }
890         }
891         if (code == 0)
892             break;              /* no down ones still pseudo-active */
893     }
894
895     /* finally, unlock all the dudes.  We can return success independent of the number of servers
896      * that really unlock the dbase; the others will do it if/when they elect a new sync site.
897      * The transaction is committed anyway, since we succeeded in contacting a quorum
898      * at the start (when invoking the DiskCommit function).
899      */
900     ContactQuorum_NoArguments(DISK_ReleaseLocks, transPtr, 0);
901
902   success:
903     udisk_end(transPtr);
904     /* don't update cachedVersion here; it should have been updated way back
905      * in ubik_CheckCache, and earlier in this function for writes */
906     DBRELE(dbase);
907     if (cachelocked) {
908         ReleaseWriteLock(&dbase->cache_lock);
909     }
910     return 0;
911
912   error:
913     if (!cachelocked) {
914         ObtainWriteLock(&dbase->cache_lock);
915     }
916     memset(&dbase->cachedVersion, 0, sizeof(struct ubik_version));
917     ReleaseWriteLock(&dbase->cache_lock);
918     return code;
919 }
920
921 /*!
922  * \brief This routine reads length bytes into buffer from the current position in the database.
923  *
924  * The file pointer is updated appropriately (by adding the number of bytes actually transferred), and the length actually transferred is stored in the long integer pointed to by length.  A short read returns zero for an error code.
925  *
926  * \note *length is an INOUT parameter: at the start it represents the size of the buffer, and when done, it contains the number of bytes actually transferred.
927  */
928 int
929 ubik_Read(struct ubik_trans *transPtr, void *buffer,
930           afs_int32 length)
931 {
932     afs_int32 code;
933
934     /* reads are easy to do: handle locally */
935     DBHOLD(transPtr->dbase);
936     if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY)) {
937         DBRELE(transPtr->dbase);
938         return UNOQUORUM;
939     }
940
941     code =
942         udisk_read(transPtr, transPtr->seekFile, buffer, transPtr->seekPos,
943                    length);
944     if (code == 0) {
945         transPtr->seekPos += length;
946     }
947     DBRELE(transPtr->dbase);
948     return code;
949 }
950
951 /*!
952  * \brief This routine will flush the io data in the iovec structures.
953  *
954  * It first flushes to the local disk and then uses ContactQuorum to write it
955  * to the other servers.
956  */
957 int
958 ubik_Flush(struct ubik_trans *transPtr)
959 {
960     afs_int32 code, error = 0;
961
962     if (transPtr->type != UBIK_WRITETRANS)
963         return UBADTYPE;
964     if (!transPtr->iovec_info.iovec_wrt_len
965         || !transPtr->iovec_info.iovec_wrt_val)
966         return 0;
967
968     DBHOLD(transPtr->dbase);
969     if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY))
970         ERROR_EXIT(UNOQUORUM);
971     if (!ubeacon_AmSyncSite())  /* only sync site can write */
972         ERROR_EXIT(UNOTSYNC);
973
974     /* Update the rest of the servers in the quorum */
975     code =
976         ContactQuorum_DISK_WriteV(transPtr, 0, &transPtr->iovec_info,
977                                   &transPtr->iovec_data);
978     if (code) {
979         udisk_abort(transPtr);
980         ContactQuorum_NoArguments(DISK_Abort, transPtr, 0); /* force aborts to the others */
981         transPtr->iovec_info.iovec_wrt_len = 0;
982         transPtr->iovec_data.iovec_buf_len = 0;
983         ERROR_EXIT(code);
984     }
985
986     /* Wrote the buffers out, so start at scratch again */
987     transPtr->iovec_info.iovec_wrt_len = 0;
988     transPtr->iovec_data.iovec_buf_len = 0;
989
990   error_exit:
991     DBRELE(transPtr->dbase);
992     return error;
993 }
994
995 int
996 ubik_Write(struct ubik_trans *transPtr, void *vbuffer,
997            afs_int32 length)
998 {
999     struct ubik_iovec *iovec;
1000     afs_int32 code, error = 0;
1001     afs_int32 pos, len, size;
1002     char * buffer = (char *)vbuffer;
1003
1004     if (transPtr->type != UBIK_WRITETRANS)
1005         return UBADTYPE;
1006     if (!length)
1007         return 0;
1008
1009     if (length > IOVEC_MAXBUF) {
1010         for (pos = 0, len = length; len > 0; len -= size, pos += size) {
1011             size = ((len < IOVEC_MAXBUF) ? len : IOVEC_MAXBUF);
1012             code = ubik_Write(transPtr, buffer+pos, size);
1013             if (code)
1014                 return (code);
1015         }
1016         return 0;
1017     }
1018
1019     if (!transPtr->iovec_info.iovec_wrt_val) {
1020         transPtr->iovec_info.iovec_wrt_len = 0;
1021         transPtr->iovec_info.iovec_wrt_val =
1022             (struct ubik_iovec *)malloc(IOVEC_MAXWRT *
1023                                         sizeof(struct ubik_iovec));
1024         transPtr->iovec_data.iovec_buf_len = 0;
1025         transPtr->iovec_data.iovec_buf_val = (char *)malloc(IOVEC_MAXBUF);
1026         if (!transPtr->iovec_info.iovec_wrt_val
1027             || !transPtr->iovec_data.iovec_buf_val) {
1028             if (transPtr->iovec_info.iovec_wrt_val)
1029                 free(transPtr->iovec_info.iovec_wrt_val);
1030             transPtr->iovec_info.iovec_wrt_val = 0;
1031             if (transPtr->iovec_data.iovec_buf_val)
1032                 free(transPtr->iovec_data.iovec_buf_val);
1033             transPtr->iovec_data.iovec_buf_val = 0;
1034             return UNOMEM;
1035         }
1036     }
1037
1038     /* If this write won't fit in the structure, then flush it out and start anew */
1039     if ((transPtr->iovec_info.iovec_wrt_len >= IOVEC_MAXWRT)
1040         || ((length + transPtr->iovec_data.iovec_buf_len) > IOVEC_MAXBUF)) {
1041         code = ubik_Flush(transPtr);
1042         if (code)
1043             return (code);
1044     }
1045
1046     DBHOLD(transPtr->dbase);
1047     if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY))
1048         ERROR_EXIT(UNOQUORUM);
1049     if (!ubeacon_AmSyncSite())  /* only sync site can write */
1050         ERROR_EXIT(UNOTSYNC);
1051
1052     /* Write to the local disk */
1053     code =
1054         udisk_write(transPtr, transPtr->seekFile, buffer, transPtr->seekPos,
1055                     length);
1056     if (code) {
1057         udisk_abort(transPtr);
1058         transPtr->iovec_info.iovec_wrt_len = 0;
1059         transPtr->iovec_data.iovec_buf_len = 0;
1060         DBRELE(transPtr->dbase);
1061         return (code);
1062     }
1063
1064     /* Collect writes for the other ubik servers (to be done in bulk) */
1065     iovec = (struct ubik_iovec *)transPtr->iovec_info.iovec_wrt_val;
1066     iovec[transPtr->iovec_info.iovec_wrt_len].file = transPtr->seekFile;
1067     iovec[transPtr->iovec_info.iovec_wrt_len].position = transPtr->seekPos;
1068     iovec[transPtr->iovec_info.iovec_wrt_len].length = length;
1069
1070     memcpy(&transPtr->iovec_data.
1071            iovec_buf_val[transPtr->iovec_data.iovec_buf_len], buffer, length);
1072
1073     transPtr->iovec_info.iovec_wrt_len++;
1074     transPtr->iovec_data.iovec_buf_len += length;
1075     transPtr->seekPos += length;
1076
1077   error_exit:
1078     DBRELE(transPtr->dbase);
1079     return error;
1080 }
1081
1082 /*!
1083  * \brief This sets the file pointer associated with the current transaction
1084  * to the appropriate file and byte position.
1085  *
1086  * Unlike Unix files, a transaction is labelled by both a file number \p fileid
1087  * and a byte position relative to the specified file \p position.
1088  */
1089 int
1090 ubik_Seek(struct ubik_trans *transPtr, afs_int32 fileid,
1091           afs_int32 position)
1092 {
1093     afs_int32 code;
1094
1095     DBHOLD(transPtr->dbase);
1096     if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY)) {
1097         code = UNOQUORUM;
1098     } else {
1099         transPtr->seekFile = fileid;
1100         transPtr->seekPos = position;
1101         code = 0;
1102     }
1103     DBRELE(transPtr->dbase);
1104     return code;
1105 }
1106
1107 /*!
1108  * \brief This call returns the file pointer associated with the specified
1109  * transaction in \p fileid and \p position.
1110  */
1111 int
1112 ubik_Tell(struct ubik_trans *transPtr, afs_int32 * fileid,
1113           afs_int32 * position)
1114 {
1115     DBHOLD(transPtr->dbase);
1116     *fileid = transPtr->seekFile;
1117     *position = transPtr->seekPos;
1118     DBRELE(transPtr->dbase);
1119     return 0;
1120 }
1121
1122 /*!
1123  * \brief This sets the file size for the currently-selected file to \p length
1124  * bytes, if length is less than the file's current size.
1125  */
1126 int
1127 ubik_Truncate(struct ubik_trans *transPtr, afs_int32 length)
1128 {
1129     afs_int32 code, error = 0;
1130
1131     /* Will also catch if not UBIK_WRITETRANS */
1132     code = ubik_Flush(transPtr);
1133     if (code)
1134         return (code);
1135
1136     DBHOLD(transPtr->dbase);
1137     /* first, check that quorum is still good, and that dbase is up-to-date */
1138     if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY))
1139         ERROR_EXIT(UNOQUORUM);
1140     if (!ubeacon_AmSyncSite())
1141         ERROR_EXIT(UNOTSYNC);
1142
1143     /* now do the operation locally, and propagate it out */
1144     code = udisk_truncate(transPtr, transPtr->seekFile, length);
1145     if (!code) {
1146         code =
1147             ContactQuorum_DISK_Truncate(transPtr, 0, transPtr->seekFile,
1148                                         length);
1149     }
1150     if (code) {
1151         /* we must abort the operation */
1152         udisk_abort(transPtr);
1153         ContactQuorum_NoArguments(DISK_Abort, transPtr, 0); /* force aborts to the others */
1154         ERROR_EXIT(code);
1155     }
1156
1157   error_exit:
1158     DBRELE(transPtr->dbase);
1159     return error;
1160 }
1161
1162 /*!
1163  * \brief set a lock; all locks are released on transaction end (commit/abort)
1164  */
1165 int
1166 ubik_SetLock(struct ubik_trans *atrans, afs_int32 apos, afs_int32 alen,
1167              int atype)
1168 {
1169     afs_int32 code = 0, error = 0;
1170
1171     if (atype == LOCKWRITE) {
1172         if (atrans->type == UBIK_READTRANS)
1173             return UBADTYPE;
1174         code = ubik_Flush(atrans);
1175         if (code)
1176             return (code);
1177     }
1178
1179     DBHOLD(atrans->dbase);
1180     if (atype == LOCKREAD) {
1181         code = ulock_getLock(atrans, atype, 1);
1182         if (code)
1183             ERROR_EXIT(code);
1184     } else {
1185         /* first, check that quorum is still good, and that dbase is up-to-date */
1186         if (!urecovery_AllBetter(atrans->dbase, atrans->flags & TRREADANY))
1187             ERROR_EXIT(UNOQUORUM);
1188         if (!ubeacon_AmSyncSite())
1189             ERROR_EXIT(UNOTSYNC);
1190
1191         /* now do the operation locally, and propagate it out */
1192         code = ulock_getLock(atrans, atype, 1);
1193         if (code == 0) {
1194             code = ContactQuorum_DISK_Lock(atrans, 0, 0, 1 /*unused */ ,
1195                                            1 /*unused */ , LOCKWRITE);
1196         }
1197         if (code) {
1198             /* we must abort the operation */
1199             udisk_abort(atrans);
1200             ContactQuorum_NoArguments(DISK_Abort, atrans, 0); /* force aborts to the others */
1201             ERROR_EXIT(code);
1202         }
1203     }
1204
1205   error_exit:
1206     DBRELE(atrans->dbase);
1207     return error;
1208 }
1209
1210 /*!
1211  * \brief utility to wait for a version # to change
1212  */
1213 int
1214 ubik_WaitVersion(struct ubik_dbase *adatabase,
1215                  struct ubik_version *aversion)
1216 {
1217     DBHOLD(adatabase);
1218     while (1) {
1219         /* wait until version # changes, and then return */
1220         if (vcmp(*aversion, adatabase->version) != 0) {
1221             DBRELE(adatabase);
1222             return 0;
1223         }
1224 #ifdef AFS_PTHREAD_ENV
1225         CV_WAIT(&adatabase->version_cond, &adatabase->versionLock);
1226 #else
1227         DBRELE(adatabase);
1228         LWP_WaitProcess(&adatabase->version);   /* same vers, just wait */
1229         DBHOLD(adatabase);
1230 #endif
1231     }
1232 }
1233
1234 /*!
1235  * \brief utility to get the version of the dbase a transaction is dealing with
1236  */
1237 int
1238 ubik_GetVersion(struct ubik_trans *atrans,
1239                 struct ubik_version *avers)
1240 {
1241     *avers = atrans->dbase->version;
1242     return 0;
1243 }
1244
1245 /*!
1246  * \brief Facility to simplify database caching.
1247  * \return zero if last trans was done on the local server and was successful.
1248  * \return -1 means bad (NULL) argument.
1249  *
1250  * If return value is non-zero and the caller is a server caching part of the
1251  * Ubik database, it should invalidate that cache.
1252  */
1253 static int
1254 ubik_CacheUpdate(struct ubik_trans *atrans)
1255 {
1256     if (!(atrans && atrans->dbase))
1257         return -1;
1258     return vcmp(atrans->dbase->cachedVersion, atrans->dbase->version) != 0;
1259 }
1260
1261 /**
1262  * check and possibly update cache of ubik db.
1263  *
1264  * If the version of the cached db data is out of date, this calls (*check) to
1265  * update the cache. If (*check) returns success, we update the version of the
1266  * cached db data.
1267  *
1268  * Checking the version of the cached db data is done under a read lock;
1269  * updating the cache (and thus calling (*check)) is done under a write lock
1270  * so is guaranteed not to interfere with another thread's (*check). On
1271  * successful return, a read lock on the cached db data is obtained, which
1272  * will be released by ubik_EndTrans or ubik_AbortTrans.
1273  *
1274  * @param[in] atrans ubik transaction
1275  * @param[in] check  function to call to check/update cache
1276  * @param[in] rock   rock to pass to *check
1277  *
1278  * @return operation status
1279  *   @retval 0       success
1280  *   @retval nonzero error; cachedVersion not updated
1281  *
1282  * @post On success, application cache is read-locked, and cache data is
1283  *       up-to-date
1284  */
1285 int
1286 ubik_CheckCache(struct ubik_trans *atrans, ubik_updatecache_func cbf, void *rock)
1287 {
1288     int ret = 0;
1289
1290     if (!(atrans && atrans->dbase))
1291         return -1;
1292
1293     ObtainReadLock(&atrans->dbase->cache_lock);
1294
1295     while (ubik_CacheUpdate(atrans) != 0) {
1296
1297         ReleaseReadLock(&atrans->dbase->cache_lock);
1298         ObtainSharedLock(&atrans->dbase->cache_lock);
1299
1300         if (ubik_CacheUpdate(atrans) != 0) {
1301
1302             BoostSharedLock(&atrans->dbase->cache_lock);
1303
1304             ret = (*cbf) (atrans, rock);
1305             if (ret == 0) {
1306                 memcpy(&atrans->dbase->cachedVersion, &atrans->dbase->version,
1307                        sizeof(atrans->dbase->cachedVersion));
1308             }
1309         }
1310
1311         /* It would be nice if we could convert from a shared lock to a read
1312          * lock... instead, just release the shared and acquire the read */
1313         ReleaseSharedLock(&atrans->dbase->cache_lock);
1314
1315         if (ret) {
1316             /* if we have an error, don't retry, and don't hold any locks */
1317             return ret;
1318         }
1319
1320         ObtainReadLock(&atrans->dbase->cache_lock);
1321     }
1322
1323     atrans->flags |= TRCACHELOCKED;
1324
1325     return 0;
1326 }
1327
1328 /*!
1329  * "Who said anything about panicking?" snapped Arthur.
1330  * "This is still just the culture shock. You wait till I've settled down
1331  * into the situation and found my bearings. \em Then I'll start panicking!"
1332  * --Authur Dent
1333  *
1334  * \returns There is no return from panic.
1335  */
1336 void
1337 panic(char *format, ...)
1338 {
1339     va_list ap;
1340
1341     va_start(ap, format);
1342     ubik_print("Ubik PANIC: ");
1343     ubik_vprint(format, ap);
1344     va_end(ap);
1345
1346     abort();
1347     ubik_print("BACK FROM ABORT\n");    /* shouldn't come back */
1348     exit(1);                    /* never know, though  */
1349 }
1350
1351 /*!
1352  * This function takes an IP addresses as its parameter. It returns the
1353  * the primary IP address that is on the host passed in, or 0 if not found.
1354  */
1355 afs_uint32
1356 ubikGetPrimaryInterfaceAddr(afs_uint32 addr)
1357 {
1358     struct ubik_server *ts;
1359     int j;
1360
1361     for (ts = ubik_servers; ts; ts = ts->next)
1362         for (j = 0; j < UBIK_MAX_INTERFACE_ADDR; j++)
1363             if (ts->addr[j] == addr)
1364                 return ts->addr[0];     /* net byte order */
1365     return 0;                   /* if not in server database, return error */
1366 }
1367
1368 int
1369 ubik_CheckAuth(struct rx_call *acall)
1370 {
1371     if (checkSecurityProc)
1372         return (*checkSecurityProc) (securityRock, acall);
1373     else if (ubik_CheckRXSecurityProc) {
1374         return (*ubik_CheckRXSecurityProc) (ubik_CheckRXSecurityRock, acall);
1375     } else
1376         return 0;
1377 }
1378
1379 void
1380 ubik_SetServerSecurityProcs(void (*buildproc) (void *,
1381                                                struct rx_securityClass ***,
1382                                                afs_int32 *),
1383                             int (*checkproc) (void *, struct rx_call *),
1384                             void *rock)
1385 {
1386     buildSecClassesProc = buildproc;
1387     checkSecurityProc = checkproc;
1388     securityRock = rock;
1389 }