ubik: refactor pthread creation code
[openafs.git] / src / ubik / ubik.c
index 4bcfe49..f10d34e 100644 (file)
@@ -1,7 +1,7 @@
 /*
  * Copyright 2000, International Business Machines Corporation and others.
  * All Rights Reserved.
- * 
+ *
  * This software has been released under the terms of the IBM Public
  * License.  For details, see the LICENSE file in the top-level source
  * directory or online at http://www.openafs.org/dl/license10.html
@@ -10,6 +10,7 @@
 #include <afsconfig.h>
 #include <afs/param.h>
 
+#include <roken.h>
 
 #include <sys/types.h>
 #include <string.h>
 
 #include <lwp.h>   /* temporary hack by klm */
 
-#define ERROR_EXIT(code) {error=(code); goto error_exit;}
+#define ERROR_EXIT(code) do { \
+    error = (code); \
+    goto error_exit; \
+} while (0)
 
 /*!
  * \file
  * This system is organized in a hierarchical set of related modules.  Modules
  * at one level can only call modules at the same level or below.
- * 
+ *
  * At the bottom level (0) we have R, RFTP, LWP and IOMGR, i.e. the basic
  * operating system primitives.
  *
  * \li DISK--The module responsible for representing atomic transactions
  * on the local disk.  It maintains a new-value only log.
  * \li LOCK--The module responsible for locking byte ranges in the database file.
- *     
+ *
  * At the next level (2) we have
- *  
+ *
  * \li RECOVERY--The module responsible for ensuring that all members of a quorum
  * have the same up-to-date database after a new synchronization site is
  * elected.  This module runs only on the synchronization site.
- *     
+ *
  * At the next level (3) we have
  *
  * \li REMOTE--The module responsible for interpreting requests from the sync
@@ -83,298 +87,274 @@ afs_int32 ubik_epochTime = 0;
 afs_int32 urecovery_state = 0;
 int (*ubik_SRXSecurityProc) (void *, struct rx_securityClass **, afs_int32 *);
 void *ubik_SRXSecurityRock;
+int (*ubik_SyncWriterCacheProc) (void);
 struct ubik_server *ubik_servers;
 short ubik_callPortal;
 
-static int BeginTrans(register struct ubik_dbase *dbase, afs_int32 transMode,
+static int BeginTrans(struct ubik_dbase *dbase, afs_int32 transMode,
                      struct ubik_trans **transPtr, int readAny);
 
 struct rx_securityClass *ubik_sc[3];
 
 #define        CStampVersion       1   /* meaning set ts->version */
 
