ubik: Allow servers to have more than 3 seclayers
[openafs.git] / src / ubik / ubik.c
index 68b127f..4218365 100644 (file)
@@ -1,7 +1,7 @@
 /*
  * Copyright 2000, International Business Machines Corporation and others.
  * All Rights Reserved.
- * 
+ *
  * This software has been released under the terms of the IBM Public
  * License.  For details, see the LICENSE file in the top-level source
  * directory or online at http://www.openafs.org/dl/license10.html
 #include <afsconfig.h>
 #include <afs/param.h>
 
-RCSID
-    ("$Header$");
+#include <roken.h>
 
 #include <sys/types.h>
+#include <string.h>
+#include <stdarg.h>
+#include <time.h>
+
 #ifdef AFS_NT40_ENV
 #include <winsock2.h>
 #else
@@ -21,9 +24,8 @@ RCSID
 #include <netinet/in.h>
 #include <sys/param.h>
 #endif
-#include <time.h>
+
 #include <lock.h>
-#include <string.h>
 #include <rx/xdr.h>
 #include <rx/rx.h>
 #include <afs/cellconfig.h>
@@ -34,44 +36,46 @@ RCSID
 
 #include <lwp.h>   /* temporary hack by klm */
 
-#define ERROR_EXIT(code) {error=(code); goto error_exit;}
-
-/*  This system is organized in a hierarchical set of related modules.  Modules
-    at one level can only call modules at the same level or below.
-    
-    At the bottom level (0) we have R, RFTP, LWP and IOMGR, i.e. the basic
-    operating system primitives.
-    
-    At the next level (1) we have
-
-       VOTER--The module responsible for casting votes when asked.  It is also
-       responsible for determining whether this server should try to become
-       a synchronization site.
-       
-       BEACONER--The module responsible for sending keep-alives out when a
-       server is actually the sync site, or trying to become a sync site.
-       
-       DISK--The module responsible for representing atomic transactions
-       on the local disk.  It maintains a new-value only log.
-       
-       LOCK--The module responsible for locking byte ranges in the database file.
-       
-    At the next level (2) we have
-         
-       RECOVERY--The module responsible for ensuring that all members of a quorum
-       have the same up-to-date database after a new synchronization site is
-       elected.  This module runs only on the synchronization site.
-       
-    At the next level (3) we have
-    
-       REMOTE--The module responsible for interpreting requests from the sync
-       site and applying them to the database, after obtaining the appropriate
-       locks.
-       
-    At the next level (4) we have
-    
-       UBIK--The module users call to perform operations on the database.
-*/
+#define ERROR_EXIT(code) do { \
+    error = (code); \
+    goto error_exit; \
+} while (0)
+
+/*!
+ * \file
+ * This system is organized in a hierarchical set of related modules.  Modules
+ * at one level can only call modules at the same level or below.
+ *
+ * At the bottom level (0) we have R, RFTP, LWP and IOMGR, i.e. the basic
+ * operating system primitives.
+ *
+ * At the next level (1) we have
+ *
+ * \li VOTER--The module responsible for casting votes when asked.  It is also
+ * responsible for determining whether this server should try to become
+ * a synchronization site.
+ * \li BEACONER--The module responsible for sending keep-alives out when a
+ * server is actually the sync site, or trying to become a sync site.
+ * \li DISK--The module responsible for representing atomic transactions
+ * on the local disk.  It maintains a new-value only log.
+ * \li LOCK--The module responsible for locking byte ranges in the database file.
+ *
+ * At the next level (2) we have
+ *
+ * \li RECOVERY--The module responsible for ensuring that all members of a quorum
+ * have the same up-to-date database after a new synchronization site is
+ * elected.  This module runs only on the synchronization site.
+ *
+ * At the next level (3) we have
+ *
+ * \li REMOTE--The module responsible for interpreting requests from the sync
+ * site and applying them to the database, after obtaining the appropriate
+ * locks.
+ *
+ * At the next level (4) we have
+ *
+ * \li UBIK--The module users call to perform operations on the database.
+ */
 
 
 /* some globals */
@@ -81,115 +85,307 @@ struct ubik_stats ubik_stats;
 afs_uint32 ubik_host[UBIK_MAX_INTERFACE_ADDR];
 afs_int32 ubik_epochTime = 0;
 afs_int32 urecovery_state = 0;
-int (*ubik_SRXSecurityProc) ();
-char *ubik_SRXSecurityRock;
+int (*ubik_SyncWriterCacheProc) (void);
 struct ubik_server *ubik_servers;
 short ubik_callPortal;
 
-static int BeginTrans();
+/* These global variables were used to control the server security layers.
+ * They are retained for backwards compatibility with legacy callers.
+ *
+ * The ubik_SetServerSecurityProcs() interface should be used instead.
+ */
+
+int (*ubik_SRXSecurityProc) (void *, struct rx_securityClass **, afs_int32 *);
+void *ubik_SRXSecurityRock;
+int (*ubik_CheckRXSecurityProc) (void *, struct rx_call *);
+void *ubik_CheckRXSecurityRock;
 
-struct rx_securityClass *ubik_sc[3];
 
-/* perform an operation at a quorum, handling error conditions.  return 0 if
-    all worked, otherwise mark failing server as down and return UERROR
 
-    Note that if any server misses an update, we must wait BIGTIME seconds before
-    allowing the transaction to commit, to ensure that the missing and possibly still
-    functioning server times out and stop handing out old data.  This is done in the commit
-    code, where we wait for a server marked down to have stayed down for BIGTIME seconds
-    before we allow a transaction to commit.  A server that fails but comes back up won't give
-    out old data because it is sent the sync count along with the beacon message that
-    marks it as *really* up (beaconSinceDown).
-*/
+static int BeginTrans(struct ubik_dbase *dbase, afs_int32 transMode,
+                     struct ubik_trans **transPtr, int readAny);
+
+static struct rx_securityClass **ubik_sc = NULL;
+static void (*buildSecClassesProc)(void *, struct rx_securityClass ***,
+                                  afs_int32 *) = NULL;
+static int (*checkSecurityProc)(void *, struct rx_call *) = NULL;
+static void *securityRock = NULL;
+
 #define        CStampVersion       1   /* meaning set ts->version */
