Protect ubik cache accesses
authorAndrew Deason <adeason@sinenomine.net>
Wed, 31 Mar 2010 16:40:42 +0000 (11:40 -0500)
committerRuss Allbery <rra@stanford.edu>
Wed, 26 May 2010 22:00:51 +0000 (15:00 -0700)
Currently, ubik application cached data could be updated and read by
different threads simultaneously. Add a mechanism in ubik for
protecting accessing and updating the cached data. This adds the
function ubik_CheckCache to do this, and removes ubik_CacheUpdate as
an exported function (since it's not safe).

Update all callers to use the new mechanism. In ptserver, remove the
'initd' variable; just rely on cachedVersion and ubik_CheckCache to
tell us when to re-read the database. Remove db.lock in buserver and
cheader_lock in kaserver, which served similar (though not completely
threadsafe) protection as ubik_CheckCache. Add the ubik database lock
cache_lock to protect the application cache.

Change-Id: I857a67d410e2c539197c5212c3b114c3fd0403c2
Reviewed-on: http://gerrit.openafs.org/1546
Reviewed-by: Derrick Brashear <shadow@dementia.org>
Tested-by: Russ Allbery <rra@stanford.edu>

src/budb/database.c
src/kauth/kadatabase.c
src/ptserver/ptprocs.c
src/ptserver/ptutils.c
src/ptserver/ubik.c
src/ubik/ubik.c
src/ubik/ubik.p.h
src/vlserver/vlutils.c

index 2445dd4..15c668f 100644 (file)
@@ -45,7 +45,6 @@ InitDB(void)
     pollCount = 0;
 
     memset(&db, 0, sizeof(db));
-    Lock_Init(&db.lock);
     if ((code = InitDBalloc()) || (code = InitDBhash()))
        return code;
     return 0;
@@ -167,17 +166,22 @@ cdbread(struct ubik_trans *ut, int type, afs_int32 pos, void *buff, afs_int32 le
 /* check that the database has been initialized.  Be careful to fail in a safe
    manner, to avoid bogusly reinitializing the db.  */
 
-afs_int32
-CheckInit(struct ubik_trans *ut, 
-         int (*db_init) (struct ubik_trans *ut)) /* call if rebuilding DB */
+/**
+ * reads in db cache from ubik.
+ *
+ * @param[in] ut ubik transaction
+ * @param[in] rock  opaque pointer to an int (*) (struct ubik_trans *), which
+ *                  will be called on rebuilding the database (or NULL to not
+ *                  rebuild the db)
+ *
+ * @return operation status
+ *   @retval 0 success
+ */
+static afs_int32
+UpdateCache(struct ubik_trans *ut, void *rock)
 {
-    register afs_int32 code;
-
-    /* Don't read header if not necessary */
-    if (!ubik_CacheUpdate(ut))
-       return 0;
-
-    ObtainWriteLock(&db.lock);
+    int (*db_init) (struct ubik_trans *ut) = rock;
+    afs_int32 code;
 
     db.h.eofPtr = htonl(sizeof(db.h)); /* for sanity check in dbread */
     code = dbread(ut, 0, (char *)&db.h, sizeof(db.h));
@@ -202,7 +206,6 @@ CheckInit(struct ubik_trans *ut,
     ht_Reset(&db.dumpIden);
 
   error_exit:
-    ReleaseWriteLock(&db.lock);
     if (code) {
        if ((code == UEOF) || (code == BUDB_EMPTY)) {
            if (db_init) {
@@ -234,3 +237,10 @@ CheckInit(struct ubik_trans *ut,
     }
     return code;
 }
+
+afs_int32
+CheckInit(struct ubik_trans *ut,
+         int (*db_init) (struct ubik_trans *ut)) /* call if rebuilding DB */
+{
+    return ubik_CheckCache(ut, UpdateCache, db_init);
+}
index 13e45a8..8471260 100644 (file)
@@ -77,7 +77,6 @@ karead(struct ubik_trans *tt, afs_int32 pos, char *buff, afs_int32 len)
     return code;
 }
 
-static struct Lock cheader_lock;
 static struct Lock keycache_lock;
 
 static int maxCachedKeys;
@@ -98,7 +97,6 @@ static int dbfixup = 0;
 void
 init_kadatabase(int initFlags)
 {
-    Lock_Init(&cheader_lock);
     Lock_Init(&keycache_lock);
 
     maxCachedKeys = 10;
@@ -116,20 +114,25 @@ init_kadatabase(int initFlags)
 
 /* check that the database has been initialized.  Be careful to fail in a safe
    manner, to avoid bogusly reinitializing the db.  */
-
-afs_int32
-CheckInit(struct ubik_trans *at,
-          int (*db_init) (struct ubik_trans *))                /* procedure to call if rebuilding DB */
+/**
+ * reads in db cache from ubik.
+ *
+ * @param[in] ut ubik transaction
+ * @param[in] rock  opaque pointer to an int (*) (struct ubik_trans *), which
+ *                  will be called on rebuilding the database (or NULL to not
+ *                  rebuild the db)
+ *
+ * @return operation status
+ *   @retval 0 success
+ */
+static afs_int32
+UpdateCache(struct ubik_trans *at, void *rock)
 {
-    register afs_int32 code;
+    int (*db_init) (struct ubik_trans *) = rock;
+    afs_int32 code;
     afs_int32 iversion;
     afs_int32 tversion;
 
-    /* Don't read header if not necessary */
-    if (!ubik_CacheUpdate(at))
-       return 0;
-
-    ObtainWriteLock(&cheader_lock);
     if ((code = karead(at, 0, (char *)&iversion, sizeof(iversion)))
        || (code =
            karead(at, sizeof(cheader) - sizeof(afs_int32), (char *)&tversion,
@@ -155,7 +158,6 @@ CheckInit(struct ubik_trans *at,
            code = KAIO;
        }
     }
-    ReleaseWriteLock(&cheader_lock);
     if (code == 0)
        return 0;
 
@@ -190,6 +192,13 @@ CheckInit(struct ubik_trans *at,
     return db_init(at);                /* initialize the db */
 }
 
+afs_int32
+CheckInit(struct ubik_trans *at,
+          int (*db_init) (struct ubik_trans *))                /* procedure to call if rebuilding DB */
+{
+    return ubik_CheckCache(at, UpdateCache, db_init);
+}
+
 /* Allocate a free block of storage for entry, returning address of a new
    zeroed entry.  If zero is returned, a Ubik I/O error can be assumed.  */
 
@@ -628,7 +637,8 @@ ka_debugKeyCache(struct ka_debugInfo *info)
 {
     int i;
 
-    memcpy(&info->cheader_lock, &cheader_lock, sizeof(info->cheader_lock));
+    /* cheader_lock no longer exists */
+    memset(&info->cheader_lock, 0, sizeof(info->cheader_lock));
     memcpy(&info->keycache_lock, &keycache_lock, sizeof(info->keycache_lock));
 
     info->kcVersion = keyCacheVersion;
index e67da23..1cd803f 100644 (file)
@@ -83,7 +83,6 @@
 extern int restricted;
 extern struct ubik_dbase *dbase;
 extern int pr_noAuth;
-extern afs_int32 initd;
 extern char *pr_realmName;
 extern int prp_group_default;
 extern int prp_user_default;
@@ -146,10 +145,11 @@ static afs_int32 WhoIsThisWithName(struct rx_call *acall,
                                   struct ubik_trans *at, afs_int32 *aid, 
                                   char *aname);
 
-/* When abort, reset initd so that the header is read in on next call.
+/* when we abort, the ubik cachedVersion will be reset, so we'll read in the
+ * header on the next call.
  * Abort the transaction and return the code.
  */
-#define ABORT_WITH(tt,code) return(initd=0,ubik_AbortTrans(tt),code)
+#define ABORT_WITH(tt,code) return(ubik_AbortTrans(tt),code)
 
 static int
 CreateOK(struct ubik_trans *ut, afs_int32 cid, afs_int32 oid, afs_int32 flag, 
index 5401792..c4315c2 100644 (file)
@@ -1694,14 +1694,11 @@ SetMax(struct ubik_trans *at, afs_int32 id, afs_int32 flag)
     return PRSUCCESS;
 }
 
-afs_int32
-read_DbHeader(struct ubik_trans *tt)
+static afs_int32
+UpdateCache(struct ubik_trans *tt, void *rock)
 {
     afs_int32 code;
 
-    if (!ubik_CacheUpdate(tt))
-       return 0;
-
     code = pr_Read(tt, 0, 0, (char *)&cheader, sizeof(cheader));
     if (code != 0) {
        afs_com_err(whoami, code, "Couldn't read header");
@@ -1709,15 +1706,57 @@ read_DbHeader(struct ubik_trans *tt)
     return code;
 }
 
+afs_int32
+read_DbHeader(struct ubik_trans *tt)
+{
+    return ubik_CheckCache(tt, UpdateCache, NULL);
+}
+
 int pr_noAuth;
-afs_int32 initd = 0;
+
+/**
+ * reads in db cache from ubik.
+ *
+ * @param[in] ut ubik transaction
+ * @param[out] rock  opaque pointer to an int*, which on success will be set
+ *                   to 1 if we need to build the database, or 0 if we do not
+ *
+ * @return operation status
+ *   @retval 0 success
+ */
+static afs_int32
+Initdb_check(struct ubik_trans *tt, void *rock)
+{
+    int *build_rock = rock;
+    afs_int32 code;
+    afs_int32 len;
+
+    len = sizeof(cheader);
+    code = pr_Read(tt, 0, 0, (char *)&cheader, len);
+    if (code != 0) {
+       afs_com_err(whoami, code, "couldn't read header");
+       return code;
+    }
+    if ((ntohl(cheader.version) == PRDBVERSION)
+       && ntohl(cheader.headerSize) == sizeof(cheader)
+       && ntohl(cheader.eofPtr) != 0
+       && FindByID(tt, ANONYMOUSID) != 0) {
+       /* database exists, so we don't have to build it */
+       *build_rock = 0;
+       return 0;
+    }
+
+    /* else we need to build a database */
+    *build_rock = 1;
+    return 0;
+}
 
 afs_int32
 Initdb(void)
 {
-    afs_int32 code;
     struct ubik_trans *tt;
-    afs_int32 len;
+    int build = 0;
+    afs_int32 code;
 
     /* init the database.  We'll try reading it, but if we're starting
      * from scratch, we'll have to do a write transaction. */
@@ -1732,52 +1771,40 @@ Initdb(void)
        ubik_AbortTrans(tt);
        return code;
     }
-    if (!initd) {
-       initd = 1;
-    } else if (!ubik_CacheUpdate(tt)) {
-       code = ubik_EndTrans(tt);
-       return code;
-    }
 
-    len = sizeof(cheader);
-    code = pr_Read(tt, 0, 0, (char *)&cheader, len);
-    if (code != 0) {
-       afs_com_err(whoami, code, "couldn't read header");
+    code = ubik_CheckCache(tt, Initdb_check, &build);
+    if (code) {
        ubik_AbortTrans(tt);
        return code;
     }
-    if ((ntohl(cheader.version) == PRDBVERSION)
-       && ntohl(cheader.headerSize) == sizeof(cheader)
-       && ntohl(cheader.eofPtr) != 0
-       && FindByID(tt, ANONYMOUSID) != 0) {
-       /* database exists, so we don't have to build it */
-       code = ubik_EndTrans(tt);
-       if (code)
-           return code;
-       return PRSUCCESS;
-    }
-    /* else we need to build a database */
-    code = ubik_EndTrans(tt);
-    if (code)
-       return code;
 
-    /* Only rebuild database if the db was deleted (the header is zero) and we
-     * are running noAuth. */
-    {
+    if (build) {
+       /* Only rebuild database if the db was deleted (the header is zero) and we
+        * are running noAuth. */
        char *bp = (char *)&cheader;
        int i;
-       for (i = 0; i < sizeof(cheader); i++)
+       for (i = 0; i < sizeof(cheader); i++) {
            if (bp[i]) {
                code = PRDBBAD;
                afs_com_err(whoami, code,
                        "Can't rebuild database because it is not empty");
-               return code;
+               break;
            }
+       }
+       if (!pr_noAuth) {
+           code = PRDBBAD;
+           afs_com_err(whoami, code,
+                       "Can't rebuild database because not running NoAuth");
+       }
     }
-    if (!pr_noAuth) {
-       code = PRDBBAD;
-       afs_com_err(whoami, code,
-               "Can't rebuild database because not running NoAuth");
+
+    if (code) {
+       ubik_EndTrans(tt);
+    } else {
+       code = ubik_EndTrans(tt);
+    }
+    if (code || !build) {
+       /* either we encountered an error, or we don't need to build the db */
        return code;
     }
 
@@ -1796,6 +1823,12 @@ Initdb(void)
      * actually have been a good database out there.  Now that we have a
      * real write transaction, make sure things are still bad.
      */
+    code = pr_Read(tt, 0, 0, (char *)&cheader, sizeof(cheader));
+    if (code != 0) {
+       afs_com_err(whoami, code, "couldn't read header");
+       ubik_AbortTrans(tt);
+       return code;
+    }
     if ((ntohl(cheader.version) == PRDBVERSION)
        && ntohl(cheader.headerSize) == sizeof(cheader)
        && ntohl(cheader.eofPtr) != 0
index b5cdf61..2124e6c 100644 (file)
@@ -99,7 +99,7 @@ ubik_WaitVersion(register struct ubik_dbase *adatabase,
 }
 
 int
-ubik_CacheUpdate(register struct ubik_trans *atrans)
+ubik_CheckCache(struct ubik_trans *atrans, ubik_updatecache_func cbf, void *rock)
 {
     return (0);
 }
index 9bb9a97..b86a17b 100644 (file)
@@ -420,6 +420,7 @@ ubik_ServerInitCommon(afs_int32 myHost, short myPort,
 #else
     Lock_Init(&tdb->versionLock);
 #endif
+    Lock_Init(&tdb->cache_lock);
     tdb->flags = 0;
     tdb->read = uphys_read;
     tdb->write = uphys_write;
@@ -749,8 +750,19 @@ ubik_AbortTrans(register struct ubik_trans *transPtr)
     register struct ubik_dbase *dbase;
 
     dbase = transPtr->dbase;
+
+    if (transPtr->flags & TRCACHELOCKED) {
+       ReleaseReadLock(&dbase->cache_lock);
+       transPtr->flags &= ~TRCACHELOCKED;
+    }
+
+    ObtainWriteLock(&dbase->cache_lock);
+
     DBHOLD(dbase);
     memset(&dbase->cachedVersion, 0, sizeof(struct ubik_version));
+
+    ReleaseWriteLock(&dbase->cache_lock);
+
     /* see if we're still up-to-date */
     if (!urecovery_AllBetter(dbase, transPtr->flags & TRREADANY)) {
        udisk_abort(transPtr);
@@ -805,15 +817,20 @@ ubik_EndTrans(register struct ubik_trans *transPtr)
     }
 
     dbase = transPtr->dbase;
+
+    if (transPtr->flags & TRCACHELOCKED) {
+       ReleaseReadLock(&dbase->cache_lock);
+       transPtr->flags &= ~TRCACHELOCKED;
+    }
     DBHOLD(dbase);
-    memset(&dbase->cachedVersion, 0, sizeof(struct ubik_version));
 
     /* give up if no longer current */
     if (!urecovery_AllBetter(dbase, transPtr->flags & TRREADANY)) {
        udisk_abort(transPtr);
        udisk_end(transPtr);
        DBRELE(dbase);
-       return UNOQUORUM;
+       code = UNOQUORUM;
+       goto error;
     }
 
     if (transPtr->type == UBIK_READTRANS) {    /* reads are easy */
@@ -822,14 +839,15 @@ ubik_EndTrans(register struct ubik_trans *transPtr)
            goto success;       /* update cachedVersion correctly */
        udisk_end(transPtr);
        DBRELE(dbase);
-       return code;
+       goto error;
     }
 
     if (!ubeacon_AmSyncSite()) {       /* no longer sync site */
        udisk_abort(transPtr);
        udisk_end(transPtr);
        DBRELE(dbase);
-       return UNOTSYNC;
+       code = UNOTSYNC;
+       goto error;
     }
 
     /* now it is safe to do commit */
@@ -846,7 +864,7 @@ ubik_EndTrans(register struct ubik_trans *transPtr)
        ContactQuorum_NoArguments(DISK_ReleaseLocks, transPtr, 0);
        udisk_end(transPtr);
        DBRELE(dbase);
-       return code;
+       goto error;
     }
     /* before we can start sending unlock messages, we must wait until all servers
      * that are possibly still functioning on the other side of a network partition
@@ -882,6 +900,12 @@ ubik_EndTrans(register struct ubik_trans *transPtr)
            break;              /* no down ones still pseudo-active */
     }
 
+    /* the commit bumped the dbase version, and since the write was local
+     * our cache should still be up to date, so make sure to update
+     * cachedVersion, too */
+    memcpy(&dbase->cachedVersion, &dbase->version,
+           sizeof(dbase->cachedVersion));
+
     /* finally, unlock all the dudes.  We can return success independent of the number of servers
      * that really unlock the dbase; the others will do it if/when they elect a new sync site.
      * The transaction is committed anyway, since we succeeded in contacting a quorum
@@ -891,12 +915,16 @@ ubik_EndTrans(register struct ubik_trans *transPtr)
 
   success:
     udisk_end(transPtr);
-    /* update version on successful EndTrans */
-    memcpy(&dbase->cachedVersion, &dbase->version,
-          sizeof(struct ubik_version));
-
+    /* don't update cachedVersion here; it should have been updated way back
+     * in ubik_CheckCache, and earlier in this function for writes */
     DBRELE(dbase);
     return 0;
+
+  error:
+    ObtainWriteLock(&dbase->cache_lock);
+    memset(&dbase->cachedVersion, 0, sizeof(struct ubik_version));
+    ReleaseWriteLock(&dbase->cache_lock);
+    return code;
 }
 
 /*!
@@ -1231,7 +1259,7 @@ ubik_GetVersion(register struct ubik_trans *atrans,
  * If return value is non-zero and the caller is a server caching part of the 
  * Ubik database, it should invalidate that cache.
  */
-int
+static int
 ubik_CacheUpdate(register struct ubik_trans *atrans)
 {
     if (!(atrans && atrans->dbase))
@@ -1239,6 +1267,73 @@ ubik_CacheUpdate(register struct ubik_trans *atrans)
     return vcmp(atrans->dbase->cachedVersion, atrans->dbase->version) != 0;
 }
 
+/**
+ * check and possibly update cache of ubik db.
+ *
+ * If the version of the cached db data is out of date, this calls (*check) to
+ * update the cache. If (*check) returns success, we update the version of the
+ * cached db data.
+ *
+ * Checking the version of the cached db data is done under a read lock;
+ * updating the cache (and thus calling (*check)) is done under a write lock
+ * so is guaranteed not to interfere with another thread's (*check). On
+ * successful return, a read lock on the cached db data is obtained, which
+ * will be released by ubik_EndTrans or ubik_AbortTrans.
+ *
+ * @param[in] atrans ubik transaction
+ * @param[in] check  function to call to check/update cache
+ * @param[in] rock   rock to pass to *check
+ *
+ * @return operation status
+ *   @retval 0       success
+ *   @retval nonzero error; cachedVersion not updated
+ *
+ * @post On success, application cache is read-locked, and cache data is
+ *       up-to-date
+ */
+int
+ubik_CheckCache(struct ubik_trans *atrans, ubik_updatecache_func cbf, void *rock)
+{
+    int ret = 0;
+
+    if (!(atrans && atrans->dbase))
+       return -1;
+
+    ObtainReadLock(&atrans->dbase->cache_lock);
+
+    while (ubik_CacheUpdate(atrans) != 0) {
+
+       ReleaseReadLock(&atrans->dbase->cache_lock);
+       ObtainSharedLock(&atrans->dbase->cache_lock);
+
+       if (ubik_CacheUpdate(atrans) != 0) {
+
+           BoostSharedLock(&atrans->dbase->cache_lock);
+
+           ret = (*cbf) (atrans, rock);
+           if (ret == 0) {
+               memcpy(&atrans->dbase->cachedVersion, &atrans->dbase->version,
+                      sizeof(atrans->dbase->cachedVersion));
+           }
+       }
+
+       /* It would be nice if we could convert from a shared lock to a read
+        * lock... instead, just release the shared and acquire the read */
+       ReleaseSharedLock(&atrans->dbase->cache_lock);
+
+       if (ret) {
+           /* if we have an error, don't retry, and don't hold any locks */
+           return ret;
+       }
+
+       ObtainReadLock(&atrans->dbase->cache_lock);
+    }
+
+    atrans->flags |= TRCACHELOCKED;
+
+    return 0;
+}
+
 /*!
  * "Who said anything about panicking?" snapped Arthur. 
  * "This is still just the culture shock. You wait till I've settled down
index 5f9f48f..3fe8b15 100644 (file)
@@ -190,12 +190,28 @@ struct ubik_dbase {
     int (*getnfiles) (struct ubik_dbase * adbase);     /*!< find out number of files */
     short readers;             /*!< number of current read transactions */
     struct ubik_version cachedVersion; /*!< version of caller's cached data */
+#ifdef UKERNEL
+    struct afs_lock cache_lock;
+#else
+    struct Lock cache_lock; /*!< protects cached application data */
+#endif
 #ifdef AFS_PTHREAD_ENV
     pthread_cond_t version_cond;    /*!< condition variable to manage changes to version */
     pthread_cond_t flags_cond;      /*!< condition variable to manage changes to flags */
 #endif
 };
 
+/**
+ * ubik_CheckCache callback function.
+ *
+ * @param[in] atrans  ubik transaction
+ * @param[in] rock    rock passed to ubik_CheckCache
+ *
+ * @return operation status
+ *   @retval 0        cache was read properly
+ */
+typedef int (*ubik_updatecache_func) (struct ubik_trans *atrans, void *rock);
+
 /*! \name procedures for automatically authenticating ubik connections */
 extern int (*ubik_CRXSecurityProc) (void *, struct rx_securityClass **,
                                    afs_int32 *);
@@ -232,6 +248,9 @@ extern void *ubik_CheckRXSecurityRock;
 #define TRSETLOCK           8  /*!< SetLock is using trans */
 #define TRSTALE             16 /*!< udisk_end during getLock */
 #endif /* UBIK_PAUSE */
+#define TRCACHELOCKED       32  /*!< this trans has locked dbase->cache_lock
+                                 *   (meaning, this trans has called
+                                 *   ubik_CheckCache at some point */
 /*\}*/
 
 /*! \name ubik_lock flags */
@@ -494,7 +513,9 @@ extern int ubik_WaitVersion(register struct ubik_dbase *adatabase,
                            register struct ubik_version *aversion);
 extern int ubik_GetVersion(register struct ubik_trans *atrans,
                           register struct ubik_version *avers);
-extern int ubik_CacheUpdate(register struct ubik_trans *atrans);
+extern int ubik_CheckCache(struct ubik_trans *atrans,
+                           ubik_updatecache_func check,
+                           void *rock);
 /*\}*/
 
 /*! \name ubikclient.c */
index 1d572f0..cd9ff5c 100644 (file)
@@ -290,37 +290,38 @@ readExtents(struct ubik_trans *trans)
 
 /* Check that the database has been initialized.  Be careful to fail in a safe
    manner, to avoid bogusly reinitializing the db.  */
-afs_int32
-CheckInit(struct ubik_trans *trans, int builddb)
+/**
+ * reads in db cache from ubik.
+ *
+ * @param[in] ut ubik transaction
+ * @param[in] rock  opaque pointer to an int*; if 1, we should rebuild the db
+ *                  if it appears empty, if 0 we should return an error if the
+ *                  db appears empty
+ *
+ * @return operation status
+ *   @retval 0 success
+ */
+static afs_int32
+UpdateCache(struct ubik_trans *trans, void *rock)
 {
-    afs_int32 error = 0, i, code, ubcode = 0;
-
-    /* ubik_CacheUpdate must be called on every transaction.  It returns 0 if the
-     * previous transaction would have left the cache fine, and non-zero otherwise.
-     * Thus, a local abort or a remote commit will cause this to return non-zero
-     * and force a header re-read.  Necessary for a local abort because we may
-     * have damaged cheader during the operation.  Necessary for a remote commit
-     * since it may have changed cheader. 
-     */
-    if (ubik_CacheUpdate(trans) != 0) {
-       /* if version changed (or first call), read the header */
-       ubcode = vlread(trans, 0, (char *)&cheader, sizeof(cheader));
-       vldbversion = ntohl(cheader.vital_header.vldbversion);
-
-       if (!ubcode && (vldbversion != 0)) {
-           memcpy(HostAddress, cheader.IpMappedAddr,
-                  sizeof(cheader.IpMappedAddr));
-           for (i = 0; i < MAXSERVERID + 1; i++) {     /* cvt HostAddress to host order */
-               HostAddress[i] = ntohl(HostAddress[i]);
-           }
+    int *builddb_rock = rock;
+    int builddb = *builddb_rock;
+    afs_int32 error = 0, i, code, ubcode;
 
-           code = readExtents(trans);
-           if (code)
-               ERROR_EXIT(code);
+    /* if version changed (or first call), read the header */
+    ubcode = vlread(trans, 0, (char *)&cheader, sizeof(cheader));
+    vldbversion = ntohl(cheader.vital_header.vldbversion);
+
+    if (!ubcode && (vldbversion != 0)) {
+       memcpy(HostAddress, cheader.IpMappedAddr, sizeof(cheader.IpMappedAddr));
+       for (i = 0; i < MAXSERVERID + 1; i++) { /* cvt HostAddress to host order */
+           HostAddress[i] = ntohl(HostAddress[i]);
        }
-    }
 
-    vldbversion = ntohl(cheader.vital_header.vldbversion);
+       code = readExtents(trans);
+       if (code)
+           ERROR_EXIT(code);
+    }
 
     /* now, if can't read, or header is wrong, write a new header */
     if (ubcode || vldbversion == 0) {
@@ -343,14 +344,18 @@ CheckInit(struct ubik_trans *trans, int builddb)
                printf("Can't write VLDB header (error = %d)\n", code);
                ERROR_EXIT(VL_IO);
            }
-       } else
+           vldbversion = ntohl(cheader.vital_header.vldbversion);
+       } else {
            ERROR_EXIT(VL_EMPTY);
-    } else if ((vldbversion != VLDBVERSION) && (vldbversion != OVLDBVERSION)
-              && (vldbversion != VLDBVERSION_4)) {
+       }
+    }
+
+    if ((vldbversion != VLDBVERSION) && (vldbversion != OVLDBVERSION)
+        && (vldbversion != VLDBVERSION_4)) {
        printf
            ("VLDB version %d doesn't match this software version(%d, %d or %d), quitting!\n",
             vldbversion, VLDBVERSION_4, VLDBVERSION, OVLDBVERSION);
-       ERROR_EXIT(VL_BADVERSION);
+       return VL_BADVERSION;
     }
 
     maxnservers = ((vldbversion == 3 || vldbversion == 4) ? 13 : 8);
@@ -360,6 +365,30 @@ CheckInit(struct ubik_trans *trans, int builddb)
     return error;
 }
 
+afs_int32
+CheckInit(struct ubik_trans *trans, int builddb)
+{
+    afs_int32 code;
+
+    code = ubik_CheckCache(trans, UpdateCache, &builddb);
+    if (code) {
+       return code;
+    }
+
+    /* these next two cases shouldn't happen (UpdateCache should either
+     * rebuild the db or return an error if these cases occur), but just to
+     * be on the safe side... */
+    if (vldbversion == 0) {
+       return VL_EMPTY;
+    }
+    if ((vldbversion != VLDBVERSION) && (vldbversion != OVLDBVERSION)
+        && (vldbversion != VLDBVERSION_4)) {
+       return VL_BADVERSION;
+    }
+
+    return 0;
+}
+
 
 afs_int32
 GetExtentBlock(register struct ubik_trans *trans, register afs_int32 base)