tdb->activeTrans = (struct ubik_trans *)0;
memset(&tdb->version, 0, sizeof(struct ubik_version));
memset(&tdb->cachedVersion, 0, sizeof(struct ubik_version));
+#ifdef AFS_PTHREAD_ENV
+ assert(pthread_mutex_init(&tdb->versionLock, NULL) == 0);
+#else
Lock_Init(&tdb->versionLock);
+#endif
+ Lock_Init(&tdb->cache_lock);
tdb->flags = 0;
tdb->read = uphys_read;
tdb->write = uphys_write;
#ifdef AFS_PTHREAD_ENV
assert(pthread_cond_init(&tdb->version_cond, NULL) == 0);
assert(pthread_cond_init(&tdb->flags_cond, NULL) == 0);
- assert(pthread_mutex_init(&tdb->version_mutex, NULL) == 0);
- assert(pthread_mutex_init(&tdb->flags_mutex, NULL) == 0);
#endif /* AFS_PTHREAD_ENV */
/* initialize RX */
if (transMode == UBIK_WRITETRANS) {
/* if we're writing already, wait */
while (dbase->flags & DBWRITING) {
- DBRELE(dbase);
#ifdef AFS_PTHREAD_ENV
- assert(pthread_mutex_lock(&dbase->flags_mutex) == 0);
- assert(pthread_cond_wait(&dbase->flags_cond, &dbase->flags_mutex) == 0);
- assert(pthread_mutex_unlock(&dbase->flags_mutex) == 0);
+ assert(pthread_cond_wait(&dbase->flags_cond, &dbase->versionLock) == 0);
#else
+ DBRELE(dbase);
LWP_WaitProcess(&dbase->flags);
-#endif
DBHOLD(dbase);
+#endif
}
+
if (!ubeacon_AmSyncSite()) {
DBRELE(dbase);
return UNOTSYNC;
register struct ubik_dbase *dbase;
dbase = transPtr->dbase;
+
+ if (transPtr->flags & TRCACHELOCKED) {
+ ReleaseReadLock(&dbase->cache_lock);
+ transPtr->flags &= ~TRCACHELOCKED;
+ }
+
+ ObtainWriteLock(&dbase->cache_lock);
+
DBHOLD(dbase);
memset(&dbase->cachedVersion, 0, sizeof(struct ubik_version));
+
+ ReleaseWriteLock(&dbase->cache_lock);
+
/* see if we're still up-to-date */
if (!urecovery_AllBetter(dbase, transPtr->flags & TRREADANY)) {
udisk_abort(transPtr);
}
dbase = transPtr->dbase;
+
+ if (transPtr->flags & TRCACHELOCKED) {
+ ReleaseReadLock(&dbase->cache_lock);
+ transPtr->flags &= ~TRCACHELOCKED;
+ }
DBHOLD(dbase);
- memset(&dbase->cachedVersion, 0, sizeof(struct ubik_version));
/* give up if no longer current */
if (!urecovery_AllBetter(dbase, transPtr->flags & TRREADANY)) {
udisk_abort(transPtr);
udisk_end(transPtr);
DBRELE(dbase);
- return UNOQUORUM;
+ code = UNOQUORUM;
+ goto error;
}
if (transPtr->type == UBIK_READTRANS) { /* reads are easy */
goto success; /* update cachedVersion correctly */
udisk_end(transPtr);
DBRELE(dbase);
- return code;
+ goto error;
}
if (!ubeacon_AmSyncSite()) { /* no longer sync site */
udisk_abort(transPtr);
udisk_end(transPtr);
DBRELE(dbase);
- return UNOTSYNC;
+ code = UNOTSYNC;
+ goto error;
}
/* now it is safe to do commit */
ContactQuorum_NoArguments(DISK_ReleaseLocks, transPtr, 0);
udisk_end(transPtr);
DBRELE(dbase);
- return code;
+ goto error;
}
/* before we can start sending unlock messages, we must wait until all servers
* that are possibly still functioning on the other side of a network partition
break; /* no down ones still pseudo-active */
}
+ /* the commit bumped the dbase version, and since the write was local
+ * our cache should still be up to date, so make sure to update
+ * cachedVersion, too */
+ memcpy(&dbase->cachedVersion, &dbase->version,
+ sizeof(dbase->cachedVersion));
+
/* finally, unlock all the dudes. We can return success independent of the number of servers
* that really unlock the dbase; the others will do it if/when they elect a new sync site.
* The transaction is committed anyway, since we succeeded in contacting a quorum
success:
udisk_end(transPtr);
- /* update version on successful EndTrans */
- memcpy(&dbase->cachedVersion, &dbase->version,
- sizeof(struct ubik_version));
-
+ /* don't update cachedVersion here; it should have been updated way back
+ * in ubik_CheckCache, and earlier in this function for writes */
DBRELE(dbase);
return 0;
+
+ error:
+ ObtainWriteLock(&dbase->cache_lock);
+ memset(&dbase->cachedVersion, 0, sizeof(struct ubik_version));
+ ReleaseWriteLock(&dbase->cache_lock);
+ return code;
}
/*!
ubik_WaitVersion(register struct ubik_dbase *adatabase,
register struct ubik_version *aversion)
{
+ DBHOLD(adatabase);
while (1) {
/* wait until version # changes, and then return */
- if (vcmp(*aversion, adatabase->version) != 0)
+ if (vcmp(*aversion, adatabase->version) != 0) {
+ DBRELE(adatabase);
return 0;
+ }
#ifdef AFS_PTHREAD_ENV
- assert(pthread_mutex_lock(&adatabase->version_mutex) == 0);
- assert(pthread_cond_wait(&adatabase->version_cond,&adatabase->version_mutex) == 0);
- assert(pthread_mutex_unlock(&adatabase->version_mutex) == 0);
+ assert(pthread_cond_wait(&adatabase->version_cond, &adatabase->versionLock) == 0);
#else
+ DBRELE(adatabase);
LWP_WaitProcess(&adatabase->version); /* same vers, just wait */
+ DBHOLD(adatabase);
#endif
}
}
* If return value is non-zero and the caller is a server caching part of the
* Ubik database, it should invalidate that cache.
*/
-int
+static int
ubik_CacheUpdate(register struct ubik_trans *atrans)
{
if (!(atrans && atrans->dbase))
return vcmp(atrans->dbase->cachedVersion, atrans->dbase->version) != 0;
}
+/**
+ * check and possibly update cache of ubik db.
+ *
+ * If the version of the cached db data is out of date, this calls (*check) to
+ * update the cache. If (*check) returns success, we update the version of the
+ * cached db data.
+ *
+ * Checking the version of the cached db data is done under a read lock;
+ * updating the cache (and thus calling (*check)) is done under a write lock
+ * so is guaranteed not to interfere with another thread's (*check). On
+ * successful return, a read lock on the cached db data is obtained, which
+ * will be released by ubik_EndTrans or ubik_AbortTrans.
+ *
+ * @param[in] atrans ubik transaction
+ * @param[in] check function to call to check/update cache
+ * @param[in] rock rock to pass to *check
+ *
+ * @return operation status
+ * @retval 0 success
+ * @retval nonzero error; cachedVersion not updated
+ *
+ * @post On success, application cache is read-locked, and cache data is
+ * up-to-date
+ */
+int
+ubik_CheckCache(struct ubik_trans *atrans, ubik_updatecache_func cbf, void *rock)
+{
+ int ret = 0;
+
+ if (!(atrans && atrans->dbase))
+ return -1;
+
+ ObtainReadLock(&atrans->dbase->cache_lock);
+
+ while (ubik_CacheUpdate(atrans) != 0) {
+
+ ReleaseReadLock(&atrans->dbase->cache_lock);
+ ObtainSharedLock(&atrans->dbase->cache_lock);
+
+ if (ubik_CacheUpdate(atrans) != 0) {
+
+ BoostSharedLock(&atrans->dbase->cache_lock);
+
+ ret = (*cbf) (atrans, rock);
+ if (ret == 0) {
+ memcpy(&atrans->dbase->cachedVersion, &atrans->dbase->version,
+ sizeof(atrans->dbase->cachedVersion));
+ }
+ }
+
+ /* It would be nice if we could convert from a shared lock to a read
+ * lock... instead, just release the shared and acquire the read */
+ ReleaseSharedLock(&atrans->dbase->cache_lock);
+
+ if (ret) {
+ /* if we have an error, don't retry, and don't hold any locks */
+ return ret;
+ }
+
+ ObtainReadLock(&atrans->dbase->cache_lock);
+ }
+
+ atrans->flags |= TRCACHELOCKED;
+
+ return 0;
+}
+
/*!
* "Who said anything about panicking?" snapped Arthur.
* "This is still just the culture shock. You wait till I've settled down