pollCount = 0;
memset(&db, 0, sizeof(db));
- Lock_Init(&db.lock);
if ((code = InitDBalloc()) || (code = InitDBhash()))
return code;
return 0;
/* check that the database has been initialized. Be careful to fail in a safe
manner, to avoid bogusly reinitializing the db. */
-afs_int32
-CheckInit(struct ubik_trans *ut,
- int (*db_init) (struct ubik_trans *ut)) /* call if rebuilding DB */
+/**
+ * reads in db cache from ubik.
+ *
+ * @param[in] ut ubik transaction
+ * @param[in] rock opaque pointer to an int (*) (struct ubik_trans *), which
+ * will be called on rebuilding the database (or NULL to not
+ * rebuild the db)
+ *
+ * @return operation status
+ * @retval 0 success
+ */
+static afs_int32
+UpdateCache(struct ubik_trans *ut, void *rock)
{
- register afs_int32 code;
-
- /* Don't read header if not necessary */
- if (!ubik_CacheUpdate(ut))
- return 0;
-
- ObtainWriteLock(&db.lock);
+ int (*db_init) (struct ubik_trans *ut) = rock;
+ afs_int32 code;
db.h.eofPtr = htonl(sizeof(db.h)); /* for sanity check in dbread */
code = dbread(ut, 0, (char *)&db.h, sizeof(db.h));
ht_Reset(&db.dumpIden);
error_exit:
- ReleaseWriteLock(&db.lock);
if (code) {
if ((code == UEOF) || (code == BUDB_EMPTY)) {
if (db_init) {
}
return code;
}
+
+afs_int32
+CheckInit(struct ubik_trans *ut,
+ int (*db_init) (struct ubik_trans *ut)) /* call if rebuilding DB; NULL to not rebuild */
+{
+ return ubik_CheckCache(ut, UpdateCache, db_init); /* db_init is handed to UpdateCache as its rock */
+}
return code;
}
-static struct Lock cheader_lock;
static struct Lock keycache_lock;
static int maxCachedKeys;
void
init_kadatabase(int initFlags)
{
- Lock_Init(&cheader_lock);
Lock_Init(&keycache_lock);
maxCachedKeys = 10;
/* check that the database has been initialized. Be careful to fail in a safe
manner, to avoid bogusly reinitializing the db. */
-
-afs_int32
-CheckInit(struct ubik_trans *at,
- int (*db_init) (struct ubik_trans *)) /* procedure to call if rebuilding DB */
+/**
+ * reads in db cache from ubik.
+ *
+ * @param[in] at ubik transaction
+ * @param[in] rock opaque pointer to an int (*) (struct ubik_trans *), which
+ * will be called to initialize the database when the header
+ * check fails.
+ * NOTE(review): the end of this function calls it without a
+ * NULL check, so NULL does not appear to be safe here
+ *
+ * @return operation status
+ * @retval 0 success
+ */
+static afs_int32
+UpdateCache(struct ubik_trans *at, void *rock)
{
- register afs_int32 code;
+ int (*db_init) (struct ubik_trans *) = rock;
+ afs_int32 code;
afs_int32 iversion;
afs_int32 tversion;
- /* Don't read header if not necessary */
- if (!ubik_CacheUpdate(at))
- return 0;
-
- ObtainWriteLock(&cheader_lock);
if ((code = karead(at, 0, (char *)&iversion, sizeof(iversion)))
|| (code =
karead(at, sizeof(cheader) - sizeof(afs_int32), (char *)&tversion,
code = KAIO;
}
}
- ReleaseWriteLock(&cheader_lock);
if (code == 0)
return 0;
return db_init(at); /* initialize the db */
}
+afs_int32
+CheckInit(struct ubik_trans *at,
+ int (*db_init) (struct ubik_trans *)) /* procedure to call if rebuilding DB */
+{
+ return ubik_CheckCache(at, UpdateCache, db_init); /* db_init is handed to UpdateCache as its rock */
+}
+
/* Allocate a free block of storage for entry, returning address of a new
zeroed entry. If zero is returned, a Ubik I/O error can be assumed. */
{
int i;
- memcpy(&info->cheader_lock, &cheader_lock, sizeof(info->cheader_lock));
+ /* cheader_lock no longer exists */
+ memset(&info->cheader_lock, 0, sizeof(info->cheader_lock));
memcpy(&info->keycache_lock, &keycache_lock, sizeof(info->keycache_lock));
info->kcVersion = keyCacheVersion;
extern int restricted;
extern struct ubik_dbase *dbase;
extern int pr_noAuth;
-extern afs_int32 initd;
extern char *pr_realmName;
extern int prp_group_default;
extern int prp_user_default;
struct ubik_trans *at, afs_int32 *aid,
char *aname);
-/* When abort, reset initd so that the header is read in on next call.
+/* when we abort, the ubik cachedVersion will be reset, so we'll read in the
+ * header on the next call.
* Abort the transaction and return the code.
*/
-#define ABORT_WITH(tt,code) return(initd=0,ubik_AbortTrans(tt),code)
+#define ABORT_WITH(tt,code) return(ubik_AbortTrans(tt),code)
static int
CreateOK(struct ubik_trans *ut, afs_int32 cid, afs_int32 oid, afs_int32 flag,
return PRSUCCESS;
}
-afs_int32
-read_DbHeader(struct ubik_trans *tt)
+static afs_int32
+UpdateCache(struct ubik_trans *tt, void *rock)
{
afs_int32 code;
- if (!ubik_CacheUpdate(tt))
- return 0;
-
code = pr_Read(tt, 0, 0, (char *)&cheader, sizeof(cheader));
if (code != 0) {
afs_com_err(whoami, code, "Couldn't read header");
return code;
}
+afs_int32
+read_DbHeader(struct ubik_trans *tt)
+{
+ return ubik_CheckCache(tt, UpdateCache, NULL); /* NULL rock: nothing is passed through to UpdateCache */
+}
+
int pr_noAuth;
-afs_int32 initd = 0;
+
+/**
+ * reads in db cache from ubik.
+ *
+ * Reads the header into cheader and reports, through rock, whether the
+ * database still has to be built. This function never builds the database
+ * itself.
+ *
+ * @param[in] tt ubik transaction
+ * @param[out] rock opaque pointer to an int, which on success will be set
+ * to 1 if we need to build the database, or 0 if we do not;
+ * it is not written if reading the header fails
+ *
+ * @return operation status
+ * @retval 0 success
+ * @retval nonzero error code from pr_Read; *rock is left untouched
+ */
+static afs_int32
+Initdb_check(struct ubik_trans *tt, void *rock)
+{
+ int *build_rock = rock;
+ afs_int32 code;
+ afs_int32 len;
+
+ len = sizeof(cheader);
+ code = pr_Read(tt, 0, 0, (char *)&cheader, len);
+ if (code != 0) {
+ afs_com_err(whoami, code, "couldn't read header");
+ return code;
+ }
+ if ((ntohl(cheader.version) == PRDBVERSION)
+ && ntohl(cheader.headerSize) == sizeof(cheader)
+ && ntohl(cheader.eofPtr) != 0
+ && FindByID(tt, ANONYMOUSID) != 0) {
+ /* version and header size match, eofPtr is nonzero and
+ * FindByID(tt, ANONYMOUSID) returned nonzero:
+ * database exists, so we don't have to build it */
+ *build_rock = 0;
+ return 0;
+ }
+
+ /* else we need to build a database; this is still reported as success,
+ * the caller decides what to do with the flag */
+ *build_rock = 1;
+ return 0;
+}
afs_int32
Initdb(void)
{
- afs_int32 code;
struct ubik_trans *tt;
- afs_int32 len;
+ int build = 0;
+ afs_int32 code;
/* init the database. We'll try reading it, but if we're starting
* from scratch, we'll have to do a write transaction. */
ubik_AbortTrans(tt);
return code;
}
- if (!initd) {
- initd = 1;
- } else if (!ubik_CacheUpdate(tt)) {
- code = ubik_EndTrans(tt);
- return code;
- }
- len = sizeof(cheader);
- code = pr_Read(tt, 0, 0, (char *)&cheader, len);
- if (code != 0) {
- afs_com_err(whoami, code, "couldn't read header");
+ code = ubik_CheckCache(tt, Initdb_check, &build);
+ if (code) {
ubik_AbortTrans(tt);
return code;
}
- if ((ntohl(cheader.version) == PRDBVERSION)
- && ntohl(cheader.headerSize) == sizeof(cheader)
- && ntohl(cheader.eofPtr) != 0
- && FindByID(tt, ANONYMOUSID) != 0) {
- /* database exists, so we don't have to build it */
- code = ubik_EndTrans(tt);
- if (code)
- return code;
- return PRSUCCESS;
- }
- /* else we need to build a database */
- code = ubik_EndTrans(tt);
- if (code)
- return code;
- /* Only rebuild database if the db was deleted (the header is zero) and we
- * are running noAuth. */
- {
+ if (build) {
+ /* Only rebuild database if the db was deleted (the header is zero) and we
+ * are running noAuth. */
char *bp = (char *)&cheader;
int i;
- for (i = 0; i < sizeof(cheader); i++)
+ for (i = 0; i < sizeof(cheader); i++) {
if (bp[i]) {
code = PRDBBAD;
afs_com_err(whoami, code,
"Can't rebuild database because it is not empty");
- return code;
+ break;
}
+ }
+ if (!pr_noAuth) {
+ code = PRDBBAD;
+ afs_com_err(whoami, code,
+ "Can't rebuild database because not running NoAuth");
+ }
}
- if (!pr_noAuth) {
- code = PRDBBAD;
- afs_com_err(whoami, code,
- "Can't rebuild database because not running NoAuth");
+
+ if (code) {
+ ubik_EndTrans(tt);
+ } else {
+ code = ubik_EndTrans(tt);
+ }
+ if (code || !build) {
+ /* either we encountered an error, or we don't need to build the db */
return code;
}
* actually have been a good database out there. Now that we have a
* real write transaction, make sure things are still bad.
*/
+ code = pr_Read(tt, 0, 0, (char *)&cheader, sizeof(cheader));
+ if (code != 0) {
+ afs_com_err(whoami, code, "couldn't read header");
+ ubik_AbortTrans(tt);
+ return code;
+ }
if ((ntohl(cheader.version) == PRDBVERSION)
&& ntohl(cheader.headerSize) == sizeof(cheader)
&& ntohl(cheader.eofPtr) != 0
}
int
-ubik_CacheUpdate(register struct ubik_trans *atrans)
+ubik_CheckCache(struct ubik_trans *atrans, ubik_updatecache_func cbf, void *rock)
{
return (0); /* stub: always reports success and never invokes cbf */
}
#else
Lock_Init(&tdb->versionLock);
#endif
+ Lock_Init(&tdb->cache_lock);
tdb->flags = 0;
tdb->read = uphys_read;
tdb->write = uphys_write;
register struct ubik_dbase *dbase;
dbase = transPtr->dbase;
+
+ if (transPtr->flags & TRCACHELOCKED) {
+ ReleaseReadLock(&dbase->cache_lock);
+ transPtr->flags &= ~TRCACHELOCKED;
+ }
+
+ ObtainWriteLock(&dbase->cache_lock);
+
DBHOLD(dbase);
memset(&dbase->cachedVersion, 0, sizeof(struct ubik_version));
+
+ ReleaseWriteLock(&dbase->cache_lock);
+
/* see if we're still up-to-date */
if (!urecovery_AllBetter(dbase, transPtr->flags & TRREADANY)) {
udisk_abort(transPtr);
}
dbase = transPtr->dbase;
+
+ if (transPtr->flags & TRCACHELOCKED) {
+ ReleaseReadLock(&dbase->cache_lock);
+ transPtr->flags &= ~TRCACHELOCKED;
+ }
DBHOLD(dbase);
- memset(&dbase->cachedVersion, 0, sizeof(struct ubik_version));
/* give up if no longer current */
if (!urecovery_AllBetter(dbase, transPtr->flags & TRREADANY)) {
udisk_abort(transPtr);
udisk_end(transPtr);
DBRELE(dbase);
- return UNOQUORUM;
+ code = UNOQUORUM;
+ goto error;
}
if (transPtr->type == UBIK_READTRANS) { /* reads are easy */
goto success; /* update cachedVersion correctly */
udisk_end(transPtr);
DBRELE(dbase);
- return code;
+ goto error;
}
if (!ubeacon_AmSyncSite()) { /* no longer sync site */
udisk_abort(transPtr);
udisk_end(transPtr);
DBRELE(dbase);
- return UNOTSYNC;
+ code = UNOTSYNC;
+ goto error;
}
/* now it is safe to do commit */
ContactQuorum_NoArguments(DISK_ReleaseLocks, transPtr, 0);
udisk_end(transPtr);
DBRELE(dbase);
- return code;
+ goto error;
}
/* before we can start sending unlock messages, we must wait until all servers
* that are possibly still functioning on the other side of a network partition
break; /* no down ones still pseudo-active */
}
+ /* the commit bumped the dbase version, and since the write was local
+ * our cache should still be up to date, so make sure to update
+ * cachedVersion, too */
+ memcpy(&dbase->cachedVersion, &dbase->version,
+ sizeof(dbase->cachedVersion));
+
/* finally, unlock all the dudes. We can return success independent of the number of servers
* that really unlock the dbase; the others will do it if/when they elect a new sync site.
* The transaction is committed anyway, since we succeeded in contacting a quorum
success:
udisk_end(transPtr);
- /* update version on successful EndTrans */
- memcpy(&dbase->cachedVersion, &dbase->version,
- sizeof(struct ubik_version));
-
+ /* don't update cachedVersion here; it should have been updated way back
+ * in ubik_CheckCache, and earlier in this function for writes */
DBRELE(dbase);
return 0;
+
+ error:
+ ObtainWriteLock(&dbase->cache_lock);
+ memset(&dbase->cachedVersion, 0, sizeof(struct ubik_version));
+ ReleaseWriteLock(&dbase->cache_lock);
+ return code;
}
/*!
* If return value is non-zero and the caller is a server caching part of the
* Ubik database, it should invalidate that cache.
*/
-int
+static int
ubik_CacheUpdate(register struct ubik_trans *atrans)
{
if (!(atrans && atrans->dbase))
return vcmp(atrans->dbase->cachedVersion, atrans->dbase->version) != 0;
}
+/**
+ * check and possibly update cache of ubik db.
+ *
+ * If the version of the cached db data is out of date, this calls (*cbf) to
+ * update the cache. If (*cbf) returns success, we update the version of the
+ * cached db data.
+ *
+ * Checking the version of the cached db data is done under a read lock;
+ * updating the cache (and thus calling (*cbf)) is done under a write lock
+ * so is guaranteed not to interfere with another thread's (*cbf). On
+ * successful return, a read lock on the cached db data is obtained, which
+ * will be released by ubik_EndTrans or ubik_AbortTrans.
+ *
+ * @param[in] atrans ubik transaction
+ * @param[in] cbf function to call to check/update cache
+ * @param[in] rock rock to pass to *cbf
+ *
+ * @return operation status
+ * @retval 0 success
+ * @retval -1 atrans or atrans->dbase is NULL; no lock is taken
+ * @retval nonzero error returned by (*cbf); cachedVersion not updated,
+ * no cache lock is held and TRCACHELOCKED is not set
+ *
+ * @post On success, application cache is read-locked, TRCACHELOCKED is set
+ * in atrans->flags, and cache data is up-to-date
+ */
+int
+ubik_CheckCache(struct ubik_trans *atrans, ubik_updatecache_func cbf, void *rock)
+{
+ int ret = 0;
+
+ if (!(atrans && atrans->dbase))
+ return -1;
+
+ ObtainReadLock(&atrans->dbase->cache_lock);
+
+ while (ubik_CacheUpdate(atrans) != 0) { /* nonzero: cachedVersion differs from the dbase version */
+
+ ReleaseReadLock(&atrans->dbase->cache_lock);
+ ObtainSharedLock(&atrans->dbase->cache_lock);
+
+ if (ubik_CacheUpdate(atrans) != 0) { /* re-check: the read lock was dropped above, so the cache may have been refreshed meanwhile */
+
+ BoostSharedLock(&atrans->dbase->cache_lock);
+
+ ret = (*cbf) (atrans, rock);
+ if (ret == 0) { /* only mark the cache current when the callback succeeded */
+ memcpy(&atrans->dbase->cachedVersion, &atrans->dbase->version,
+ sizeof(atrans->dbase->cachedVersion));
+ }
+ }
+
+ /* It would be nice if we could convert from a shared lock to a read
+ * lock... instead, just release the shared and acquire the read.
+ * Because the lock is fully dropped in between, the while condition
+ * checks the version again after the read lock is re-acquired. */
+ ReleaseSharedLock(&atrans->dbase->cache_lock);
+
+ if (ret) {
+ /* if we have an error, don't retry, and don't hold any locks;
+ * TRCACHELOCKED is deliberately not set on this path */
+ return ret;
+ }
+
+ ObtainReadLock(&atrans->dbase->cache_lock);
+ }
+
+ atrans->flags |= TRCACHELOCKED; /* records that this trans holds the read lock taken above */
+
+ return 0;
+}
+
/*!
* "Who said anything about panicking?" snapped Arthur.
* "This is still just the culture shock. You wait till I've settled down
int (*getnfiles) (struct ubik_dbase * adbase); /*!< find out number of files */
short readers; /*!< number of current read transactions */
struct ubik_version cachedVersion; /*!< version of caller's cached data */
+#ifdef UKERNEL
+ struct afs_lock cache_lock;
+#else
+ struct Lock cache_lock; /*!< protects cached application data */
+#endif
#ifdef AFS_PTHREAD_ENV
pthread_cond_t version_cond; /*!< condition variable to manage changes to version */
pthread_cond_t flags_cond; /*!< condition variable to manage changes to flags */
#endif
};
+/**
+ * ubik_CheckCache callback function.
+ *
+ * @param[in] atrans ubik transaction
+ * @param[in] rock rock passed to ubik_CheckCache
+ *
+ * @return operation status
+ * @retval 0 cache was read properly
+ * @retval nonzero error; ubik_CheckCache returns this value to its caller
+ * and does not update cachedVersion
+ */
+typedef int (*ubik_updatecache_func) (struct ubik_trans *atrans, void *rock);
+
/*! \name procedures for automatically authenticating ubik connections */
extern int (*ubik_CRXSecurityProc) (void *, struct rx_securityClass **,
afs_int32 *);
#define TRSETLOCK 8 /*!< SetLock is using trans */
#define TRSTALE 16 /*!< udisk_end during getLock */
#endif /* UBIK_PAUSE */
+#define TRCACHELOCKED 32 /*!< this trans has locked dbase->cache_lock
+ * (meaning, this trans has called
+ * ubik_CheckCache at some point) */
/*\}*/
/*! \name ubik_lock flags */
register struct ubik_version *aversion);
extern int ubik_GetVersion(register struct ubik_trans *atrans,
register struct ubik_version *avers);
-extern int ubik_CacheUpdate(register struct ubik_trans *atrans);
+extern int ubik_CheckCache(struct ubik_trans *atrans,
+ ubik_updatecache_func check,
+ void *rock);
/*\}*/
/*! \name ubikclient.c */
/* Check that the database has been initialized. Be careful to fail in a safe
manner, to avoid bogusly reinitializing the db. */
-afs_int32
-CheckInit(struct ubik_trans *trans, int builddb)
+/**
+ * reads in db cache from ubik.
+ *
+ * @param[in] trans ubik transaction
+ * @param[in] rock opaque pointer to an int; if the int is 1, we should
+ * rebuild the db if it appears empty, if it is 0 we should
+ * return an error if the db appears empty
+ *
+ * @return operation status
+ * @retval 0 success
+ */
+static afs_int32
+UpdateCache(struct ubik_trans *trans, void *rock)
{
- afs_int32 error = 0, i, code, ubcode = 0;
-
- /* ubik_CacheUpdate must be called on every transaction. It returns 0 if the
- * previous transaction would have left the cache fine, and non-zero otherwise.
- * Thus, a local abort or a remote commit will cause this to return non-zero
- * and force a header re-read. Necessary for a local abort because we may
- * have damaged cheader during the operation. Necessary for a remote commit
- * since it may have changed cheader.
- */
- if (ubik_CacheUpdate(trans) != 0) {
- /* if version changed (or first call), read the header */
- ubcode = vlread(trans, 0, (char *)&cheader, sizeof(cheader));
- vldbversion = ntohl(cheader.vital_header.vldbversion);
-
- if (!ubcode && (vldbversion != 0)) {
- memcpy(HostAddress, cheader.IpMappedAddr,
- sizeof(cheader.IpMappedAddr));
- for (i = 0; i < MAXSERVERID + 1; i++) { /* cvt HostAddress to host order */
- HostAddress[i] = ntohl(HostAddress[i]);
- }
+ int *builddb_rock = rock;
+ int builddb = *builddb_rock;
+ afs_int32 error = 0, i, code, ubcode;
- code = readExtents(trans);
- if (code)
- ERROR_EXIT(code);
+ /* if version changed (or first call), read the header */
+ ubcode = vlread(trans, 0, (char *)&cheader, sizeof(cheader));
+ vldbversion = ntohl(cheader.vital_header.vldbversion);
+
+ if (!ubcode && (vldbversion != 0)) {
+ memcpy(HostAddress, cheader.IpMappedAddr, sizeof(cheader.IpMappedAddr));
+ for (i = 0; i < MAXSERVERID + 1; i++) { /* cvt HostAddress to host order */
+ HostAddress[i] = ntohl(HostAddress[i]);
}
- }
- vldbversion = ntohl(cheader.vital_header.vldbversion);
+ code = readExtents(trans);
+ if (code)
+ ERROR_EXIT(code);
+ }
/* now, if can't read, or header is wrong, write a new header */
if (ubcode || vldbversion == 0) {
printf("Can't write VLDB header (error = %d)\n", code);
ERROR_EXIT(VL_IO);
}
- } else
+ vldbversion = ntohl(cheader.vital_header.vldbversion);
+ } else {
ERROR_EXIT(VL_EMPTY);
- } else if ((vldbversion != VLDBVERSION) && (vldbversion != OVLDBVERSION)
- && (vldbversion != VLDBVERSION_4)) {
+ }
+ }
+
+ if ((vldbversion != VLDBVERSION) && (vldbversion != OVLDBVERSION)
+ && (vldbversion != VLDBVERSION_4)) {
printf
("VLDB version %d doesn't match this software version(%d, %d or %d), quitting!\n",
vldbversion, VLDBVERSION_4, VLDBVERSION, OVLDBVERSION);
- ERROR_EXIT(VL_BADVERSION);
+ return VL_BADVERSION;
}
maxnservers = ((vldbversion == 3 || vldbversion == 4) ? 13 : 8);
return error;
}
+afs_int32
+CheckInit(struct ubik_trans *trans, int builddb)
+{
+ afs_int32 code;
+
+ code = ubik_CheckCache(trans, UpdateCache, &builddb); /* UpdateCache reads the builddb flag through its rock */
+ if (code) {
+ return code;
+ }
+
+ /* these next two cases shouldn't happen (UpdateCache should either
+ * rebuild the db or return an error if these cases occur), but just to
+ * be on the safe side... Both checks look at vldbversion as it stands
+ * after the ubik_CheckCache call above. */
+ if (vldbversion == 0) {
+ return VL_EMPTY;
+ }
+ if ((vldbversion != VLDBVERSION) && (vldbversion != OVLDBVERSION)
+ && (vldbversion != VLDBVERSION_4)) {
+ return VL_BADVERSION;
+ }
+
+ return 0;
+}
+
afs_int32
GetExtentBlock(register struct ubik_trans *trans, register afs_int32 base)