From dc8f18d6f5003712bc9ef989363137a84953df07 Mon Sep 17 00:00:00 2001 From: Andrew Deason Date: Wed, 31 Mar 2010 11:40:42 -0500 Subject: [PATCH] Protect ubik cache accesses Currently, ubik application cached data could be updated and read by different threads simultaneously. Add a mechanism in ubik for protecting accessing and updating the cached data. This adds the function ubik_CheckCache to do this, and removes ubik_CacheUpdate as an exported function (since it's not safe). Update all callers to use the new mechanism. In ptserver, remove the 'initd' variable; just rely on cachedVersion and ubik_CheckCache to tell us when to re-read the database. Remove db.lock in buserver and cheader_lock in kaserver, which served similar (though not completely threadsafe) protection as ubik_CheckCache. Add the ubik database lock cache_lock to protect the application cache. Change-Id: I857a67d410e2c539197c5212c3b114c3fd0403c2 Reviewed-on: http://gerrit.openafs.org/1546 Reviewed-by: Derrick Brashear Tested-by: Russ Allbery --- src/budb/database.c | 34 +++++++++------ src/kauth/kadatabase.c | 38 ++++++++++------ src/ptserver/ptprocs.c | 6 +-- src/ptserver/ptutils.c | 115 +++++++++++++++++++++++++++++++------------------ src/ptserver/ubik.c | 2 +- src/ubik/ubik.c | 115 ++++++++++++++++++++++++++++++++++++++++++++----- src/ubik/ubik.p.h | 23 +++++++++- src/vlserver/vlutils.c | 91 +++++++++++++++++++++++++------------- 8 files changed, 311 insertions(+), 113 deletions(-) diff --git a/src/budb/database.c b/src/budb/database.c index 2445dd4..15c668f 100644 --- a/src/budb/database.c +++ b/src/budb/database.c @@ -45,7 +45,6 @@ InitDB(void) pollCount = 0; memset(&db, 0, sizeof(db)); - Lock_Init(&db.lock); if ((code = InitDBalloc()) || (code = InitDBhash())) return code; return 0; @@ -167,17 +166,22 @@ cdbread(struct ubik_trans *ut, int type, afs_int32 pos, void *buff, afs_int32 le /* check that the database has been initialized. Be careful to fail in a safe manner, to avoid bogusly reinitializing the db. */ -afs_int32 -CheckInit(struct ubik_trans *ut, - int (*db_init) (struct ubik_trans *ut)) /* call if rebuilding DB */ +/** + * reads in db cache from ubik. + * + * @param[in] ut ubik transaction + * @param[in] rock opaque pointer to an int (*) (struct ubik_trans *), which + * will be called on rebuilding the database (or NULL to not + * rebuild the db) + * + * @return operation status + * @retval 0 success + */ +static afs_int32 +UpdateCache(struct ubik_trans *ut, void *rock) { - register afs_int32 code; - - /* Don't read header if not necessary */ - if (!ubik_CacheUpdate(ut)) - return 0; - - ObtainWriteLock(&db.lock); + int (*db_init) (struct ubik_trans *ut) = rock; + afs_int32 code; db.h.eofPtr = htonl(sizeof(db.h)); /* for sanity check in dbread */ code = dbread(ut, 0, (char *)&db.h, sizeof(db.h)); @@ -202,7 +206,6 @@ CheckInit(struct ubik_trans *ut, ht_Reset(&db.dumpIden); error_exit: - ReleaseWriteLock(&db.lock); if (code) { if ((code == UEOF) || (code == BUDB_EMPTY)) { if (db_init) { @@ -234,3 +237,10 @@ CheckInit(struct ubik_trans *ut, } return code; } + +afs_int32 +CheckInit(struct ubik_trans *ut, + int (*db_init) (struct ubik_trans *ut)) /* call if rebuilding DB */ +{ + return ubik_CheckCache(ut, UpdateCache, db_init); +} diff --git a/src/kauth/kadatabase.c b/src/kauth/kadatabase.c index 13e45a8..8471260 100644 --- a/src/kauth/kadatabase.c +++ b/src/kauth/kadatabase.c @@ -77,7 +77,6 @@ karead(struct ubik_trans *tt, afs_int32 pos, char *buff, afs_int32 len) return code; } -static struct Lock cheader_lock; static struct Lock keycache_lock; static int maxCachedKeys; @@ -98,7 +97,6 @@ static int dbfixup = 0; void init_kadatabase(int initFlags) { - Lock_Init(&cheader_lock); Lock_Init(&keycache_lock); maxCachedKeys = 10; @@ -116,20 +114,25 @@ init_kadatabase(int initFlags) /* check that the database has been initialized. Be careful to fail in a safe manner, to avoid bogusly reinitializing the db. */ - -afs_int32 -CheckInit(struct ubik_trans *at, - int (*db_init) (struct ubik_trans *)) /* procedure to call if rebuilding DB */ +/** + * reads in db cache from ubik. + * + * @param[in] ut ubik transaction + * @param[in] rock opaque pointer to an int (*) (struct ubik_trans *), which + * will be called on rebuilding the database (or NULL to not + * rebuild the db) + * + * @return operation status + * @retval 0 success + */ +static afs_int32 +UpdateCache(struct ubik_trans *at, void *rock) { - register afs_int32 code; + int (*db_init) (struct ubik_trans *) = rock; + afs_int32 code; afs_int32 iversion; afs_int32 tversion; - /* Don't read header if not necessary */ - if (!ubik_CacheUpdate(at)) - return 0; - - ObtainWriteLock(&cheader_lock); if ((code = karead(at, 0, (char *)&iversion, sizeof(iversion))) || (code = karead(at, sizeof(cheader) - sizeof(afs_int32), (char *)&tversion, @@ -155,7 +158,6 @@ CheckInit(struct ubik_trans *at, code = KAIO; } } - ReleaseWriteLock(&cheader_lock); if (code == 0) return 0; @@ -190,6 +192,13 @@ CheckInit(struct ubik_trans *at, return db_init(at); /* initialize the db */ } +afs_int32 +CheckInit(struct ubik_trans *at, + int (*db_init) (struct ubik_trans *)) /* procedure to call if rebuilding DB */ +{ + return ubik_CheckCache(at, UpdateCache, db_init); +} + /* Allocate a free block of storage for entry, returning address of a new zeroed entry. If zero is returned, a Ubik I/O error can be assumed. */ @@ -628,7 +637,8 @@ ka_debugKeyCache(struct ka_debugInfo *info) { int i; - memcpy(&info->cheader_lock, &cheader_lock, sizeof(info->cheader_lock)); + /* cheader_lock no longer exists */ + memset(&info->cheader_lock, 0, sizeof(info->cheader_lock)); memcpy(&info->keycache_lock, &keycache_lock, sizeof(info->keycache_lock)); info->kcVersion = keyCacheVersion; diff --git a/src/ptserver/ptprocs.c b/src/ptserver/ptprocs.c index e67da23..1cd803f 100644 --- a/src/ptserver/ptprocs.c +++ b/src/ptserver/ptprocs.c @@ -83,7 +83,6 @@ extern int restricted; extern struct ubik_dbase *dbase; extern int pr_noAuth; -extern afs_int32 initd; extern char *pr_realmName; extern int prp_group_default; extern int prp_user_default; @@ -146,10 +145,11 @@ static afs_int32 WhoIsThisWithName(struct rx_call *acall, struct ubik_trans *at, afs_int32 *aid, char *aname); -/* When abort, reset initd so that the header is read in on next call. +/* when we abort, the ubik cachedVersion will be reset, so we'll read in the + * header on the next call. * Abort the transaction and return the code. */ -#define ABORT_WITH(tt,code) return(initd=0,ubik_AbortTrans(tt),code) +#define ABORT_WITH(tt,code) return(ubik_AbortTrans(tt),code) static int CreateOK(struct ubik_trans *ut, afs_int32 cid, afs_int32 oid, afs_int32 flag, diff --git a/src/ptserver/ptutils.c b/src/ptserver/ptutils.c index 5401792..c4315c2 100644 --- a/src/ptserver/ptutils.c +++ b/src/ptserver/ptutils.c @@ -1694,14 +1694,11 @@ SetMax(struct ubik_trans *at, afs_int32 id, afs_int32 flag) return PRSUCCESS; } -afs_int32 -read_DbHeader(struct ubik_trans *tt) +static afs_int32 +UpdateCache(struct ubik_trans *tt, void *rock) { afs_int32 code; - if (!ubik_CacheUpdate(tt)) - return 0; - code = pr_Read(tt, 0, 0, (char *)&cheader, sizeof(cheader)); if (code != 0) { afs_com_err(whoami, code, "Couldn't read header"); @@ -1709,15 +1706,57 @@ read_DbHeader(struct ubik_trans *tt) return code; } +afs_int32 +read_DbHeader(struct ubik_trans *tt) +{ + return ubik_CheckCache(tt, UpdateCache, NULL); +} + int pr_noAuth; -afs_int32 initd = 0; + +/** + * reads in db cache from ubik. + * + * @param[in] ut ubik transaction + * @param[out] rock opaque pointer to an int*, which on success will be set + * to 1 if we need to build the database, or 0 if we do not + * + * @return operation status + * @retval 0 success + */ +static afs_int32 +Initdb_check(struct ubik_trans *tt, void *rock) +{ + int *build_rock = rock; + afs_int32 code; + afs_int32 len; + + len = sizeof(cheader); + code = pr_Read(tt, 0, 0, (char *)&cheader, len); + if (code != 0) { + afs_com_err(whoami, code, "couldn't read header"); + return code; + } + if ((ntohl(cheader.version) == PRDBVERSION) + && ntohl(cheader.headerSize) == sizeof(cheader) + && ntohl(cheader.eofPtr) != 0 + && FindByID(tt, ANONYMOUSID) != 0) { + /* database exists, so we don't have to build it */ + *build_rock = 0; + return 0; + } + + /* else we need to build a database */ + *build_rock = 1; + return 0; +} afs_int32 Initdb(void) { - afs_int32 code; struct ubik_trans *tt; - afs_int32 len; + int build = 0; + afs_int32 code; /* init the database. We'll try reading it, but if we're starting * from scratch, we'll have to do a write transaction. */ @@ -1732,52 +1771,40 @@ Initdb(void) ubik_AbortTrans(tt); return code; } - if (!initd) { - initd = 1; - } else if (!ubik_CacheUpdate(tt)) { - code = ubik_EndTrans(tt); - return code; - } - len = sizeof(cheader); - code = pr_Read(tt, 0, 0, (char *)&cheader, len); - if (code != 0) { - afs_com_err(whoami, code, "couldn't read header"); + code = ubik_CheckCache(tt, Initdb_check, &build); + if (code) { ubik_AbortTrans(tt); return code; } - if ((ntohl(cheader.version) == PRDBVERSION) - && ntohl(cheader.headerSize) == sizeof(cheader) - && ntohl(cheader.eofPtr) != 0 - && FindByID(tt, ANONYMOUSID) != 0) { - /* database exists, so we don't have to build it */ - code = ubik_EndTrans(tt); - if (code) - return code; - return PRSUCCESS; - } - /* else we need to build a database */ - code = ubik_EndTrans(tt); - if (code) - return code; - /* Only rebuild database if the db was deleted (the header is zero) and we - * are running noAuth. */ - { + if (build) { + /* Only rebuild database if the db was deleted (the header is zero) and we + * are running noAuth. */ char *bp = (char *)&cheader; int i; - for (i = 0; i < sizeof(cheader); i++) + for (i = 0; i < sizeof(cheader); i++) { if (bp[i]) { code = PRDBBAD; afs_com_err(whoami, code, "Can't rebuild database because it is not empty"); - return code; + break; } + } + if (!pr_noAuth) { + code = PRDBBAD; + afs_com_err(whoami, code, + "Can't rebuild database because not running NoAuth"); + } } - if (!pr_noAuth) { - code = PRDBBAD; - afs_com_err(whoami, code, - "Can't rebuild database because not running NoAuth"); + + if (code) { + ubik_EndTrans(tt); + } else { + code = ubik_EndTrans(tt); + } + if (code || !build) { + /* either we encountered an error, or we don't need to build the db */ return code; } @@ -1796,6 +1823,12 @@ Initdb(void) * actually have been a good database out there. Now that we have a * real write transaction, make sure things are still bad. */ + code = pr_Read(tt, 0, 0, (char *)&cheader, sizeof(cheader)); + if (code != 0) { + afs_com_err(whoami, code, "couldn't read header"); + ubik_AbortTrans(tt); + return code; + } if ((ntohl(cheader.version) == PRDBVERSION) && ntohl(cheader.headerSize) == sizeof(cheader) && ntohl(cheader.eofPtr) != 0 diff --git a/src/ptserver/ubik.c b/src/ptserver/ubik.c index b5cdf61..2124e6c 100644 --- a/src/ptserver/ubik.c +++ b/src/ptserver/ubik.c @@ -99,7 +99,7 @@ ubik_WaitVersion(register struct ubik_dbase *adatabase, } int -ubik_CacheUpdate(register struct ubik_trans *atrans) +ubik_CheckCache(struct ubik_trans *atrans, ubik_updatecache_func cbf, void *rock) { return (0); } diff --git a/src/ubik/ubik.c b/src/ubik/ubik.c index 9bb9a97..b86a17b 100644 --- a/src/ubik/ubik.c +++ b/src/ubik/ubik.c @@ -420,6 +420,7 @@ ubik_ServerInitCommon(afs_int32 myHost, short myPort, #else Lock_Init(&tdb->versionLock); #endif + Lock_Init(&tdb->cache_lock); tdb->flags = 0; tdb->read = uphys_read; tdb->write = uphys_write; @@ -749,8 +750,19 @@ ubik_AbortTrans(register struct ubik_trans *transPtr) register struct ubik_dbase *dbase; dbase = transPtr->dbase; + + if (transPtr->flags & TRCACHELOCKED) { + ReleaseReadLock(&dbase->cache_lock); + transPtr->flags &= ~TRCACHELOCKED; + } + + ObtainWriteLock(&dbase->cache_lock); + DBHOLD(dbase); memset(&dbase->cachedVersion, 0, sizeof(struct ubik_version)); + + ReleaseWriteLock(&dbase->cache_lock); + /* see if we're still up-to-date */ if (!urecovery_AllBetter(dbase, transPtr->flags & TRREADANY)) { udisk_abort(transPtr); @@ -805,15 +817,20 @@ ubik_EndTrans(register struct ubik_trans *transPtr) } dbase = transPtr->dbase; + + if (transPtr->flags & TRCACHELOCKED) { + ReleaseReadLock(&dbase->cache_lock); + transPtr->flags &= ~TRCACHELOCKED; + } DBHOLD(dbase); - memset(&dbase->cachedVersion, 0, sizeof(struct ubik_version)); /* give up if no longer current */ if (!urecovery_AllBetter(dbase, transPtr->flags & TRREADANY)) { udisk_abort(transPtr); udisk_end(transPtr); DBRELE(dbase); - return UNOQUORUM; + code = UNOQUORUM; + goto error; } if (transPtr->type == UBIK_READTRANS) { /* reads are easy */ @@ -822,14 +839,15 @@ ubik_EndTrans(register struct ubik_trans *transPtr) goto success; /* update cachedVersion correctly */ udisk_end(transPtr); DBRELE(dbase); - return code; + goto error; } if (!ubeacon_AmSyncSite()) { /* no longer sync site */ udisk_abort(transPtr); udisk_end(transPtr); DBRELE(dbase); - return UNOTSYNC; + code = UNOTSYNC; + goto error; } /* now it is safe to do commit */ @@ -846,7 +864,7 @@ ubik_EndTrans(register struct ubik_trans *transPtr) ContactQuorum_NoArguments(DISK_ReleaseLocks, transPtr, 0); udisk_end(transPtr); DBRELE(dbase); - return code; + goto error; } /* before we can start sending unlock messages, we must wait until all servers * that are possibly still functioning on the other side of a network partition @@ -882,6 +900,12 @@ ubik_EndTrans(register struct ubik_trans *transPtr) break; /* no down ones still pseudo-active */ } + /* the commit bumped the dbase version, and since the write was local + * our cache should still be up to date, so make sure to update + * cachedVersion, too */ + memcpy(&dbase->cachedVersion, &dbase->version, + sizeof(dbase->cachedVersion)); + /* finally, unlock all the dudes. We can return success independent of the number of servers * that really unlock the dbase; the others will do it if/when they elect a new sync site. * The transaction is committed anyway, since we succeeded in contacting a quorum @@ -891,12 +915,16 @@ ubik_EndTrans(register struct ubik_trans *transPtr) success: udisk_end(transPtr); - /* update version on successful EndTrans */ - memcpy(&dbase->cachedVersion, &dbase->version, - sizeof(struct ubik_version)); - + /* don't update cachedVersion here; it should have been updated way back + * in ubik_CheckCache, and earlier in this function for writes */ DBRELE(dbase); return 0; + + error: + ObtainWriteLock(&dbase->cache_lock); + memset(&dbase->cachedVersion, 0, sizeof(struct ubik_version)); + ReleaseWriteLock(&dbase->cache_lock); + return code; } /*! @@ -1231,7 +1259,7 @@ ubik_GetVersion(register struct ubik_trans *atrans, * If return value is non-zero and the caller is a server caching part of the * Ubik database, it should invalidate that cache. */ -int +static int ubik_CacheUpdate(register struct ubik_trans *atrans) { if (!(atrans && atrans->dbase)) @@ -1239,6 +1267,73 @@ ubik_CacheUpdate(register struct ubik_trans *atrans) return vcmp(atrans->dbase->cachedVersion, atrans->dbase->version) != 0; } +/** + * check and possibly update cache of ubik db. + * + * If the version of the cached db data is out of date, this calls (*check) to + * update the cache. If (*check) returns success, we update the version of the + * cached db data. + * + * Checking the version of the cached db data is done under a read lock; + * updating the cache (and thus calling (*check)) is done under a write lock + * so is guaranteed not to interfere with another thread's (*check). On + * successful return, a read lock on the cached db data is obtained, which + * will be released by ubik_EndTrans or ubik_AbortTrans. + * + * @param[in] atrans ubik transaction + * @param[in] check function to call to check/update cache + * @param[in] rock rock to pass to *check + * + * @return operation status + * @retval 0 success + * @retval nonzero error; cachedVersion not updated + * + * @post On success, application cache is read-locked, and cache data is + * up-to-date + */ +int +ubik_CheckCache(struct ubik_trans *atrans, ubik_updatecache_func cbf, void *rock) +{ + int ret = 0; + + if (!(atrans && atrans->dbase)) + return -1; + + ObtainReadLock(&atrans->dbase->cache_lock); + + while (ubik_CacheUpdate(atrans) != 0) { + + ReleaseReadLock(&atrans->dbase->cache_lock); + ObtainSharedLock(&atrans->dbase->cache_lock); + + if (ubik_CacheUpdate(atrans) != 0) { + + BoostSharedLock(&atrans->dbase->cache_lock); + + ret = (*cbf) (atrans, rock); + if (ret == 0) { + memcpy(&atrans->dbase->cachedVersion, &atrans->dbase->version, + sizeof(atrans->dbase->cachedVersion)); + } + } + + /* It would be nice if we could convert from a shared lock to a read + * lock... instead, just release the shared and acquire the read */ + ReleaseSharedLock(&atrans->dbase->cache_lock); + + if (ret) { + /* if we have an error, don't retry, and don't hold any locks */ + return ret; + } + + ObtainReadLock(&atrans->dbase->cache_lock); + } + + atrans->flags |= TRCACHELOCKED; + + return 0; +} + /*! * "Who said anything about panicking?" snapped Arthur. * "This is still just the culture shock. You wait till I've settled down diff --git a/src/ubik/ubik.p.h b/src/ubik/ubik.p.h index 5f9f48f..3fe8b15 100644 --- a/src/ubik/ubik.p.h +++ b/src/ubik/ubik.p.h @@ -190,12 +190,28 @@ struct ubik_dbase { int (*getnfiles) (struct ubik_dbase * adbase); /*!< find out number of files */ short readers; /*!< number of current read transactions */ struct ubik_version cachedVersion; /*!< version of caller's cached data */ +#ifdef UKERNEL + struct afs_lock cache_lock; +#else + struct Lock cache_lock; /*!< protects cached application data */ +#endif #ifdef AFS_PTHREAD_ENV pthread_cond_t version_cond; /*!< condition variable to manage changes to version */ pthread_cond_t flags_cond; /*!< condition variable to manage changes to flags */ #endif }; +/** + * ubik_CheckCache callback function. + * + * @param[in] atrans ubik transaction + * @param[in] rock rock passed to ubik_CheckCache + * + * @return operation status + * @retval 0 cache was read properly + */ +typedef int (*ubik_updatecache_func) (struct ubik_trans *atrans, void *rock); + /*! \name procedures for automatically authenticating ubik connections */ extern int (*ubik_CRXSecurityProc) (void *, struct rx_securityClass **, afs_int32 *); @@ -232,6 +248,9 @@ extern void *ubik_CheckRXSecurityRock; #define TRSETLOCK 8 /*!< SetLock is using trans */ #define TRSTALE 16 /*!< udisk_end during getLock */ #endif /* UBIK_PAUSE */ +#define TRCACHELOCKED 32 /*!< this trans has locked dbase->cache_lock + * (meaning, this trans has called + * ubik_CheckCache at some point */ /*\}*/ /*! \name ubik_lock flags */ @@ -494,7 +513,9 @@ extern int ubik_WaitVersion(register struct ubik_dbase *adatabase, register struct ubik_version *aversion); extern int ubik_GetVersion(register struct ubik_trans *atrans, register struct ubik_version *avers); -extern int ubik_CacheUpdate(register struct ubik_trans *atrans); +extern int ubik_CheckCache(struct ubik_trans *atrans, + ubik_updatecache_func check, + void *rock); /*\}*/ /*! \name ubikclient.c */ diff --git a/src/vlserver/vlutils.c b/src/vlserver/vlutils.c index 1d572f0..cd9ff5c 100644 --- a/src/vlserver/vlutils.c +++ b/src/vlserver/vlutils.c @@ -290,37 +290,38 @@ readExtents(struct ubik_trans *trans) /* Check that the database has been initialized. Be careful to fail in a safe manner, to avoid bogusly reinitializing the db. */ -afs_int32 -CheckInit(struct ubik_trans *trans, int builddb) +/** + * reads in db cache from ubik. + * + * @param[in] ut ubik transaction + * @param[in] rock opaque pointer to an int*; if 1, we should rebuild the db + * if it appears empty, if 0 we should return an error if the + * db appears empty + * + * @return operation status + * @retval 0 success + */ +static afs_int32 +UpdateCache(struct ubik_trans *trans, void *rock) { - afs_int32 error = 0, i, code, ubcode = 0; - - /* ubik_CacheUpdate must be called on every transaction. It returns 0 if the - * previous transaction would have left the cache fine, and non-zero otherwise. - * Thus, a local abort or a remote commit will cause this to return non-zero - * and force a header re-read. Necessary for a local abort because we may - * have damaged cheader during the operation. Necessary for a remote commit - * since it may have changed cheader. - */ - if (ubik_CacheUpdate(trans) != 0) { - /* if version changed (or first call), read the header */ - ubcode = vlread(trans, 0, (char *)&cheader, sizeof(cheader)); - vldbversion = ntohl(cheader.vital_header.vldbversion); - - if (!ubcode && (vldbversion != 0)) { - memcpy(HostAddress, cheader.IpMappedAddr, - sizeof(cheader.IpMappedAddr)); - for (i = 0; i < MAXSERVERID + 1; i++) { /* cvt HostAddress to host order */ - HostAddress[i] = ntohl(HostAddress[i]); - } + int *builddb_rock = rock; + int builddb = *builddb_rock; + afs_int32 error = 0, i, code, ubcode; - code = readExtents(trans); - if (code) - ERROR_EXIT(code); + /* if version changed (or first call), read the header */ + ubcode = vlread(trans, 0, (char *)&cheader, sizeof(cheader)); + vldbversion = ntohl(cheader.vital_header.vldbversion); + + if (!ubcode && (vldbversion != 0)) { + memcpy(HostAddress, cheader.IpMappedAddr, sizeof(cheader.IpMappedAddr)); + for (i = 0; i < MAXSERVERID + 1; i++) { /* cvt HostAddress to host order */ + HostAddress[i] = ntohl(HostAddress[i]); } - } - vldbversion = ntohl(cheader.vital_header.vldbversion); + code = readExtents(trans); + if (code) + ERROR_EXIT(code); + } /* now, if can't read, or header is wrong, write a new header */ if (ubcode || vldbversion == 0) { @@ -343,14 +344,18 @@ CheckInit(struct ubik_trans *trans, int builddb) printf("Can't write VLDB header (error = %d)\n", code); ERROR_EXIT(VL_IO); } - } else + vldbversion = ntohl(cheader.vital_header.vldbversion); + } else { ERROR_EXIT(VL_EMPTY); - } else if ((vldbversion != VLDBVERSION) && (vldbversion != OVLDBVERSION) - && (vldbversion != VLDBVERSION_4)) { + } + } + + if ((vldbversion != VLDBVERSION) && (vldbversion != OVLDBVERSION) + && (vldbversion != VLDBVERSION_4)) { printf ("VLDB version %d doesn't match this software version(%d, %d or %d), quitting!\n", vldbversion, VLDBVERSION_4, VLDBVERSION, OVLDBVERSION); - ERROR_EXIT(VL_BADVERSION); + return VL_BADVERSION; } maxnservers = ((vldbversion == 3 || vldbversion == 4) ? 13 : 8); @@ -360,6 +365,30 @@ CheckInit(struct ubik_trans *trans, int builddb) return error; } +afs_int32 +CheckInit(struct ubik_trans *trans, int builddb) +{ + afs_int32 code; + + code = ubik_CheckCache(trans, UpdateCache, &builddb); + if (code) { + return code; + } + + /* these next two cases shouldn't happen (UpdateCache should either + * rebuild the db or return an error if these cases occur), but just to + * be on the safe side... */ + if (vldbversion == 0) { + return VL_EMPTY; + } + if ((vldbversion != VLDBVERSION) && (vldbversion != OVLDBVERSION) + && (vldbversion != VLDBVERSION_4)) { + return VL_BADVERSION; + } + + return 0; +} + afs_int32 GetExtentBlock(register struct ubik_trans *trans, register afs_int32 base) -- 1.9.4