-/*! 
+static_inline struct rx_connection *
+Quorum_StartIO(struct ubik_trans *atrans, struct ubik_server *as)
+{
+    struct rx_connection *conn;
+
+    conn = as->disk_rxcid;
+
+#ifdef AFS_PTHREAD_ENV
+    rx_GetConnection(conn);
+    DBRELE(atrans->dbase);
+#endif /* AFS_PTHREAD_ENV */
+
+    return conn;
+}
+
+static_inline void
+Quorum_EndIO(struct ubik_trans *atrans, struct rx_connection *aconn)
+{
+#ifdef AFS_PTHREAD_ENV
+    DBHOLD(atrans->dbase);
+    rx_PutConnection(aconn);
+#endif /* AFS_PTHREAD_ENV */
+}
+
+
+/*
+ * Iterate over all servers.  Callers pass in *ts which is used to track
+ * the current server.
+ * - Returns 1 if there are no more servers
+ * - Returns 0 with conn set to the connection for the current server if
+ *   it's up and current
+ */
+static int
+ContactQuorum_iterate(struct ubik_trans *atrans, int aflags, struct ubik_server **ts,
+                        struct rx_connection **conn, afs_int32 *rcode,
+                        afs_int32 *okcalls, afs_int32 code)
+{
+    if (!*ts) {
+       /* Initial call - start iterating over servers */
+       *ts = ubik_servers;
+       *conn = NULL;
+       *rcode = 0;
+       *okcalls = 0;
+    } else {
+       if (*conn) {
+           Quorum_EndIO(atrans, *conn);
+           *conn = NULL;
+           if (code) {         /* failure */
+               *rcode = code;
+               (*ts)->up = 0;          /* mark as down now; beacons will no longer be sent */
+               (*ts)->beaconSinceDown = 0;
+               (*ts)->currentDB = 0;
+               urecovery_LostServer(*ts);      /* tell recovery to try to resend dbase later */
+           } else {            /* success */
+               if (!(*ts)->isClone)
+                   (*okcalls)++;       /* count up how many worked */
+               if (aflags & CStampVersion) {
+                   (*ts)->version = atrans->dbase->version;
+               }
+           }
+       }
+       *ts = (*ts)->next;
+    }
+    if (!(*ts))
+       return 1;
+    if (!(*ts)->up || !(*ts)->currentDB) {
+       (*ts)->currentDB = 0;   /* db is no longer current; we just missed an update */
+       return 0;               /* not up-to-date, don't bother.  NULL conn will tell caller not to use */
+    }
+    *conn = Quorum_StartIO(atrans, *ts);
+    return 0;
+}
+
+static int
+ContactQuorum_rcode(int okcalls, afs_int32 rcode)
+{
+    /*
+     * return 0 if we successfully contacted a quorum, otherwise return error code.
+     * We don't have to contact ourselves (that was done locally)
+     */
+    if (okcalls + 1 >= ubik_quorum)
+       return 0;
+    else
+       return rcode;
+}
+
+/*!
  * \brief Perform an operation at a quorum, handling error conditions.
  * \return 0 if all worked and a quorum was contacted successfully
  * \return otherwise mark failing server as down and return #UERROR
  *
  * \note If any server misses an update, we must wait #BIGTIME seconds before
- * allowing the transaction to commit, to ensure that the missing and 
- * possibly still functioning server times out and stops handing out old 
- * data.  This is done in the commit code, where we wait for a server marked 
- * down to have stayed down for #BIGTIME seconds before we allow a transaction 
- * to commit.  A server that fails but comes back up won't give out old data 
+ * allowing the transaction to commit, to ensure that the missing and
+ * possibly still functioning server times out and stops handing out old
+ * data.  This is done in the commit code, where we wait for a server marked
+ * down to have stayed down for #BIGTIME seconds before we allow a transaction
+ * to commit.  A server that fails but comes back up won't give out old data
  * because it is sent the sync count along with the beacon message that
  * marks it as \b really up (\p beaconSinceDown).
  */
 afs_int32
 ContactQuorum_NoArguments(afs_int32 (*proc)(struct rx_connection *, ubik_tid *),
-                         register struct ubik_trans *atrans, int aflags)
+                         struct ubik_trans *atrans, int aflags)
 {
-    register struct ubik_server *ts;
-    register afs_int32 code;
-    afs_int32 rcode, okcalls;
-
-    rcode = 0;
-    okcalls = 0;
-    for (ts = ubik_servers; ts; ts = ts->next) {
-       /* for each server */
-       if (!ts->up || !ts->currentDB) {
-           ts->currentDB = 0;  /* db is no longer current; we just missed an update */
-           continue;           /* not up-to-date, don't bother */
-       }
-       code = (*proc)(ts->disk_rxcid, &atrans->tid);
-       if (code) {             /* failure */
-           rcode = code;
-           ts->up = 0;         /* mark as down now; beacons will no longer be sent */
-           ts->currentDB = 0;
-           ts->beaconSinceDown = 0;
-           urecovery_LostServer();     /* tell recovery to try to resend dbase later */
-       } else {                /* success */
-           if (!ts->isClone)
-               okcalls++;      /* count up how many worked */
-           if (aflags & CStampVersion) {
-               ts->version = atrans->dbase->version;
-           }
-       }
+    struct ubik_server *ts = NULL;
+    afs_int32 code = 0, rcode, okcalls;
+    struct rx_connection *conn;
+    int done;
+
+    done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
+    while (!done) {
+       if (conn)
+           code = (*proc)(conn, &atrans->tid);
+       done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
     }
-    /* return 0 if we successfully contacted a quorum, otherwise return error code.  We don't have to contact ourselves (that was done locally) */
-    if (okcalls + 1 >= ubik_quorum)
-       return 0;
-    else
-       return rcode;
+    return ContactQuorum_rcode(okcalls, rcode);
 }
 
+
 afs_int32
-ContactQuorum_DISK_Lock(register struct ubik_trans *atrans, int aflags,afs_int32 file,
+ContactQuorum_DISK_Lock(struct ubik_trans *atrans, int aflags,afs_int32 file,
                        afs_int32 position, afs_int32 length, afs_int32 type)
 {
-    register struct ubik_server *ts;
-    register afs_int32 code;
-    afs_int32 rcode, okcalls;
-
-    rcode = 0;
-    okcalls = 0;
-    for (ts = ubik_servers; ts; ts = ts->next) {
-       /* for each server */
-       if (!ts->up || !ts->currentDB) {
-           ts->currentDB = 0;  /* db is no longer current; we just missed an update */
-           continue;           /* not up-to-date, don't bother */
-       }
-       code = DISK_Lock(ts->disk_rxcid, &atrans->tid, file, position, length,
-                          type);
-       if (code) {             /* failure */
-           rcode = code;
-           ts->up = 0;         /* mark as down now; beacons will no longer be sent */
-           ts->currentDB = 0;
-           ts->beaconSinceDown = 0;
-           urecovery_LostServer();     /* tell recovery to try to resend dbase later */
-       } else {                /* success */
-           if (!ts->isClone)
-               okcalls++;      /* count up how many worked */
-           if (aflags & CStampVersion) {
-               ts->version = atrans->dbase->version;
-           }
-       }
+    struct ubik_server *ts = NULL;
+    afs_int32 code = 0, rcode, okcalls;
+    struct rx_connection *conn;
+    int done;
+
+    done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
+    while (!done) {
+       if (conn)
+           code = DISK_Lock(conn, &atrans->tid, file, position, length, type);
+       done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
     }
-    /* return 0 if we successfully contacted a quorum, otherwise return error code.  We don't have to contact ourselves (that was done locally) */
-    if (okcalls + 1 >= ubik_quorum)
-       return 0;
-    else
-       return rcode;
+    return ContactQuorum_rcode(okcalls, rcode);
 }
 
+
 afs_int32
-ContactQuorum_DISK_Write(register struct ubik_trans *atrans, int aflags,
+ContactQuorum_DISK_Write(struct ubik_trans *atrans, int aflags,
                         afs_int32 file, afs_int32 position, bulkdata *data)
 {
-    register struct ubik_server *ts;
-    register afs_int32 code;
-    afs_int32 rcode, okcalls;
-
-    rcode = 0;
-    okcalls = 0;
-    for (ts = ubik_servers; ts; ts = ts->next) {
-       /* for each server */
-       if (!ts->up || !ts->currentDB) {
-           ts->currentDB = 0;  /* db is no longer current; we just missed an update */
-           continue;           /* not up-to-date, don't bother */
-       }
-       code = DISK_Write(ts->disk_rxcid, &atrans->tid, file, position, data);
-       if (code) {             /* failure */
-           rcode = code;
-           ts->up = 0;         /* mark as down now; beacons will no longer be sent */
-           ts->currentDB = 0;
-           ts->beaconSinceDown = 0;
-           urecovery_LostServer();     /* tell recovery to try to resend dbase later */
-       } else {                /* success */
-           if (!ts->isClone)
-               okcalls++;      /* count up how many worked */
-           if (aflags & CStampVersion) {
-               ts->version = atrans->dbase->version;
-           }
-       }
+    struct ubik_server *ts = NULL;
+    afs_int32 code = 0, rcode, okcalls;
+    struct rx_connection *conn;
+    int done;
+
+    done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
+    while (!done) {
+       if (conn)
+           code = DISK_Write(conn, &atrans->tid, file, position, data);
+       done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
     }
-    /* return 0 if we successfully contacted a quorum, otherwise return error code.  We don't have to contact ourselves (that was done locally) */
-    if (okcalls + 1 >= ubik_quorum)
-       return 0;
-    else
-       return rcode;
+    return ContactQuorum_rcode(okcalls, rcode);
 }
 
+
 afs_int32
-ContactQuorum_DISK_Truncate(register struct ubik_trans *atrans, int aflags,
+ContactQuorum_DISK_Truncate(struct ubik_trans *atrans, int aflags,
                            afs_int32 file, afs_int32 length)
 {
-    register struct ubik_server *ts;
-    register afs_int32 code;
-    afs_int32 rcode, okcalls;
-
-    rcode = 0;
-    okcalls = 0;
-    for (ts = ubik_servers; ts; ts = ts->next) {
-       /* for each server */
-       if (!ts->up || !ts->currentDB) {
-           ts->currentDB = 0;  /* db is no longer current; we just missed an update */
-           continue;           /* not up-to-date, don't bother */
-       }
-       code = DISK_Truncate(ts->disk_rxcid, &atrans->tid, file, length);
-       if (code) {             /* failure */
-           rcode = code;
-           ts->up = 0;         /* mark as down now; beacons will no longer be sent */
-           ts->currentDB = 0;
-           ts->beaconSinceDown = 0;
-           urecovery_LostServer();     /* tell recovery to try to resend dbase later */
-       } else {                /* success */
-           if (!ts->isClone)
-               okcalls++;      /* count up how many worked */
-           if (aflags & CStampVersion) {
-               ts->version = atrans->dbase->version;
-           }
-       }
+    struct ubik_server *ts = NULL;
+    afs_int32 code = 0, rcode, okcalls;
+    struct rx_connection *conn;
+    int done;
+
+    done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
+    while (!done) {
+       if (conn)
+           code = DISK_Truncate(conn, &atrans->tid, file, length);
+       done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
     }
-    /* return 0 if we successfully contacted a quorum, otherwise return error code.  We don't have to contact ourselves (that was done locally) */
-    if (okcalls + 1 >= ubik_quorum)
-       return 0;
-    else
-       return rcode;
+    return ContactQuorum_rcode(okcalls, rcode);
 }
 
+
 afs_int32
-ContactQuorum_DISK_WriteV(register struct ubik_trans *atrans, int aflags,
+ContactQuorum_DISK_WriteV(struct ubik_trans *atrans, int aflags,
                          iovec_wrt * io_vector, iovec_buf *io_buffer)
 {
-    register struct ubik_server *ts;
-    register afs_int32 code;
-    afs_int32 rcode, okcalls;
-
-    rcode = 0;
-    okcalls = 0;
-    for (ts = ubik_servers; ts; ts = ts->next) {
-       /* for each server */
-       if (!ts->up || !ts->currentDB) {
-           ts->currentDB = 0;  /* db is no longer current; we just missed an update */
-           continue;           /* not up-to-date, don't bother */
-       }
-
-       code = DISK_WriteV(ts->disk_rxcid, &atrans->tid, io_vector, io_buffer);
-
-       if ((code <= -450) && (code > -500)) {
-           /* An RPC interface mismatch (as defined in comerr/error_msg.c).
-            * Un-bulk the entries and do individual DISK_Write calls
-            * instead of DISK_WriteV.
-            */
-           struct ubik_iovec *iovec =
-               (struct ubik_iovec *)io_vector->iovec_wrt_val;
-           char *iobuf = (char *)io_buffer->iovec_buf_val;
-           bulkdata tcbs;
-           afs_int32 i, offset;
-
-           for (i = 0, offset = 0; i < io_vector->iovec_wrt_len; i++) {
-               /* Sanity check for going off end of buffer */
-               if ((offset + iovec[i].length) > io_buffer->iovec_buf_len) {
-                   code = UINTERNAL;
-                   break;
+    struct ubik_server *ts = NULL;
+    afs_int32 code = 0, rcode, okcalls;
+    struct rx_connection *conn;
+    int done;
+
+    done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
+    while (!done) {
+       if (conn) {
+           code = DISK_WriteV(conn, &atrans->tid, io_vector, io_buffer);
+           if ((code <= -450) && (code > -500)) {
+               /* An RPC interface mismatch (as defined in comerr/error_msg.c).
+                * Un-bulk the entries and do individual DISK_Write calls
+                * instead of DISK_WriteV.
+                */
+               struct ubik_iovec *iovec =
+                       (struct ubik_iovec *)io_vector->iovec_wrt_val;
+               char *iobuf = (char *)io_buffer->iovec_buf_val;
+               bulkdata tcbs;
+               afs_int32 i, offset;
+
+               for (i = 0, offset = 0; i < io_vector->iovec_wrt_len; i++) {
+                   /* Sanity check for going off end of buffer */
+                   if ((offset + iovec[i].length) > io_buffer->iovec_buf_len) {
+                       code = UINTERNAL;
+                       break;
+                   }
+                   tcbs.bulkdata_len = iovec[i].length;
+                   tcbs.bulkdata_val = &iobuf[offset];
+                   code = DISK_Write(conn, &atrans->tid, iovec[i].file,
+                          iovec[i].position, &tcbs);
+                   if (code)
+                       break;
+                   offset += iovec[i].length;
                }
-               tcbs.bulkdata_len = iovec[i].length;
-               tcbs.bulkdata_val = &iobuf[offset];
-               code =
-                   DISK_Write(ts->disk_rxcid, &atrans->tid, iovec[i].file,
-                              iovec[i].position, &tcbs);
-               if (code)
-                   break;
-
-               offset += iovec[i].length;
-           }
-       }
-
-       if (code) {             /* failure */
-           rcode = code;
-           ts->up = 0;         /* mark as down now; beacons will no longer be sent */
-           ts->currentDB = 0;
-           ts->beaconSinceDown = 0;
-           urecovery_LostServer();     /* tell recovery to try to resend dbase later */
-       } else {                /* success */
-           if (!ts->isClone)
-               okcalls++;      /* count up how many worked */
-           if (aflags & CStampVersion) {
-               ts->version = atrans->dbase->version;
            }
        }
+       done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
     }
-    /* return 0 if we successfully contacted a quorum, otherwise return error code.  We don't have to contact ourselves (that was done locally) */
-    if (okcalls + 1 >= ubik_quorum)
-       return 0;
-    else
-       return rcode;
+    return ContactQuorum_rcode(okcalls, rcode);
 }
 
+
 afs_int32
-ContactQuorum_DISK_SetVersion(register struct ubik_trans *atrans, int aflags, 
+ContactQuorum_DISK_SetVersion(struct ubik_trans *atrans, int aflags,
                              ubik_version *OldVersion,
                              ubik_version *NewVersion)
 {
-    register struct ubik_server *ts;
-    register afs_int32 code;
-    afs_int32 rcode, okcalls;
-
-    rcode = 0;
-    okcalls = 0;
-    for (ts = ubik_servers; ts; ts = ts->next) {
-       /* for each server */
-       if (!ts->up || !ts->currentDB) {
-           ts->currentDB = 0;  /* db is no longer current; we just missed an update */
-           continue;           /* not up-to-date, don't bother */
-       }
-       code = DISK_SetVersion(ts->disk_rxcid, &atrans->tid, OldVersion, 
-                              NewVersion);
-       if (code) {             /* failure */
-           rcode = code;
-           ts->up = 0;         /* mark as down now; beacons will no longer be sent */
-           ts->currentDB = 0;
-           ts->beaconSinceDown = 0;
-           urecovery_LostServer();     /* tell recovery to try to resend dbase later */
-       } else {                /* success */
-           if (!ts->isClone)
-               okcalls++;      /* count up how many worked */
-           if (aflags & CStampVersion) {
-               ts->version = atrans->dbase->version;
-           }
-       }
+    struct ubik_server *ts = NULL;
+    afs_int32 code = 0, rcode, okcalls;
+    struct rx_connection *conn;
+    int done;
+
+    done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
+    while (!done) {
+       if (conn)
+           code = DISK_SetVersion(conn, &atrans->tid, OldVersion, NewVersion);
+       done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
     }
-    /* return 0 if we successfully contacted a quorum, otherwise return error code.  We don't have to contact ourselves (that was done locally) */
-    if (okcalls + 1 >= ubik_quorum)
-       return 0;
-    else
-       return rcode;
+    return ContactQuorum_rcode(okcalls, rcode);
 }
 
-/*! 
+#if defined(AFS_PTHREAD_ENV)
+static int
+ubik_thread_create(pthread_attr_t *tattr, pthread_t *thread, void *proc) {
+    osi_Assert(pthread_attr_init(tattr) == 0);
+    osi_Assert(pthread_attr_setdetachstate(tattr, PTHREAD_CREATE_DETACHED) == 0);
+    osi_Assert(pthread_create(thread, tattr, proc, NULL) == 0);
+    return 0;
+}
+#endif
+
+/*!
  * \brief This routine initializes the ubik system for a set of servers.
  * \return 0 for success, or an error code on failure.
  * \param serverList set of servers specified; nServers gives the number of entries in this array.
- * \param pathName provides an initial prefix used for naming storage files used by this system.  
- * \param dbase the returned structure representing this instance of an ubik; it is passed to various calls below.  
+ * \param pathName provides an initial prefix used for naming storage files used by this system.
+ * \param dbase the returned structure representing this instance of an ubik; it is passed to various calls below.
  *
  * \todo This routine should perhaps be generalized to a low-level disk interface providing read, write, file enumeration and sync operations.
  *
@@ -383,13 +363,13 @@ ContactQuorum_DISK_SetVersion(register struct ubik_trans *atrans, int aflags,
  * \see ubik_ServerInit(), ubik_ServerInitByInfo()
  */
 int
-ubik_ServerInitCommon(afs_int32 myHost, short myPort,
+ubik_ServerInitCommon(afs_uint32 myHost, short myPort,
                      struct afsconf_cell *info, char clones[],
-                     afs_int32 serverList[], const char *pathName,
+                     afs_uint32 serverList[], const char *pathName,
                      struct ubik_dbase **dbase)
 {
-    register struct ubik_dbase *tdb;
-    register afs_int32 code;
+    struct ubik_dbase *tdb;
+    afs_int32 code;
 #ifdef AFS_PTHREAD_ENV
     pthread_t rxServerThread;        /* pthread variables */
     pthread_t ubeacon_InteractThread;
@@ -415,7 +395,12 @@ ubik_ServerInitCommon(afs_int32 myHost, short myPort,
     tdb->activeTrans = (struct ubik_trans *)0;
     memset(&tdb->version, 0, sizeof(struct ubik_version));
     memset(&tdb->cachedVersion, 0, sizeof(struct ubik_version));
+#ifdef AFS_PTHREAD_ENV
+    MUTEX_INIT(&tdb->versionLock, "version lock", MUTEX_DEFAULT, 0);
+#else
     Lock_Init(&tdb->versionLock);
+#endif
+    Lock_Init(&tdb->cache_lock);
     tdb->flags = 0;
     tdb->read = uphys_read;
     tdb->write = uphys_write;
@@ -431,6 +416,11 @@ ubik_ServerInitCommon(afs_int32 myHost, short myPort,
     *dbase = tdb;
     ubik_dbase = tdb;          /* for now, only one db per server; can fix later when we have names for the other dbases */
 
+#ifdef AFS_PTHREAD_ENV
+    CV_INIT(&tdb->version_cond, "version", CV_DEFAULT, 0);
+    CV_INIT(&tdb->flags_cond, "flags", CV_DEFAULT, 0);
+#endif /* AFS_PTHREAD_ENV */
+
     /* initialize RX */
 
     /* the following call is idempotent so when/if it got called earlier,
@@ -439,6 +429,22 @@ ubik_ServerInitCommon(afs_int32 myHost, short myPort,
     if (code < 0)
        return code;
 
+    udisk_Init(ubik_nBuffers);
+    ulock_Init();
+
+    code = uvote_Init();
+    if (code)
+       return code;
+    code = urecovery_Initialize(tdb);
+    if (code)
+       return code;
+    if (info)
+       code = ubeacon_InitServerListByInfo(myHost, info, clones);
+    else
+       code = ubeacon_InitServerList(myHost, serverList);
+    if (code)
+       return code;
+
     ubik_callPortal = myPort;
     /* try to get an additional security object */
     ubik_sc[0] = rxnull_NewServerSecurityObject();
@@ -452,19 +458,8 @@ ubik_ServerInitCommon(afs_int32 myHost, short myPort,
            ubik_sc[secIndex] = secClass;
        }
     }
-    /* for backwards compat this should keep working as it does now 
+    /* for backwards compat this should keep working as it does now
        and not host bind */
-#if 0
-    /* This really needs to be up above, where I have put it.  It works
-     * here when we're non-pthreaded, but the code above, when using
-     * pthreads may (and almost certainly does) end up calling on a
-     * pthread resource which gets initialized by rx_Init.  The end
-     * result is that an assert fails and the program dies. -- klm
-     */
-    code = rx_Init(myPort);
-    if (code < 0)
-       return code;
-#endif
 
     tservice =
        rx_NewService(0, VOTE_SERVICE_ID, "VOTE", ubik_sc, 3,
@@ -486,46 +481,21 @@ ubik_ServerInitCommon(afs_int32 myHost, short myPort,
     rx_SetMinProcs(tservice, 2);
     rx_SetMaxProcs(tservice, 3);
 
-    /* start an rx_ServerProc to handle incoming RPC's in particular the 
+    /* start an rx_ServerProc to handle incoming RPC's in particular the
      * UpdateInterfaceAddr RPC that occurs in ubeacon_InitServerList. This avoids
      * the "steplock" problem in ubik initialization. Defect 11037.
      */
 #ifdef AFS_PTHREAD_ENV
-/* do assert stuff */
-    assert(pthread_attr_init(&rxServer_tattr) == 0);
-    assert(pthread_attr_setdetachstate(&rxServer_tattr, PTHREAD_CREATE_DETACHED) == 0);
-/*    assert(pthread_attr_setstacksize(&rxServer_tattr, rx_stackSize) == 0); */
-
-    assert(pthread_create(&rxServerThread, &rxServer_tattr, (void *)rx_ServerProc, NULL) == 0);
+    ubik_thread_create(&rxServer_tattr, &rxServerThread, (void *)rx_ServerProc);
 #else
     LWP_CreateProcess(rx_ServerProc, rx_stackSize, RX_PROCESS_PRIORITY,
               NULL, "rx_ServerProc", &junk);
 #endif
 
-    /* do basic initialization */
-    code = uvote_Init();
-    if (code)
-       return code;
-    code = urecovery_Initialize(tdb);
-    if (code)
-       return code;
-    if (info)
-       code = ubeacon_InitServerListByInfo(myHost, info, clones);
-    else
-       code = ubeacon_InitServerList(myHost, serverList);
-    if (code)
-       return code;
-
     /* now start up async processes */
 #ifdef AFS_PTHREAD_ENV
-/* do assert stuff */
-    assert(pthread_attr_init(&ubeacon_Interact_tattr) == 0);
-    assert(pthread_attr_setdetachstate(&ubeacon_Interact_tattr, PTHREAD_CREATE_DETACHED) == 0);
-/*    assert(pthread_attr_setstacksize(&ubeacon_Interact_tattr, 16384) == 0); */
-    /*  need another attr set here for priority???  - klm */
-
-    assert(pthread_create(&ubeacon_InteractThread, &ubeacon_Interact_tattr,
-           (void *)ubeacon_Interact, NULL) == 0);
+    ubik_thread_create(&ubeacon_Interact_tattr, &ubeacon_InteractThread,
+               (void *)ubeacon_Interact);
 #else
     code = LWP_CreateProcess(ubeacon_Interact, 16384 /*8192 */ ,
                             LWP_MAX_PRIORITY - 1, (void *)0, "beacon",
@@ -535,17 +505,10 @@ ubik_ServerInitCommon(afs_int32 myHost, short myPort,
 #endif
 
 #ifdef AFS_PTHREAD_ENV
-/* do assert stuff */
-    assert(pthread_attr_init(&urecovery_Interact_tattr) == 0);
-    assert(pthread_attr_setdetachstate(&urecovery_Interact_tattr, PTHREAD_CREATE_DETACHED) == 0);
-/*    assert(pthread_attr_setstacksize(&urecovery_Interact_tattr, 16384) == 0); */
-    /*  need another attr set here for priority???  - klm */
-
-    assert(pthread_create(&urecovery_InteractThread, &urecovery_Interact_tattr,
-           (void *)urecovery_Interact, NULL) == 0);
-
+    ubik_thread_create(&urecovery_Interact_tattr, &urecovery_InteractThread,
+               (void *)urecovery_Interact);
     return 0;  /* is this correct?  - klm */
-#else  
+#else
     code = LWP_CreateProcess(urecovery_Interact, 16384 /*8192 */ ,
                             LWP_MAX_PRIORITY - 1, (void *)0, "recovery",
                             &junk);
@@ -558,7 +521,7 @@ ubik_ServerInitCommon(afs_int32 myHost, short myPort,
  * \see ubik_ServerInitCommon()
  */
 int
-ubik_ServerInitByInfo(afs_int32 myHost, short myPort,
+ubik_ServerInitByInfo(afs_uint32 myHost, short myPort,
                      struct afsconf_cell *info, char clones[],
                      const char *pathName, struct ubik_dbase **dbase)
 {
@@ -574,7 +537,7 @@ ubik_ServerInitByInfo(afs_int32 myHost, short myPort,
  * \see ubik_ServerInitCommon()
  */
 int
-ubik_ServerInit(afs_int32 myHost, short myPort, afs_int32 serverList[],
+ubik_ServerInit(afs_uint32 myHost, short myPort, afs_uint32 serverList[],
                const char *pathName, struct ubik_dbase **dbase)
 {
     afs_int32 code;
@@ -589,59 +552,36 @@ ubik_ServerInit(afs_int32 myHost, short myPort, afs_int32 serverList[],
  * \brief This routine begins a read or write transaction on the transaction
  * identified by transPtr, in the dbase named by dbase.
  *
- * An open mode of ubik_READTRANS identifies this as a read transaction, 
+ * An open mode of ubik_READTRANS identifies this as a read transaction,
  * while a mode of ubik_WRITETRANS identifies this as a write transaction.
- * transPtr is set to the returned transaction control block. 
- * The readAny flag is set to 0 or 1 by the wrapper functions ubik_BeginTrans() or 
- * ubik_BeginTransReadAny() below.
+ * transPtr is set to the returned transaction control block.
+ * The readAny flag is set to 0 or 1 or 2 by the wrapper functions
+ * ubik_BeginTrans() or ubik_BeginTransReadAny() or
+ * ubik_BeginTransReadAnyWrite() below.
  *
  * \note We can only begin transaction when we have an up-to-date database.
  */
 static int
-BeginTrans(register struct ubik_dbase *dbase, afs_int32 transMode,
+BeginTrans(struct ubik_dbase *dbase, afs_int32 transMode,
           struct ubik_trans **transPtr, int readAny)
 {
     struct ubik_trans *jt;
-    register struct ubik_trans *tt;
-    register afs_int32 code;
-#if defined(UBIK_PAUSE)
-    int count;
-#endif /* UBIK_PAUSE */
+    struct ubik_trans *tt;
+    afs_int32 code;
+
+    if (readAny > 1 && ubik_SyncWriterCacheProc == NULL) {
+       /* it's not safe to use ubik_BeginTransReadAnyWrite without a
+        * cache-syncing function; fall back to ubik_BeginTransReadAny,
+        * which is safe but slower */
+       ubik_print("ubik_BeginTransReadAnyWrite called, but "
+                  "ubik_SyncWriterCacheProc not set; pretending "
+                  "ubik_BeginTransReadAny was called instead\n");
+       readAny = 1;
+    }
 
     if ((transMode != UBIK_READTRANS) && readAny)
        return UBADTYPE;
     DBHOLD(dbase);
-#if defined(UBIK_PAUSE)
-    /* if we're polling the slave sites, wait until the returns
-     *  are all in.  Otherwise, the urecovery_CheckTid call may
-     *  glitch us. 
-     */
-    if (transMode == UBIK_WRITETRANS)
-       for (count = 75; dbase->flags & DBVOTING; --count) {
-           DBRELE(dbase);
-#ifdef GRAND_PAUSE_DEBUGGING
-           if (count == 75)
-               fprintf(stderr,
-                       "%ld: myport=%d: BeginTrans is waiting 'cause of voting conflict\n",
-                       time(0), ntohs(ubik_callPortal));
-           else
-#endif
-           if (count <= 0) {
-#if 1
-               fprintf(stderr,
-                       "%ld: myport=%d: BeginTrans failed because of voting conflict\n",
-                       time(0), ntohs(ubik_callPortal));
-#endif
-               return UNOQUORUM;       /* a white lie */
-           }
-#ifdef AFS_PTHREAD_ENV
-           sleep(2);
-#else
-           IOMGR_Sleep(2);
-#endif
-           DBHOLD(dbase);
-       }
-#endif /* UBIK_PAUSE */
     if (urecovery_AllBetter(dbase, readAny) == 0) {
        DBRELE(dbase);
        return UNOQUORUM;
@@ -655,16 +595,15 @@ BeginTrans(register struct ubik_dbase *dbase, afs_int32 transMode,
     if (transMode == UBIK_WRITETRANS) {
        /* if we're writing already, wait */
        while (dbase->flags & DBWRITING) {
-           DBRELE(dbase);
 #ifdef AFS_PTHREAD_ENV
-           assert(pthread_mutex_lock(&dbase->flags_mutex) == 0);
-           assert(pthread_cond_wait(&dbase->flags_cond, &dbase->flags_mutex) == 0);
-           assert(pthread_mutex_unlock(&dbase->flags_mutex) == 0);
+           CV_WAIT(&dbase->flags_cond, &dbase->versionLock);
 #else
+           DBRELE(dbase);
            LWP_WaitProcess(&dbase->flags);
-#endif
            DBHOLD(dbase);
+#endif
        }
+
        if (!ubeacon_AmSyncSite()) {
            DBRELE(dbase);
            return UNOTSYNC;
@@ -678,8 +617,12 @@ BeginTrans(register struct ubik_dbase *dbase, afs_int32 transMode,
        DBRELE(dbase);
        return code;
     }
-    if (readAny)
+    if (readAny) {
        tt->flags |= TRREADANY;
+       if (readAny > 1) {
+           tt->flags |= TRREADWRITE;
+       }
+    }
     /* label trans and dbase with new tid */
     tt->tid.epoch = ubik_epochTime;
     /* bump by two, since tidCounter+1 means trans id'd by tidCounter has finished */
@@ -687,11 +630,7 @@ BeginTrans(register struct ubik_dbase *dbase, afs_int32 transMode,
 
     if (transMode == UBIK_WRITETRANS) {
        /* for a write trans, we have to keep track of the write tid counter too */
-#if defined(UBIK_PAUSE)
        dbase->writeTidCounter = tt->tid.counter;
-#else
-       dbase->writeTidCounter += 2;
-#endif /* UBIK_PAUSE */
 
        /* next try to start transaction on appropriate number of machines */
        code = ContactQuorum_NoArguments(DISK_Begin, tt, 0);
@@ -714,7 +653,7 @@ BeginTrans(register struct ubik_dbase *dbase, afs_int32 transMode,
  * \see BeginTrans()
  */
 int
-ubik_BeginTrans(register struct ubik_dbase *dbase, afs_int32 transMode,
+ubik_BeginTrans(struct ubik_dbase *dbase, afs_int32 transMode,
                struct ubik_trans **transPtr)
 {
     return BeginTrans(dbase, transMode, transPtr, 0);
@@ -724,25 +663,46 @@ ubik_BeginTrans(register struct ubik_dbase *dbase, afs_int32 transMode,
  * \see BeginTrans()
  */
 int
-ubik_BeginTransReadAny(register struct ubik_dbase *dbase, afs_int32 transMode,
+ubik_BeginTransReadAny(struct ubik_dbase *dbase, afs_int32 transMode,
                       struct ubik_trans **transPtr)
 {
     return BeginTrans(dbase, transMode, transPtr, 1);
 }
 
 /*!
+ * \see BeginTrans()
+ */
+int
+ubik_BeginTransReadAnyWrite(struct ubik_dbase *dbase, afs_int32 transMode,
+                            struct ubik_trans **transPtr)
+{
+    return BeginTrans(dbase, transMode, transPtr, 2);
+}
+
+/*!
  * \brief This routine ends a read or write transaction by aborting it.
  */
 int
-ubik_AbortTrans(register struct ubik_trans *transPtr)
+ubik_AbortTrans(struct ubik_trans *transPtr)
 {
-    register afs_int32 code;
+    afs_int32 code;
     afs_int32 code2;
-    register struct ubik_dbase *dbase;
+    struct ubik_dbase *dbase;
 
     dbase = transPtr->dbase;
+
+    if (transPtr->flags & TRCACHELOCKED) {
+       ReleaseReadLock(&dbase->cache_lock);
+       transPtr->flags &= ~TRCACHELOCKED;
+    }
+
+    ObtainWriteLock(&dbase->cache_lock);
+
     DBHOLD(dbase);
     memset(&dbase->cachedVersion, 0, sizeof(struct ubik_version));
+
+    ReleaseWriteLock(&dbase->cache_lock);
+
     /* see if we're still up-to-date */
     if (!urecovery_AllBetter(dbase, transPtr->flags & TRREADANY)) {
        udisk_abort(transPtr);
@@ -774,19 +734,37 @@ ubik_AbortTrans(register struct ubik_trans *transPtr)
     return (code ? code : code2);
 }
 
+static void
+WritebackApplicationCache(struct ubik_dbase *dbase)
+{
+    int code = 0;
+    if (ubik_SyncWriterCacheProc) {
+       code = ubik_SyncWriterCacheProc();
+    }
+    if (code) {
+       /* we failed to sync the local cache, so just invalidate the cache;
+        * we'll try to read the cache in again on the next read */
+       memset(&dbase->cachedVersion, 0, sizeof(dbase->cachedVersion));
+    } else {
+       memcpy(&dbase->cachedVersion, &dbase->version,
+              sizeof(dbase->cachedVersion));
+    }
+}
+
 /*!
  * \brief This routine ends a read or write transaction on the open transaction identified by transPtr.
  * \return an error code.
  */
 int
-ubik_EndTrans(register struct ubik_trans *transPtr)
+ubik_EndTrans(struct ubik_trans *transPtr)
 {
-    register afs_int32 code;
+    afs_int32 code;
     struct timeval tv;
     afs_int32 realStart;
-    register struct ubik_server *ts;
+    struct ubik_server *ts;
     afs_int32 now;
-    register struct ubik_dbase *dbase;
+    int cachelocked = 0;
+    struct ubik_dbase *dbase;
 
     if (transPtr->type == UBIK_WRITETRANS) {
        code = ubik_Flush(transPtr);
@@ -797,15 +775,27 @@ ubik_EndTrans(register struct ubik_trans *transPtr)
     }
 
     dbase = transPtr->dbase;
+
+    if (transPtr->flags & TRCACHELOCKED) {
+       ReleaseReadLock(&dbase->cache_lock);
+       transPtr->flags &= ~TRCACHELOCKED;
+    }
+
+    if (transPtr->type != UBIK_READTRANS) {
+       /* must hold cache_lock before DBHOLD'ing */
+       ObtainWriteLock(&dbase->cache_lock);
+       cachelocked = 1;
+    }
+
     DBHOLD(dbase);
-    memset(&dbase->cachedVersion, 0, sizeof(struct ubik_version));
 
     /* give up if no longer current */
     if (!urecovery_AllBetter(dbase, transPtr->flags & TRREADANY)) {
        udisk_abort(transPtr);
        udisk_end(transPtr);
        DBRELE(dbase);
-       return UNOQUORUM;
+       code = UNOQUORUM;
+       goto error;
     }
 
     if (transPtr->type == UBIK_READTRANS) {    /* reads are easy */
@@ -814,20 +804,33 @@ ubik_EndTrans(register struct ubik_trans *transPtr)
            goto success;       /* update cachedVersion correctly */
        udisk_end(transPtr);
        DBRELE(dbase);
-       return code;
+       goto error;
     }
 
     if (!ubeacon_AmSyncSite()) {       /* no longer sync site */
        udisk_abort(transPtr);
        udisk_end(transPtr);
        DBRELE(dbase);
-       return UNOTSYNC;
+       code = UNOTSYNC;
+       goto error;
     }
 
     /* now it is safe to do commit */
     code = udisk_commit(transPtr);
-    if (code == 0)
+    if (code == 0) {
+       /* db data has been committed locally; update the local cache so
+        * readers can get at it */
+       WritebackApplicationCache(dbase);
+
+       ReleaseWriteLock(&dbase->cache_lock);
+
        code = ContactQuorum_NoArguments(DISK_Commit, transPtr, CStampVersion);
+
+    } else {
+       memset(&dbase->cachedVersion, 0, sizeof(struct ubik_version));
+       ReleaseWriteLock(&dbase->cache_lock);
+    }
+    cachelocked = 0;
     if (code) {
        /* failed to commit, so must return failure.  Try to clear locks first, just for fun
         * Note that we don't know if this transaction will eventually commit at this point.
@@ -838,7 +841,7 @@ ubik_EndTrans(register struct ubik_trans *transPtr)
        ContactQuorum_NoArguments(DISK_ReleaseLocks, transPtr, 0);
        udisk_end(transPtr);
        DBRELE(dbase);
-       return code;
+       goto error;
     }
     /* before we can start sending unlock messages, we must wait until all servers
      * that are possibly still functioning on the other side of a network partition
@@ -858,15 +861,23 @@ ubik_EndTrans(register struct ubik_trans *transPtr)
        }
        for (ts = ubik_servers; ts; ts = ts->next) {
            if (!ts->beaconSinceDown && now <= ts->lastBeaconSent + BIGTIME) {
+
                /* this guy could have some damaged data, wait for him */
                code = 1;
                tv.tv_sec = 1;  /* try again after a while (ha ha) */
                tv.tv_usec = 0;
+
 #ifdef AFS_PTHREAD_ENV
+               /* we could release the dbase outside of the loop, but we do
+                * it here, in the loop, to avoid an unnecessary RELE/HOLD
+                * if all sites are up */
+               DBRELE(dbase);
                select(0, 0, 0, 0, &tv);
+               DBHOLD(dbase);
 #else
                IOMGR_Select(0, 0, 0, 0, &tv);  /* poll, should we wait on something? */
 #endif
+
                break;
            }
        }
@@ -883,26 +894,35 @@ ubik_EndTrans(register struct ubik_trans *transPtr)
 
   success:
     udisk_end(transPtr);
-    /* update version on successful EndTrans */
-    memcpy(&dbase->cachedVersion, &dbase->version,
-          sizeof(struct ubik_version));
-
+    /* don't update cachedVersion here; it should have been updated way back
+     * in ubik_CheckCache, and earlier in this function for writes */
     DBRELE(dbase);
+    if (cachelocked) {
+       ReleaseWriteLock(&dbase->cache_lock);
+    }
     return 0;
+
+  error:
+    if (!cachelocked) {
+       ObtainWriteLock(&dbase->cache_lock);
+    }
+    memset(&dbase->cachedVersion, 0, sizeof(struct ubik_version));
+    ReleaseWriteLock(&dbase->cache_lock);
+    return code;
 }
 
 /*!
  * \brief This routine reads length bytes into buffer from the current position in the database.
- * 
+ *
  * The file pointer is updated appropriately (by adding the number of bytes actually transferred), and the length actually transferred is stored in the long integer pointed to by length.  A short read returns zero for an error code.
  *
  * \note *length is an INOUT parameter: at the start it represents the size of the buffer, and when done, it contains the number of bytes actually transferred.
  */
 int
-ubik_Read(register struct ubik_trans *transPtr, void *buffer,
+ubik_Read(struct ubik_trans *transPtr, void *buffer,
          afs_int32 length)
 {
-    register afs_int32 code;
+    afs_int32 code;
 
     /* reads are easy to do: handle locally */
     DBHOLD(transPtr->dbase);
@@ -922,7 +942,7 @@ ubik_Read(register struct ubik_trans *transPtr, void *buffer,
 }
 
 /*!
- * \brief This routine will flush the io data in the iovec structures. 
+ * \brief This routine will flush the io data in the iovec structures.
  *
  * It first flushes to the local disk and then uses ContactQuorum to write it
  * to the other servers.
@@ -966,7 +986,7 @@ ubik_Flush(struct ubik_trans *transPtr)
 }
 
 int
-ubik_Write(register struct ubik_trans *transPtr, void *vbuffer,
+ubik_Write(struct ubik_trans *transPtr, void *vbuffer,
           afs_int32 length)
 {
     struct ubik_iovec *iovec;
@@ -1060,10 +1080,10 @@ ubik_Write(register struct ubik_trans *transPtr, void *vbuffer,
  * and a byte position relative to the specified file \p position.
  */
 int
-ubik_Seek(register struct ubik_trans *transPtr, afs_int32 fileid,
+ubik_Seek(struct ubik_trans *transPtr, afs_int32 fileid,
          afs_int32 position)
 {
-    register afs_int32 code;
+    afs_int32 code;
 
     DBHOLD(transPtr->dbase);
     if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY)) {
@@ -1082,7 +1102,7 @@ ubik_Seek(register struct ubik_trans *transPtr, afs_int32 fileid,
  * transaction in \p fileid and \p position.
  */
 int
-ubik_Tell(register struct ubik_trans *transPtr, afs_int32 * fileid,
+ubik_Tell(struct ubik_trans *transPtr, afs_int32 * fileid,
          afs_int32 * position)
 {
     DBHOLD(transPtr->dbase);
@@ -1097,7 +1117,7 @@ ubik_Tell(register struct ubik_trans *transPtr, afs_int32 * fileid,
  * bytes, if length is less than the file's current size.
  */
 int
-ubik_Truncate(register struct ubik_trans *transPtr, afs_int32 length)
+ubik_Truncate(struct ubik_trans *transPtr, afs_int32 length)
 {
     afs_int32 code, error = 0;
 
@@ -1184,19 +1204,22 @@ ubik_SetLock(struct ubik_trans *atrans, afs_int32 apos, afs_int32 alen,
  * \brief utility to wait for a version # to change
  */
 int
-ubik_WaitVersion(register struct ubik_dbase *adatabase,
-                register struct ubik_version *aversion)
+ubik_WaitVersion(struct ubik_dbase *adatabase,
+                struct ubik_version *aversion)
 {
+    DBHOLD(adatabase);
     while (1) {
        /* wait until version # changes, and then return */
-       if (vcmp(*aversion, adatabase->version) != 0)
+       if (vcmp(*aversion, adatabase->version) != 0) {
+           DBRELE(adatabase);
            return 0;
+       }
 #ifdef AFS_PTHREAD_ENV
-       assert(pthread_mutex_lock(&adatabase->version_mutex) == 0);
-       assert(pthread_cond_wait(&adatabase->version_cond,&adatabase->version_mutex) == 0);
-       assert(pthread_mutex_unlock(&adatabase->version_mutex) == 0);
+       CV_WAIT(&adatabase->version_cond, &adatabase->versionLock);
 #else
+       DBRELE(adatabase);
        LWP_WaitProcess(&adatabase->version);   /* same vers, just wait */
+       DBHOLD(adatabase);
 #endif
     }
 }
@@ -1205,31 +1228,98 @@ ubik_WaitVersion(register struct ubik_dbase *adatabase,
  * \brief utility to get the version of the dbase a transaction is dealing with
  */
 int
-ubik_GetVersion(register struct ubik_trans *atrans,
-               register struct ubik_version *avers)
+ubik_GetVersion(struct ubik_trans *atrans,
+               struct ubik_version *avers)
 {
     *avers = atrans->dbase->version;
     return 0;
 }
 
 /*!
- * \brief Facility to simplify database caching.  
+ * \brief Facility to simplify database caching.
  * \return zero if last trans was done on the local server and was successful.
  * \return -1 means bad (NULL) argument.
- * 
- * If return value is non-zero and the caller is a server caching part of the 
+ *
+ * If return value is non-zero and the caller is a server caching part of the
  * Ubik database, it should invalidate that cache.
  */
-int
-ubik_CacheUpdate(register struct ubik_trans *atrans)
+static int
+ubik_CacheUpdate(struct ubik_trans *atrans)
 {
     if (!(atrans && atrans->dbase))
        return -1;
     return vcmp(atrans->dbase->cachedVersion, atrans->dbase->version) != 0;
 }
 
+/**
+ * check and possibly update cache of ubik db.
+ *
+ * If the version of the cached db data is out of date, this calls (*check) to
+ * update the cache. If (*check) returns success, we update the version of the
+ * cached db data.
+ *
+ * Checking the version of the cached db data is done under a read lock;
+ * updating the cache (and thus calling (*check)) is done under a write lock
+ * so is guaranteed not to interfere with another thread's (*check). On
+ * successful return, a read lock on the cached db data is obtained, which
+ * will be released by ubik_EndTrans or ubik_AbortTrans.
+ *
+ * @param[in] atrans ubik transaction
+ * @param[in] check  function to call to check/update cache
+ * @param[in] rock   rock to pass to *check
+ *
+ * @return operation status
+ *   @retval 0       success
+ *   @retval nonzero error; cachedVersion not updated
+ *
+ * @post On success, application cache is read-locked, and cache data is
+ *       up-to-date
+ */
+int
+ubik_CheckCache(struct ubik_trans *atrans, ubik_updatecache_func cbf, void *rock)
+{
+    int ret = 0;
+
+    if (!(atrans && atrans->dbase))
+       return -1;
+
+    ObtainReadLock(&atrans->dbase->cache_lock);
+
+    while (ubik_CacheUpdate(atrans) != 0) {
+
+       ReleaseReadLock(&atrans->dbase->cache_lock);
+       ObtainSharedLock(&atrans->dbase->cache_lock);
+
+       if (ubik_CacheUpdate(atrans) != 0) {
+
+           BoostSharedLock(&atrans->dbase->cache_lock);
+
+           ret = (*cbf) (atrans, rock);
+           if (ret == 0) {
+               memcpy(&atrans->dbase->cachedVersion, &atrans->dbase->version,
+                      sizeof(atrans->dbase->cachedVersion));
+           }
+       }
+
+       /* It would be nice if we could convert from a shared lock to a read
+        * lock... instead, just release the shared and acquire the read */
+       ReleaseSharedLock(&atrans->dbase->cache_lock);
+
+       if (ret) {
+           /* if we have an error, don't retry, and don't hold any locks */
+           return ret;
+       }
+
+       ObtainReadLock(&atrans->dbase->cache_lock);
+    }
+
+    atrans->flags |= TRCACHELOCKED;
+
+    return 0;
+}
+
 /*!
- * "Who said anything about panicking?" snapped Arthur. 
+ * "Who said anything about panicking?" snapped Arthur.
  * "This is still just the culture shock. You wait till I've settled down
  * into the situation and found my bearings. \em Then I'll start panicking!"
  * --Authur Dent