-afs_int32
-ContactQuorum(aproc, atrans, aflags, aparm0, aparm1, aparm2, aparm3, aparm4,
-             aparm5)
-     int (*aproc) ();
-     int aflags;
-     register struct ubik_trans *atrans;
-     long aparm0, aparm1, aparm2, aparm3, aparm4, aparm5;
-{
-    register struct ubik_server *ts;
-    register afs_int32 code;
-    afs_int32 rcode, okcalls;
-
-    rcode = 0;
-    okcalls = 0;
-    for (ts = ubik_servers; ts; ts = ts->next) {
-       /* for each server */
-       if (!ts->up || !ts->currentDB) {
-           ts->currentDB = 0;  /* db is no longer current; we just missed an update */
-           continue;           /* not up-to-date, don't bother */
-       }
-       code =
-           (*aproc) (ts->disk_rxcid, &atrans->tid, aparm0, aparm1, aparm2,
-                     aparm3, aparm4, aparm5);
-       if ((aproc == DISK_WriteV) && (code <= -450) && (code > -500)) {
-           /* An RPC interface mismatch (as defined in comerr/error_msg.c).
-            * Un-bulk the entries and do individual DISK_Write calls
-            * instead of DISK_WriteV.
-            */
-           iovec_wrt *iovec_infoP = (iovec_wrt *) aparm0;
-           iovec_buf *iovec_dataP = (iovec_buf *) aparm1;
-           struct ubik_iovec *iovec =
-               (struct ubik_iovec *)iovec_infoP->iovec_wrt_val;
-           char *iobuf = (char *)iovec_dataP->iovec_buf_val;
-           bulkdata tcbs;
-           afs_int32 i, offset;
-
-           for (i = 0, offset = 0; i < iovec_infoP->iovec_wrt_len; i++) {
-               /* Sanity check for going off end of buffer */
-               if ((offset + iovec[i].length) > iovec_dataP->iovec_buf_len) {
-                   code = UINTERNAL;
-                   break;
+
+static_inline struct rx_connection *
+Quorum_StartIO(struct ubik_trans *atrans, struct ubik_server *as)
+{
+    struct rx_connection *conn;
+
+    conn = as->disk_rxcid;
+
+#ifdef AFS_PTHREAD_ENV
+    rx_GetConnection(conn);
+    DBRELE(atrans->dbase);
+#endif /* AFS_PTHREAD_ENV */
+
+    return conn;
+}
+
+static_inline void
+Quorum_EndIO(struct ubik_trans *atrans, struct rx_connection *aconn)
+{
+#ifdef AFS_PTHREAD_ENV
+    DBHOLD(atrans->dbase);
+    rx_PutConnection(aconn);
+#endif /* AFS_PTHREAD_ENV */
+}
+
+
+/*
+ * Iterate over all servers.  Callers pass in *ts which is used to track
+ * the current server.
+ * - Returns 1 if there are no more servers
+ * - Returns 0 with conn set to the connection for the current server if
+ *   it's up and current
+ */
+static int
+ContactQuorum_iterate(struct ubik_trans *atrans, int aflags, struct ubik_server **ts,
+                        struct rx_connection **conn, afs_int32 *rcode,
+                        afs_int32 *okcalls, afs_int32 code)
+{
+    if (!*ts) {
+       /* Initial call - start iterating over servers */
+       *ts = ubik_servers;
+       *conn = NULL;
+       *rcode = 0;
+       *okcalls = 0;
+    } else {
+       if (*conn) {
+           Quorum_EndIO(atrans, *conn);
+           *conn = NULL;
+           if (code) {         /* failure */
+               *rcode = code;
+               (*ts)->up = 0;          /* mark as down now; beacons will no longer be sent */
+               (*ts)->beaconSinceDown = 0;
+               (*ts)->currentDB = 0;
+               urecovery_LostServer(*ts);      /* tell recovery to try to resend dbase later */
+           } else {            /* success */
+               if (!(*ts)->isClone)
+                   (*okcalls)++;       /* count up how many worked */
+               if (aflags & CStampVersion) {
+                   (*ts)->version = atrans->dbase->version;
                }
-               tcbs.bulkdata_len = iovec[i].length;
-               tcbs.bulkdata_val = &iobuf[offset];
-               code =
-                   DISK_Write(ts->disk_rxcid, &atrans->tid, iovec[i].file,
-                              iovec[i].position, &tcbs);
-               if (code)
-                   break;
-
-               offset += iovec[i].length;
-           }
-       }
-       if (code) {             /* failure */
-           rcode = code;
-           ts->up = 0;         /* mark as down now; beacons will no longer be sent */
-           ts->currentDB = 0;
-           ts->beaconSinceDown = 0;
-           urecovery_LostServer();     /* tell recovery to try to resend dbase later */
-       } else {                /* success */
-           if (!ts->isClone)
-               okcalls++;      /* count up how many worked */
-           if (aflags & CStampVersion) {
-               ts->version = atrans->dbase->version;
            }
        }
+       *ts = (*ts)->next;
+    }
+    if (!(*ts))
+       return 1;
+    if (!(*ts)->up || !(*ts)->currentDB) {
+       (*ts)->currentDB = 0;   /* db is no longer current; we just missed an update */
+       return 0;               /* not up-to-date, don't bother.  NULL conn will tell caller not to use */
     }
-    /* return 0 if we successfully contacted a quorum, otherwise return error code.  We don't have to contact ourselves (that was done locally) */
+    *conn = Quorum_StartIO(atrans, *ts);
+    return 0;
+}
+
+static int
+ContactQuorum_rcode(int okcalls, afs_int32 rcode)
+{
+    /*
+     * return 0 if we successfully contacted a quorum, otherwise return error code.
+     * We don't have to contact ourselves (that was done locally)
+     */
     if (okcalls + 1 >= ubik_quorum)
        return 0;
     else
        return rcode;
 }
 
-/* This routine initializes the ubik system for a set of servers.  It returns 0 for success, or an error code on failure.  The set of servers is specified by serverList; nServers gives the number of entries in this array.  Finally, dbase is the returned structure representing this instance of a ubik; it is passed to various calls below.  The variable pathName provides an initial prefix used for naming storage files used by this system.  It should perhaps be generalized to a low-level disk interface providing read, write, file enumeration and sync operations.
+/*!
+ * \brief Perform an operation at a quorum, handling error conditions.
+ * \return 0 if all worked and a quorum was contacted successfully
+ * \return otherwise mark failing server as down and return #UERROR
+ *
+ * \note If any server misses an update, we must wait #BIGTIME seconds before
+ * allowing the transaction to commit, to ensure that the missing and
+ * possibly still functioning server times out and stops handing out old
+ * data.  This is done in the commit code, where we wait for a server marked
+ * down to have stayed down for #BIGTIME seconds before we allow a transaction
+ * to commit.  A server that fails but comes back up won't give out old data
+ * because it is sent the sync count along with the beacon message that
+ * marks it as \b really up (\p beaconSinceDown).
+ */
+afs_int32
+ContactQuorum_NoArguments(afs_int32 (*proc)(struct rx_connection *, ubik_tid *),
+                         struct ubik_trans *atrans, int aflags)
+{
+    struct ubik_server *ts = NULL;
+    afs_int32 code = 0, rcode, okcalls;
+    struct rx_connection *conn;
+    int done;
+
+    done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
+    while (!done) {
+       if (conn)
+           code = (*proc)(conn, &atrans->tid);
+       done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
+    }
+    return ContactQuorum_rcode(okcalls, rcode);
+}
+
+
+afs_int32
+ContactQuorum_DISK_Lock(struct ubik_trans *atrans, int aflags,afs_int32 file,
+                       afs_int32 position, afs_int32 length, afs_int32 type)
+{
+    struct ubik_server *ts = NULL;
+    afs_int32 code = 0, rcode, okcalls;
+    struct rx_connection *conn;
+    int done;
+
+    done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
+    while (!done) {
+       if (conn)
+           code = DISK_Lock(conn, &atrans->tid, file, position, length, type);
+       done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
+    }
+    return ContactQuorum_rcode(okcalls, rcode);
+}
+
+
+afs_int32
+ContactQuorum_DISK_Write(struct ubik_trans *atrans, int aflags,
+                        afs_int32 file, afs_int32 position, bulkdata *data)
+{
+    struct ubik_server *ts = NULL;
+    afs_int32 code = 0, rcode, okcalls;
+    struct rx_connection *conn;
+    int done;
+
+    done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
+    while (!done) {
+       if (conn)
+           code = DISK_Write(conn, &atrans->tid, file, position, data);
+       done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
+    }
+    return ContactQuorum_rcode(okcalls, rcode);
+}
+
+
+afs_int32
+ContactQuorum_DISK_Truncate(struct ubik_trans *atrans, int aflags,
+                           afs_int32 file, afs_int32 length)
+{
+    struct ubik_server *ts = NULL;
+    afs_int32 code = 0, rcode, okcalls;
+    struct rx_connection *conn;
+    int done;
+
+    done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
+    while (!done) {
+       if (conn)
+           code = DISK_Truncate(conn, &atrans->tid, file, length);
+       done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
+    }
+    return ContactQuorum_rcode(okcalls, rcode);
+}
+
 
-    Note that the host named by myHost should not also be listed in serverList.
-*/
+afs_int32
+ContactQuorum_DISK_WriteV(struct ubik_trans *atrans, int aflags,
+                         iovec_wrt * io_vector, iovec_buf *io_buffer)
+{
+    struct ubik_server *ts = NULL;
+    afs_int32 code = 0, rcode, okcalls;
+    struct rx_connection *conn;
+    int done;
+
+    done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
+    while (!done) {
+       if (conn) {
+           code = DISK_WriteV(conn, &atrans->tid, io_vector, io_buffer);
+           if ((code <= -450) && (code > -500)) {
+               /* An RPC interface mismatch (as defined in comerr/error_msg.c).
+                * Un-bulk the entries and do individual DISK_Write calls
+                * instead of DISK_WriteV.
+                */
+               struct ubik_iovec *iovec =
+                       (struct ubik_iovec *)io_vector->iovec_wrt_val;
+               char *iobuf = (char *)io_buffer->iovec_buf_val;
+               bulkdata tcbs;
+               afs_int32 i, offset;
+
+               for (i = 0, offset = 0; i < io_vector->iovec_wrt_len; i++) {
+                   /* Sanity check for going off end of buffer */
+                   if ((offset + iovec[i].length) > io_buffer->iovec_buf_len) {
+                       code = UINTERNAL;
+                       break;
+                   }
+                   tcbs.bulkdata_len = iovec[i].length;
+                   tcbs.bulkdata_val = &iobuf[offset];
+                   code = DISK_Write(conn, &atrans->tid, iovec[i].file,
+                          iovec[i].position, &tcbs);
+                   if (code)
+                       break;
+                   offset += iovec[i].length;
+               }
+           }
+       }
+       done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
+    }
+    return ContactQuorum_rcode(okcalls, rcode);
+}
 
+
+afs_int32
+ContactQuorum_DISK_SetVersion(struct ubik_trans *atrans, int aflags,
+                             ubik_version *OldVersion,
+                             ubik_version *NewVersion)
+{
+    struct ubik_server *ts = NULL;
+    afs_int32 code = 0, rcode, okcalls;
+    struct rx_connection *conn;
+    int done;
+
+    done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
+    while (!done) {
+       if (conn)
+           code = DISK_SetVersion(conn, &atrans->tid, OldVersion, NewVersion);
+       done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
+    }
+    return ContactQuorum_rcode(okcalls, rcode);
+}
+
+#if defined(AFS_PTHREAD_ENV)
+static int
+ubik_thread_create(pthread_attr_t *tattr, pthread_t *thread, void *proc) {
+    osi_Assert(pthread_attr_init(tattr) == 0);
+    osi_Assert(pthread_attr_setdetachstate(tattr, PTHREAD_CREATE_DETACHED) == 0);
+    osi_Assert(pthread_create(thread, tattr, proc, NULL) == 0);
+    return 0;
+}
+#endif
+
+/*!
+ * \brief This routine initializes the ubik system for a set of servers.
+ * \return 0 for success, or an error code on failure.
+ * \param serverList set of servers specified; nServers gives the number of entries in this array.
+ * \param pathName provides an initial prefix used for naming storage files used by this system.
+ * \param dbase the returned structure representing this instance of an ubik; it is passed to various calls below.
+ *
+ * \todo This routine should perhaps be generalized to a low-level disk interface providing read, write, file enumeration and sync operations.
+ *
+ * \warning The host named by myHost should not also be listed in serverList.
+ *
+ * \see ubik_ServerInit(), ubik_ServerInitByInfo()
+ */
 int
-ubik_ServerInitCommon(afs_int32 myHost, short myPort,
+ubik_ServerInitCommon(afs_uint32 myHost, short myPort,
                      struct afsconf_cell *info, char clones[],
-                     afs_int32 serverList[], char *pathName,
+                     afs_uint32 serverList[], const char *pathName,
                      struct ubik_dbase **dbase)
 {
-    register struct ubik_dbase *tdb;
-    register afs_int32 code;
-#if defined(AFS_PTHREAD_ENV) && defined(UBIK_PTHREAD_ENV)
+    struct ubik_dbase *tdb;
+    afs_int32 code;
+#ifdef AFS_PTHREAD_ENV
     pthread_t rxServerThread;        /* pthread variables */
     pthread_t ubeacon_InteractThread;
     pthread_t urecovery_InteractThread;
@@ -198,14 +394,14 @@ ubik_ServerInitCommon(afs_int32 myHost, short myPort,
     pthread_attr_t urecovery_Interact_tattr;
 #else
     PROCESS junk;
+    extern int rx_stackSize;
 #endif
 
     afs_int32 secIndex;
     struct rx_securityClass *secClass;
+    int numClasses;
 
     struct rx_service *tservice;
-    extern int VOTE_ExecuteRequest(), DISK_ExecuteRequest();
-    extern int rx_stackSize;
 
     initialize_U_error_table();
 
@@ -215,7 +411,12 @@ ubik_ServerInitCommon(afs_int32 myHost, short myPort,
     tdb->activeTrans = (struct ubik_trans *)0;
     memset(&tdb->version, 0, sizeof(struct ubik_version));
     memset(&tdb->cachedVersion, 0, sizeof(struct ubik_version));
+#ifdef AFS_PTHREAD_ENV
+    MUTEX_INIT(&tdb->versionLock, "version lock", MUTEX_DEFAULT, 0);
+#else
     Lock_Init(&tdb->versionLock);
+#endif
+    Lock_Init(&tdb->cache_lock);
     tdb->flags = 0;
     tdb->read = uphys_read;
     tdb->write = uphys_write;
@@ -231,6 +432,11 @@ ubik_ServerInitCommon(afs_int32 myHost, short myPort,
     *dbase = tdb;
     ubik_dbase = tdb;          /* for now, only one db per server; can fix later when we have names for the other dbases */
 
+#ifdef AFS_PTHREAD_ENV
+    CV_INIT(&tdb->version_cond, "version", CV_DEFAULT, 0);
+    CV_INIT(&tdb->flags_cond, "flags", CV_DEFAULT, 0);
+#endif /* AFS_PTHREAD_ENV */
+
     /* initialize RX */
 
     /* the following call is idempotent so when/if it got called earlier,
@@ -239,35 +445,44 @@ ubik_ServerInitCommon(afs_int32 myHost, short myPort,
     if (code < 0)
        return code;
 
+    udisk_Init(ubik_nBuffers);
+    ulock_Init();
+
+    code = uvote_Init();
+    if (code)
+       return code;
+    code = urecovery_Initialize(tdb);
+    if (code)
+       return code;
+    if (info)
+       code = ubeacon_InitServerListByInfo(myHost, info, clones);
+    else
+       code = ubeacon_InitServerList(myHost, serverList);
+    if (code)
+       return code;
+
     ubik_callPortal = myPort;
     /* try to get an additional security object */
-    ubik_sc[0] = rxnull_NewServerSecurityObject();
-    ubik_sc[1] = 0;
-    ubik_sc[2] = 0;
-    if (ubik_SRXSecurityProc) {
-       code =
-           (*ubik_SRXSecurityProc) (ubik_SRXSecurityRock, &secClass,
-                                    &secIndex);
-       if (code == 0) {
-           ubik_sc[secIndex] = secClass;
+    if (buildSecClassesProc == NULL) {
+       numClasses = 3;
+       ubik_sc = calloc(numClasses, sizeof(struct rx_securityClass *));
+       ubik_sc[0] = rxnull_NewServerSecurityObject();
+       if (ubik_SRXSecurityProc) {
+           code = (*ubik_SRXSecurityProc) (ubik_SRXSecurityRock,
+                                           &secClass,
+                                           &secIndex);
+           if (code == 0) {
+                ubik_sc[secIndex] = secClass;
+           }
        }
+    } else {
+        (*buildSecClassesProc) (securityRock, &ubik_sc, &numClasses);
     }
-    /* for backwards compat this should keep working as it does now 
+    /* for backwards compat this should keep working as it does now
        and not host bind */
-#if 0
-    /* This really needs to be up above, where I have put it.  It works
-     * here when we're non-pthreaded, but the code above, when using
-     * pthreads may (and almost certainly does) end up calling on a
-     * pthread resource which gets initialized by rx_Init.  The end
-     * result is that an assert fails and the program dies. -- klm
-     */
-    code = rx_Init(myPort);
-    if (code < 0)
-       return code;
-#endif
 
     tservice =
-       rx_NewService(0, VOTE_SERVICE_ID, "VOTE", ubik_sc, 3,
+       rx_NewService(0, VOTE_SERVICE_ID, "VOTE", ubik_sc, numClasses,
                      VOTE_ExecuteRequest);
     if (tservice == (struct rx_service *)0) {
        ubik_dprint("Could not create VOTE rx service!\n");
@@ -277,7 +492,7 @@ ubik_ServerInitCommon(afs_int32 myHost, short myPort,
     rx_SetMaxProcs(tservice, 3);
 
     tservice =
-       rx_NewService(0, DISK_SERVICE_ID, "DISK", ubik_sc, 3,
+       rx_NewService(0, DISK_SERVICE_ID, "DISK", ubik_sc, numClasses,
                      DISK_ExecuteRequest);
     if (tservice == (struct rx_service *)0) {
        ubik_dprint("Could not create DISK rx service!\n");
@@ -286,46 +501,21 @@ ubik_ServerInitCommon(afs_int32 myHost, short myPort,
     rx_SetMinProcs(tservice, 2);
     rx_SetMaxProcs(tservice, 3);
 
-    /* start an rx_ServerProc to handle incoming RPC's in particular the 
+    /* start an rx_ServerProc to handle incoming RPC's in particular the
      * UpdateInterfaceAddr RPC that occurs in ubeacon_InitServerList. This avoids
      * the "steplock" problem in ubik initialization. Defect 11037.
      */
-#if defined(AFS_PTHREAD_ENV) && defined(UBIK_PTHREAD_ENV)
-/* do assert stuff */
-    assert(pthread_attr_init(&rxServer_tattr) == 0);
-    assert(pthread_attr_setdetachstate(&rxServer_tattr, PTHREAD_CREATE_DETACHED) == 0);
-/*    assert(pthread_attr_setstacksize(&rxServer_tattr, rx_stackSize) == 0); */
-
-    assert(pthread_create(&rxServerThread, &rxServer_tattr, (void *)rx_ServerProc, NULL) == 0);
+#ifdef AFS_PTHREAD_ENV
+    ubik_thread_create(&rxServer_tattr, &rxServerThread, (void *)rx_ServerProc);
 #else
     LWP_CreateProcess(rx_ServerProc, rx_stackSize, RX_PROCESS_PRIORITY,
-              (void *)0, "rx_ServerProc", &junk);
+              NULL, "rx_ServerProc", &junk);
 #endif
 
-    /* do basic initialization */
-    code = uvote_Init();
-    if (code)
-       return code;
-    code = urecovery_Initialize(tdb);
-    if (code)
-       return code;
-    if (info)
-       code = ubeacon_InitServerListByInfo(myHost, info, clones);
-    else
-       code = ubeacon_InitServerList(myHost, serverList);
-    if (code)
-       return code;
-
     /* now start up async processes */
-#if defined(AFS_PTHREAD_ENV) && defined(UBIK_PTHREAD_ENV)
-/* do assert stuff */
-    assert(pthread_attr_init(&ubeacon_Interact_tattr) == 0);
-    assert(pthread_attr_setdetachstate(&ubeacon_Interact_tattr, PTHREAD_CREATE_DETACHED) == 0);
-/*    assert(pthread_attr_setstacksize(&ubeacon_Interact_tattr, 16384) == 0); */
-    /*  need another attr set here for priority???  - klm */
-
-    assert(pthread_create(&ubeacon_InteractThread, &ubeacon_Interact_tattr,
-           (void *)ubeacon_Interact, NULL) == 0);
+#ifdef AFS_PTHREAD_ENV
+    ubik_thread_create(&ubeacon_Interact_tattr, &ubeacon_InteractThread,
+               (void *)ubeacon_Interact);
 #else
     code = LWP_CreateProcess(ubeacon_Interact, 16384 /*8192 */ ,
                             LWP_MAX_PRIORITY - 1, (void *)0, "beacon",
@@ -334,18 +524,11 @@ ubik_ServerInitCommon(afs_int32 myHost, short myPort,
        return code;
 #endif
 
-#if defined(AFS_PTHREAD_ENV) && defined(UBIK_PTHREAD_ENV)
-/* do assert stuff */
-    assert(pthread_attr_init(&urecovery_Interact_tattr) == 0);
-    assert(pthread_attr_setdetachstate(&urecovery_Interact_tattr, PTHREAD_CREATE_DETACHED) == 0);
-/*    assert(pthread_attr_setstacksize(&urecovery_Interact_tattr, 16384) == 0); */
-    /*  need another attr set here for priority???  - klm */
-
-    assert(pthread_create(&urecovery_InteractThread, &urecovery_Interact_tattr,
-           (void *)urecovery_Interact, NULL) == 0);
-
+#ifdef AFS_PTHREAD_ENV
+    ubik_thread_create(&urecovery_Interact_tattr, &urecovery_InteractThread,
+               (void *)urecovery_Interact);
     return 0;  /* is this correct?  - klm */
-#else  
+#else
     code = LWP_CreateProcess(urecovery_Interact, 16384 /*8192 */ ,
                             LWP_MAX_PRIORITY - 1, (void *)0, "recovery",
                             &junk);
@@ -354,10 +537,13 @@ ubik_ServerInitCommon(afs_int32 myHost, short myPort,
 
 }
 
+/*!
+ * \see ubik_ServerInitCommon()
+ */
 int
-ubik_ServerInitByInfo(afs_int32 myHost, short myPort,
+ubik_ServerInitByInfo(afs_uint32 myHost, short myPort,
                      struct afsconf_cell *info, char clones[],
-                     char *pathName, struct ubik_dbase **dbase)
+                     const char *pathName, struct ubik_dbase **dbase)
 {
     afs_int32 code;
 
@@ -367,9 +553,12 @@ ubik_ServerInitByInfo(afs_int32 myHost, short myPort,
     return code;
 }
 
+/*!
+ * \see ubik_ServerInitCommon()
+ */
 int
-ubik_ServerInit(afs_int32 myHost, short myPort, afs_int32 serverList[],
-               char *pathName, struct ubik_dbase **dbase)
+ubik_ServerInit(afs_uint32 myHost, short myPort, afs_uint32 serverList[],
+               const char *pathName, struct ubik_dbase **dbase)
 {
     afs_int32 code;
 
@@ -379,62 +568,40 @@ ubik_ServerInit(afs_int32 myHost, short myPort, afs_int32 serverList[],
     return code;
 }
 
-/*  This routine begins a read or write transaction on the transaction
-    identified by transPtr, in the dbase named by dbase.  An open mode of
-    ubik_READTRANS identifies this as a read transaction, while a mode of
-    ubik_WRITETRANS identifies this as a write transaction.  transPtr 
-    is set to the returned transaction control block. The readAny flag is
-    set to 0 or 1 by the wrapper functions ubik_BeginTrans() or 
-    ubik_BeginTransReadAny() below.
-
-    We can only begin transaction when we have an up-to-date database.
-*/
-
+/*!
+ * \brief This routine begins a read or write transaction on the transaction
+ * identified by transPtr, in the dbase named by dbase.
+ *
+ * An open mode of ubik_READTRANS identifies this as a read transaction,
+ * while a mode of ubik_WRITETRANS identifies this as a write transaction.
+ * transPtr is set to the returned transaction control block.
+ * The readAny flag is set to 0 or 1 or 2 by the wrapper functions
+ * ubik_BeginTrans() or ubik_BeginTransReadAny() or
+ * ubik_BeginTransReadAnyWrite() below.
+ *
+ * \note We can only begin transaction when we have an up-to-date database.
+ */
 static int
-BeginTrans(register struct ubik_dbase *dbase, afs_int32 transMode,
+BeginTrans(struct ubik_dbase *dbase, afs_int32 transMode,
           struct ubik_trans **transPtr, int readAny)
 {
     struct ubik_trans *jt;
-    register struct ubik_trans *tt;
-    register afs_int32 code;
-#if defined(UBIK_PAUSE)
-    int count;
-#endif /* UBIK_PAUSE */
+    struct ubik_trans *tt;
+    afs_int32 code;
+
+    if (readAny > 1 && ubik_SyncWriterCacheProc == NULL) {
+       /* it's not safe to use ubik_BeginTransReadAnyWrite without a
+        * cache-syncing function; fall back to ubik_BeginTransReadAny,
+        * which is safe but slower */
+       ubik_print("ubik_BeginTransReadAnyWrite called, but "
+                  "ubik_SyncWriterCacheProc not set; pretending "
+                  "ubik_BeginTransReadAny was called instead\n");
+       readAny = 1;
+    }
 
     if ((transMode != UBIK_READTRANS) && readAny)
        return UBADTYPE;
     DBHOLD(dbase);
-#if defined(UBIK_PAUSE)
-    /* if we're polling the slave sites, wait until the returns
-     *  are all in.  Otherwise, the urecovery_CheckTid call may
-     *  glitch us. 
-     */
-    if (transMode == UBIK_WRITETRANS)
-       for (count = 75; dbase->flags & DBVOTING; --count) {
-           DBRELE(dbase);
-#ifdef GRAND_PAUSE_DEBUGGING
-           if (count == 75)
-               fprintf(stderr,
-                       "%ld: myport=%d: BeginTrans is waiting 'cause of voting conflict\n",
-                       time(0), ntohs(ubik_callPortal));
-           else
-#endif
-           if (count <= 0) {
-#if 1
-               fprintf(stderr,
-                       "%ld: myport=%d: BeginTrans failed because of voting conflict\n",
-                       time(0), ntohs(ubik_callPortal));
-#endif
-               return UNOQUORUM;       /* a white lie */
-           }
-#if defined(AFS_PTHREAD_ENV) && defined(UBIK_PTHREAD_ENV)
-           sleep(2);
-#else
-           IOMGR_Sleep(2);
-#endif
-           DBHOLD(dbase);
-       }
-#endif /* UBIK_PAUSE */
     if (urecovery_AllBetter(dbase, readAny) == 0) {
        DBRELE(dbase);
        return UNOQUORUM;
@@ -448,16 +615,15 @@ BeginTrans(register struct ubik_dbase *dbase, afs_int32 transMode,
     if (transMode == UBIK_WRITETRANS) {
        /* if we're writing already, wait */
        while (dbase->flags & DBWRITING) {
-           DBRELE(dbase);
-#if defined(AFS_PTHREAD_ENV) && defined(UBIK_PTHREAD_ENV)
-           assert(pthread_mutex_lock(&dbase->flags_mutex) == 0);
-           assert(pthread_cond_wait(&dbase->flags_cond, &dbase->flags_mutex) == 0);
-           assert(pthread_mutex_unlock(&dbase->flags_mutex) == 0);
+#ifdef AFS_PTHREAD_ENV
+           CV_WAIT(&dbase->flags_cond, &dbase->versionLock);
 #else
+           DBRELE(dbase);
            LWP_WaitProcess(&dbase->flags);
-#endif
            DBHOLD(dbase);
+#endif
        }
+
        if (!ubeacon_AmSyncSite()) {
            DBRELE(dbase);
            return UNOTSYNC;
@@ -471,8 +637,12 @@ BeginTrans(register struct ubik_dbase *dbase, afs_int32 transMode,
        DBRELE(dbase);
        return code;
     }
-    if (readAny)
+    if (readAny) {
        tt->flags |= TRREADANY;
+       if (readAny > 1) {
+           tt->flags |= TRREADWRITE;
+       }
+    }
     /* label trans and dbase with new tid */
     tt->tid.epoch = ubik_epochTime;
     /* bump by two, since tidCounter+1 means trans id'd by tidCounter has finished */
@@ -480,18 +650,14 @@ BeginTrans(register struct ubik_dbase *dbase, afs_int32 transMode,
 
     if (transMode == UBIK_WRITETRANS) {
        /* for a write trans, we have to keep track of the write tid counter too */
-#if defined(UBIK_PAUSE)
        dbase->writeTidCounter = tt->tid.counter;
-#else
-       dbase->writeTidCounter += 2;
-#endif /* UBIK_PAUSE */
 
        /* next try to start transaction on appropriate number of machines */
-       code = ContactQuorum(DISK_Begin, tt, 0);
+       code = ContactQuorum_NoArguments(DISK_Begin, tt, 0);
        if (code) {
            /* we must abort the operation */
            udisk_abort(tt);
-           ContactQuorum(DISK_Abort, tt, 0);   /* force aborts to the others */
+           ContactQuorum_NoArguments(DISK_Abort, tt, 0); /* force aborts to the others */
            udisk_end(tt);
            DBRELE(dbase);
            return code;
@@ -503,31 +669,60 @@ BeginTrans(register struct ubik_dbase *dbase, afs_int32 transMode,
     return 0;
 }
 
+/*!
+ * \see BeginTrans()
+ */
 int
-ubik_BeginTrans(register struct ubik_dbase *dbase, afs_int32 transMode,
+ubik_BeginTrans(struct ubik_dbase *dbase, afs_int32 transMode,
                struct ubik_trans **transPtr)
 {
     return BeginTrans(dbase, transMode, transPtr, 0);
 }
 
+/*!
+ * \see BeginTrans()
+ */
 int
-ubik_BeginTransReadAny(register struct ubik_dbase *dbase, afs_int32 transMode,
+ubik_BeginTransReadAny(struct ubik_dbase *dbase, afs_int32 transMode,
                       struct ubik_trans **transPtr)
 {
     return BeginTrans(dbase, transMode, transPtr, 1);
 }
 
-/* this routine ends a read or write transaction by aborting it */
+/*!
+ * \see BeginTrans()
+ */
+int
+ubik_BeginTransReadAnyWrite(struct ubik_dbase *dbase, afs_int32 transMode,
+                            struct ubik_trans **transPtr)
+{
+    return BeginTrans(dbase, transMode, transPtr, 2);
+}
+
+/*!
+ * \brief This routine ends a read or write transaction by aborting it.
+ */
 int
-ubik_AbortTrans(register struct ubik_trans *transPtr)
+ubik_AbortTrans(struct ubik_trans *transPtr)
 {
-    register afs_int32 code;
+    afs_int32 code;
     afs_int32 code2;
-    register struct ubik_dbase *dbase;
+    struct ubik_dbase *dbase;
 
     dbase = transPtr->dbase;
+
+    if (transPtr->flags & TRCACHELOCKED) {
+       ReleaseReadLock(&dbase->cache_lock);
+       transPtr->flags &= ~TRCACHELOCKED;
+    }
+
+    ObtainWriteLock(&dbase->cache_lock);
+
     DBHOLD(dbase);
     memset(&dbase->cachedVersion, 0, sizeof(struct ubik_version));
+
+    ReleaseWriteLock(&dbase->cache_lock);
+
     /* see if we're still up-to-date */
     if (!urecovery_AllBetter(dbase, transPtr->flags & TRREADANY)) {
        udisk_abort(transPtr);
@@ -552,23 +747,44 @@ ubik_AbortTrans(register struct ubik_trans *transPtr)
     }
 
     /* now it is safe to try remote abort */
-    code = ContactQuorum(DISK_Abort, transPtr, 0);
+    code = ContactQuorum_NoArguments(DISK_Abort, transPtr, 0);
     code2 = udisk_abort(transPtr);
     udisk_end(transPtr);
     DBRELE(dbase);
     return (code ? code : code2);
 }
 
-/* This routine ends a read or write transaction on the open transaction identified by transPtr.  It returns an error code. */
+static void
+WritebackApplicationCache(struct ubik_dbase *dbase)
+{
+    int code = 0;
+    if (ubik_SyncWriterCacheProc) {
+       code = ubik_SyncWriterCacheProc();
+    }
+    if (code) {
+       /* we failed to sync the local cache, so just invalidate the cache;
+        * we'll try to read the cache in again on the next read */
+       memset(&dbase->cachedVersion, 0, sizeof(dbase->cachedVersion));
+    } else {
+       memcpy(&dbase->cachedVersion, &dbase->version,
+              sizeof(dbase->cachedVersion));
+    }
+}
+
+/*!
+ * \brief This routine ends a read or write transaction on the open transaction identified by transPtr.
+ * \return an error code.
+ */
 int
-ubik_EndTrans(register struct ubik_trans *transPtr)
+ubik_EndTrans(struct ubik_trans *transPtr)
 {
-    register afs_int32 code;
+    afs_int32 code;
     struct timeval tv;
     afs_int32 realStart;
-    register struct ubik_server *ts;
+    struct ubik_server *ts;
     afs_int32 now;
-    register struct ubik_dbase *dbase;
+    int cachelocked = 0;
+    struct ubik_dbase *dbase;
 
     if (transPtr->type == UBIK_WRITETRANS) {
        code = ubik_Flush(transPtr);
@@ -579,15 +795,27 @@ ubik_EndTrans(register struct ubik_trans *transPtr)
     }
 
     dbase = transPtr->dbase;
+
+    if (transPtr->flags & TRCACHELOCKED) {
+       ReleaseReadLock(&dbase->cache_lock);
+       transPtr->flags &= ~TRCACHELOCKED;
+    }
+
+    if (transPtr->type != UBIK_READTRANS) {
+       /* must hold cache_lock before DBHOLD'ing */
+       ObtainWriteLock(&dbase->cache_lock);
+       cachelocked = 1;
+    }
+
     DBHOLD(dbase);
-    memset(&dbase->cachedVersion, 0, sizeof(struct ubik_version));
 
     /* give up if no longer current */
     if (!urecovery_AllBetter(dbase, transPtr->flags & TRREADANY)) {
        udisk_abort(transPtr);
        udisk_end(transPtr);
        DBRELE(dbase);
-       return UNOQUORUM;
+       code = UNOQUORUM;
+       goto error;
     }
 
     if (transPtr->type == UBIK_READTRANS) {    /* reads are easy */
@@ -596,20 +824,33 @@ ubik_EndTrans(register struct ubik_trans *transPtr)
            goto success;       /* update cachedVersion correctly */
        udisk_end(transPtr);
        DBRELE(dbase);
-       return code;
+       goto error;
     }
 
     if (!ubeacon_AmSyncSite()) {       /* no longer sync site */
        udisk_abort(transPtr);
        udisk_end(transPtr);
        DBRELE(dbase);
-       return UNOTSYNC;
+       code = UNOTSYNC;
+       goto error;
     }
 
     /* now it is safe to do commit */
     code = udisk_commit(transPtr);
-    if (code == 0)
-       code = ContactQuorum(DISK_Commit, transPtr, CStampVersion);
+    if (code == 0) {
+       /* db data has been committed locally; update the local cache so
+        * readers can get at it */
+       WritebackApplicationCache(dbase);
+
+       ReleaseWriteLock(&dbase->cache_lock);
+
+       code = ContactQuorum_NoArguments(DISK_Commit, transPtr, CStampVersion);
+
+    } else {
+       memset(&dbase->cachedVersion, 0, sizeof(struct ubik_version));
+       ReleaseWriteLock(&dbase->cache_lock);
+    }
+    cachelocked = 0;
     if (code) {
        /* failed to commit, so must return failure.  Try to clear locks first, just for fun
         * Note that we don't know if this transaction will eventually commit at this point.
@@ -617,10 +858,10 @@ ubik_EndTrans(register struct ubik_trans *transPtr)
         * we lose.  If we contact a majority of sites, then we won't be here: contacting
         * a majority guarantees commit, since it guarantees that one dude will be a
         * member of the next quorum. */
-       ContactQuorum(DISK_ReleaseLocks, transPtr, 0);
+       ContactQuorum_NoArguments(DISK_ReleaseLocks, transPtr, 0);
        udisk_end(transPtr);
        DBRELE(dbase);
-       return code;
+       goto error;
     }
     /* before we can start sending unlock messages, we must wait until all servers
      * that are possibly still functioning on the other side of a network partition
@@ -640,15 +881,23 @@ ubik_EndTrans(register struct ubik_trans *transPtr)
        }
        for (ts = ubik_servers; ts; ts = ts->next) {
            if (!ts->beaconSinceDown && now <= ts->lastBeaconSent + BIGTIME) {
+
                /* this guy could have some damaged data, wait for him */
                code = 1;
                tv.tv_sec = 1;  /* try again after a while (ha ha) */
                tv.tv_usec = 0;
-#if defined(AFS_PTHREAD_ENV) && defined(UBIK_PTHREAD_ENV)
+
+#ifdef AFS_PTHREAD_ENV
+               /* we could release the dbase outside of the loop, but we do
+                * it here, in the loop, to avoid an unnecessary RELE/HOLD
+                * if all sites are up */
+               DBRELE(dbase);
                select(0, 0, 0, 0, &tv);
+               DBHOLD(dbase);
 #else
                IOMGR_Select(0, 0, 0, 0, &tv);  /* poll, should we wait on something? */
 #endif
+
                break;
            }
        }
@@ -661,25 +910,39 @@ ubik_EndTrans(register struct ubik_trans *transPtr)
      * The transaction is committed anyway, since we succeeded in contacting a quorum
      * at the start (when invoking the DiskCommit function).
      */
-    ContactQuorum(DISK_ReleaseLocks, transPtr, 0);
+    ContactQuorum_NoArguments(DISK_ReleaseLocks, transPtr, 0);
 
   success:
     udisk_end(transPtr);
-    /* update version on successful EndTrans */
-    memcpy(&dbase->cachedVersion, &dbase->version,
-          sizeof(struct ubik_version));
-
+    /* don't update cachedVersion here; it should have been updated way back
+     * in ubik_CheckCache, and earlier in this function for writes */
     DBRELE(dbase);
+    if (cachelocked) {
+       ReleaseWriteLock(&dbase->cache_lock);
+    }
     return 0;
-}
 
-/* This routine reads length bytes into buffer from the current position in the database.  The file pointer is updated appropriately (by adding the number of bytes actually transferred), and the length actually transferred is stored in the long integer pointed to by length.  Note that *length is an INOUT parameter: at the start it represents the size of the buffer, and when done, it contains the number of bytes actually transferred.  A short read returns zero for an error code. */
+  error:
+    if (!cachelocked) {
+       ObtainWriteLock(&dbase->cache_lock);
+    }
+    memset(&dbase->cachedVersion, 0, sizeof(struct ubik_version));
+    ReleaseWriteLock(&dbase->cache_lock);
+    return code;
+}
 
+/*!
+ * \brief This routine reads length bytes into buffer from the current position in the database.
+ *
+ * The file pointer is updated appropriately (by adding the number of bytes actually transferred), and the length actually transferred is stored in the long integer pointed to by length.  A short read returns zero for an error code.
+ *
+ * \note *length is an INOUT parameter: at the start it represents the size of the buffer, and when done, it contains the number of bytes actually transferred.
+ */
 int
-ubik_Read(register struct ubik_trans *transPtr, char *buffer,
+ubik_Read(struct ubik_trans *transPtr, void *buffer,
          afs_int32 length)
 {
-    register afs_int32 code;
+    afs_int32 code;
 
     /* reads are easy to do: handle locally */
     DBHOLD(transPtr->dbase);
@@ -698,9 +961,11 @@ ubik_Read(register struct ubik_trans *transPtr, char *buffer,
     return code;
 }
 
-/* This routine will flush the io data in the iovec structures. It first
- * flushes to the local disk and then uses ContactQuorum to write it to 
- * the other servers.
+/*!
+ * \brief This routine will flush the io data in the iovec structures.
+ *
+ * It first flushes to the local disk and then uses ContactQuorum to write it
+ * to the other servers.
  */
 int
 ubik_Flush(struct ubik_trans *transPtr)
@@ -721,11 +986,11 @@ ubik_Flush(struct ubik_trans *transPtr)
 
     /* Update the rest of the servers in the quorum */
     code =
-       ContactQuorum(DISK_WriteV, transPtr, 0, &transPtr->iovec_info,
-                     &transPtr->iovec_data);
+       ContactQuorum_DISK_WriteV(transPtr, 0, &transPtr->iovec_info,
+                                 &transPtr->iovec_data);
     if (code) {
        udisk_abort(transPtr);
-       ContactQuorum(DISK_Abort, transPtr, 0); /* force aborts to the others */
+       ContactQuorum_NoArguments(DISK_Abort, transPtr, 0); /* force aborts to the others */
        transPtr->iovec_info.iovec_wrt_len = 0;
        transPtr->iovec_data.iovec_buf_len = 0;
        ERROR_EXIT(code);
@@ -741,12 +1006,13 @@ ubik_Flush(struct ubik_trans *transPtr)
 }
 
 int
-ubik_Write(register struct ubik_trans *transPtr, char *buffer,
+ubik_Write(struct ubik_trans *transPtr, void *vbuffer,
           afs_int32 length)
 {
     struct ubik_iovec *iovec;
     afs_int32 code, error = 0;
     afs_int32 pos, len, size;
+    char * buffer = (char *)vbuffer;
 
     if (transPtr->type != UBIK_WRITETRANS)
        return UBADTYPE;
@@ -756,7 +1022,7 @@ ubik_Write(register struct ubik_trans *transPtr, char *buffer,
     if (length > IOVEC_MAXBUF) {
        for (pos = 0, len = length; len > 0; len -= size, pos += size) {
            size = ((len < IOVEC_MAXBUF) ? len : IOVEC_MAXBUF);
-           code = ubik_Write(transPtr, &buffer[pos], size);
+           code = ubik_Write(transPtr, buffer+pos, size);
            if (code)
                return (code);
        }
@@ -826,13 +1092,18 @@ ubik_Write(register struct ubik_trans *transPtr, char *buffer,
     return error;
 }
 
-/* This sets the file pointer associated with the current transaction to the appropriate file and byte position.  Unlike Unix files, a transaction is labelled by both a file number (fileid) and a byte position relative to the specified file (position). */
-
+/*!
+ * \brief This sets the file pointer associated with the current transaction
+ * to the appropriate file and byte position.
+ *
+ * Unlike Unix files, a transaction is labelled by both a file number \p fileid
+ * and a byte position relative to the specified file \p position.
+ */
 int
-ubik_Seek(register struct ubik_trans *transPtr, afs_int32 fileid,
+ubik_Seek(struct ubik_trans *transPtr, afs_int32 fileid,
          afs_int32 position)
 {
-    register afs_int32 code;
+    afs_int32 code;
 
     DBHOLD(transPtr->dbase);
     if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY)) {
@@ -846,10 +1117,12 @@ ubik_Seek(register struct ubik_trans *transPtr, afs_int32 fileid,
     return code;
 }
 
-/* This call returns the file pointer associated with the specified transaction in fileid and position. */
-
+/*!
+ * \brief This call returns the file pointer associated with the specified
+ * transaction in \p fileid and \p position.
+ */
 int
-ubik_Tell(register struct ubik_trans *transPtr, afs_int32 * fileid,
+ubik_Tell(struct ubik_trans *transPtr, afs_int32 * fileid,
          afs_int32 * position)
 {
     DBHOLD(transPtr->dbase);
@@ -859,10 +1132,12 @@ ubik_Tell(register struct ubik_trans *transPtr, afs_int32 * fileid,
     return 0;
 }
 
-/* This sets the file size for the currently-selected file to length bytes, if length is less than the file's current size. */
-
+/*!
+ * \brief This sets the file size for the currently-selected file to \p length
+ * bytes, if length is less than the file's current size.
+ */
 int
-ubik_Truncate(register struct ubik_trans *transPtr, afs_int32 length)
+ubik_Truncate(struct ubik_trans *transPtr, afs_int32 length)
 {
     afs_int32 code, error = 0;
 
@@ -882,13 +1157,13 @@ ubik_Truncate(register struct ubik_trans *transPtr, afs_int32 length)
     code = udisk_truncate(transPtr, transPtr->seekFile, length);
     if (!code) {
        code =
-           ContactQuorum(DISK_Truncate, transPtr, 0, transPtr->seekFile,
-                         length);
+           ContactQuorum_DISK_Truncate(transPtr, 0, transPtr->seekFile,
+                                       length);
     }
     if (code) {
        /* we must abort the operation */
        udisk_abort(transPtr);
-       ContactQuorum(DISK_Abort, transPtr, 0); /* force aborts to the others */
+       ContactQuorum_NoArguments(DISK_Abort, transPtr, 0); /* force aborts to the others */
        ERROR_EXIT(code);
     }
 
@@ -897,7 +1172,9 @@ ubik_Truncate(register struct ubik_trans *transPtr, afs_int32 length)
     return error;
 }
 
-/* set a lock; all locks are released on transaction end (commit/abort) */
+/*!
+ * \brief set a lock; all locks are released on transaction end (commit/abort)
+ */
 int
 ubik_SetLock(struct ubik_trans *atrans, afs_int32 apos, afs_int32 alen,
             int atype)
@@ -927,13 +1204,13 @@ ubik_SetLock(struct ubik_trans *atrans, afs_int32 apos, afs_int32 alen,
        /* now do the operation locally, and propagate it out */
        code = ulock_getLock(atrans, atype, 1);
        if (code == 0) {
-           code = ContactQuorum(DISK_Lock, atrans, 0, 0, 1 /*unused */ ,
-                                1 /*unused */ , LOCKWRITE);
+           code = ContactQuorum_DISK_Lock(atrans, 0, 0, 1 /*unused */ ,
+                                          1 /*unused */ , LOCKWRITE);
        }
        if (code) {
            /* we must abort the operation */
            udisk_abort(atrans);
-           ContactQuorum(DISK_Abort, atrans, 0);       /* force aborts to the others */
+           ContactQuorum_NoArguments(DISK_Abort, atrans, 0); /* force aborts to the others */
            ERROR_EXIT(code);
        }
     }
@@ -943,61 +1220,151 @@ ubik_SetLock(struct ubik_trans *atrans, afs_int32 apos, afs_int32 alen,
     return error;
 }
 
-/* utility to wait for a version # to change */
+/*!
+ * \brief utility to wait for a version # to change
+ */
 int
-ubik_WaitVersion(register struct ubik_dbase *adatabase,
-                register struct ubik_version *aversion)
+ubik_WaitVersion(struct ubik_dbase *adatabase,
+                struct ubik_version *aversion)
 {
+    DBHOLD(adatabase);
     while (1) {
        /* wait until version # changes, and then return */
-       if (vcmp(*aversion, adatabase->version) != 0)
+       if (vcmp(*aversion, adatabase->version) != 0) {
+           DBRELE(adatabase);
            return 0;
-#if defined(AFS_PTHREAD_ENV) && defined(UBIK_PTHREAD_ENV)
-       assert(pthread_mutex_lock(&adatabase->version_mutex) == 0);
-       assert(pthread_cond_wait(&adatabase->version_cond,&adatabase->version_mutex) == 0);
-       assert(pthread_mutex_unlock(&adatabase->version_mutex) == 0);
+       }
+#ifdef AFS_PTHREAD_ENV
+       CV_WAIT(&adatabase->version_cond, &adatabase->versionLock);
 #else
+       DBRELE(adatabase);
        LWP_WaitProcess(&adatabase->version);   /* same vers, just wait */
+       DBHOLD(adatabase);
 #endif
     }
 }
 
-/* utility to get the version of the dbase a transaction is dealing with */
+/*!
+ * \brief utility to get the version of the dbase a transaction is dealing with
+ */
 int
-ubik_GetVersion(register struct ubik_trans *atrans,
-               register struct ubik_version *avers)
+ubik_GetVersion(struct ubik_trans *atrans,
+               struct ubik_version *avers)
 {
     *avers = atrans->dbase->version;
     return 0;
 }
 
-/* Facility to simplify database caching.  Returns zero if last trans was done
-   on the local server and was successful.  If return value is non-zero and the
-   caller is a server caching part of the Ubik database, it should invalidate
-   that cache.  A return value of -1 means bad (NULL) argument. */
-
-int
-ubik_CacheUpdate(register struct ubik_trans *atrans)
+/*!
+ * \brief Facility to simplify database caching.
+ * \return zero if last trans was done on the local server and was successful.
+ * \return -1 means bad (NULL) argument.
+ *
+ * If return value is non-zero and the caller is a server caching part of the
+ * Ubik database, it should invalidate that cache.
+ */
+static int
+ubik_CacheUpdate(struct ubik_trans *atrans)
 {
     if (!(atrans && atrans->dbase))
        return -1;
     return vcmp(atrans->dbase->cachedVersion, atrans->dbase->version) != 0;
 }
 
+/**
+ * check and possibly update cache of ubik db.
+ *
+ * If the version of the cached db data is out of date, this calls (*check) to
+ * update the cache. If (*check) returns success, we update the version of the
+ * cached db data.
+ *
+ * Checking the version of the cached db data is done under a read lock;
+ * updating the cache (and thus calling (*check)) is done under a write lock
+ * so is guaranteed not to interfere with another thread's (*check). On
+ * successful return, a read lock on the cached db data is obtained, which
+ * will be released by ubik_EndTrans or ubik_AbortTrans.
+ *
+ * @param[in] atrans ubik transaction
+ * @param[in] check  function to call to check/update cache
+ * @param[in] rock   rock to pass to *check
+ *
+ * @return operation status
+ *   @retval 0       success
+ *   @retval nonzero error; cachedVersion not updated
+ *
+ * @post On success, application cache is read-locked, and cache data is
+ *       up-to-date
+ */
 int
-panic(char *a, char *b, char *c, char *d)
+ubik_CheckCache(struct ubik_trans *atrans, ubik_updatecache_func cbf, void *rock)
 {
+    int ret = 0;
+
+    if (!(atrans && atrans->dbase))
+       return -1;
+
+    ObtainReadLock(&atrans->dbase->cache_lock);
+
+    while (ubik_CacheUpdate(atrans) != 0) {
+
+       ReleaseReadLock(&atrans->dbase->cache_lock);
+       ObtainSharedLock(&atrans->dbase->cache_lock);
+
+       if (ubik_CacheUpdate(atrans) != 0) {
+
+           BoostSharedLock(&atrans->dbase->cache_lock);
+
+           ret = (*cbf) (atrans, rock);
+           if (ret == 0) {
+               memcpy(&atrans->dbase->cachedVersion, &atrans->dbase->version,
+                      sizeof(atrans->dbase->cachedVersion));
+           }
+       }
+
+       /* It would be nice if we could convert from a shared lock to a read
+        * lock... instead, just release the shared and acquire the read */
+       ReleaseSharedLock(&atrans->dbase->cache_lock);
+
+       if (ret) {
+           /* if we have an error, don't retry, and don't hold any locks */
+           return ret;
+       }
+
+       ObtainReadLock(&atrans->dbase->cache_lock);
+    }
+
+    atrans->flags |= TRCACHELOCKED;
+
+    return 0;
+}
+
+/*!
+ * "Who said anything about panicking?" snapped Arthur.
+ * "This is still just the culture shock. You wait till I've settled down
+ * into the situation and found my bearings. \em Then I'll start panicking!"
+ * --Authur Dent
+ *
+ * \returns There is no return from panic.
+ */
+void
+panic(char *format, ...)
+{
+    va_list ap;
+
+    va_start(ap, format);
     ubik_print("Ubik PANIC: ");
-    ubik_print(a, b, c, d);
+    ubik_vprint(format, ap);
+    va_end(ap);
+
     abort();
     ubik_print("BACK FROM ABORT\n");   /* shouldn't come back */
     exit(1);                   /* never know, though  */
 }
 
-/*
-** This functions takes an IP addresses as its parameter. It returns the
-** the primary IP address that is on the host passed in.
-*/
+/*!
+ * This function takes an IP addresses as its parameter. It returns the
+ * the primary IP address that is on the host passed in, or 0 if not found.
+ */
 afs_uint32
 ubikGetPrimaryInterfaceAddr(afs_uint32 addr)
 {
@@ -1010,3 +1377,26 @@ ubikGetPrimaryInterfaceAddr(afs_uint32 addr)
                return ts->addr[0];     /* net byte order */
     return 0;                  /* if not in server database, return error */
 }
+
+int
+ubik_CheckAuth(struct rx_call *acall)
+{
+    if (checkSecurityProc)
+       return (*checkSecurityProc) (securityRock, acall);
+    else if (ubik_CheckRXSecurityProc) {
+       return (*ubik_CheckRXSecurityProc) (ubik_CheckRXSecurityRock, acall);
+    } else
+       return 0;
+}
+
+void
+ubik_SetServerSecurityProcs(void (*buildproc) (void *,
+                                              struct rx_securityClass ***,
+                                              afs_int32 *),
+                           int (*checkproc) (void *, struct rx_call *),
+                           void *rock)
+{
+    buildSecClassesProc = buildproc;
+    checkSecurityProc = checkproc;
+    securityRock = rock;
+}