Protect ubik cache accesses
[openafs.git] / src / ubik / ubik.c
index 9bb9a97..b86a17b 100644 (file)
@@ -420,6 +420,7 @@ ubik_ServerInitCommon(afs_int32 myHost, short myPort,
 #else
     Lock_Init(&tdb->versionLock);
 #endif
+    Lock_Init(&tdb->cache_lock);
     tdb->flags = 0;
     tdb->read = uphys_read;
     tdb->write = uphys_write;
@@ -749,8 +750,19 @@ ubik_AbortTrans(register struct ubik_trans *transPtr)
     register struct ubik_dbase *dbase;
 
     dbase = transPtr->dbase;
+
+    if (transPtr->flags & TRCACHELOCKED) {
+       ReleaseReadLock(&dbase->cache_lock);
+       transPtr->flags &= ~TRCACHELOCKED;
+    }
+
+    ObtainWriteLock(&dbase->cache_lock);
+
     DBHOLD(dbase);
     memset(&dbase->cachedVersion, 0, sizeof(struct ubik_version));
+
+    ReleaseWriteLock(&dbase->cache_lock);
+
     /* see if we're still up-to-date */
     if (!urecovery_AllBetter(dbase, transPtr->flags & TRREADANY)) {
        udisk_abort(transPtr);
@@ -805,15 +817,20 @@ ubik_EndTrans(register struct ubik_trans *transPtr)
     }
 
     dbase = transPtr->dbase;
+
+    if (transPtr->flags & TRCACHELOCKED) {
+       ReleaseReadLock(&dbase->cache_lock);
+       transPtr->flags &= ~TRCACHELOCKED;
+    }
     DBHOLD(dbase);
-    memset(&dbase->cachedVersion, 0, sizeof(struct ubik_version));
 
     /* give up if no longer current */
     if (!urecovery_AllBetter(dbase, transPtr->flags & TRREADANY)) {
        udisk_abort(transPtr);
        udisk_end(transPtr);
        DBRELE(dbase);
-       return UNOQUORUM;
+       code = UNOQUORUM;
+       goto error;
     }
 
     if (transPtr->type == UBIK_READTRANS) {    /* reads are easy */
@@ -822,14 +839,15 @@ ubik_EndTrans(register struct ubik_trans *transPtr)
            goto success;       /* update cachedVersion correctly */
        udisk_end(transPtr);
        DBRELE(dbase);
-       return code;
+       goto error;
     }
 
     if (!ubeacon_AmSyncSite()) {       /* no longer sync site */
        udisk_abort(transPtr);
        udisk_end(transPtr);
        DBRELE(dbase);
-       return UNOTSYNC;
+       code = UNOTSYNC;
+       goto error;
     }
 
     /* now it is safe to do commit */
@@ -846,7 +864,7 @@ ubik_EndTrans(register struct ubik_trans *transPtr)
        ContactQuorum_NoArguments(DISK_ReleaseLocks, transPtr, 0);
        udisk_end(transPtr);
        DBRELE(dbase);
-       return code;
+       goto error;
     }
     /* before we can start sending unlock messages, we must wait until all servers
      * that are possibly still functioning on the other side of a network partition
@@ -882,6 +900,12 @@ ubik_EndTrans(register struct ubik_trans *transPtr)
            break;              /* no down ones still pseudo-active */
     }
 
+    /* the commit bumped the dbase version, and since the write was local
+     * our cache should still be up to date, so make sure to update
+     * cachedVersion, too */
+    memcpy(&dbase->cachedVersion, &dbase->version,
+           sizeof(dbase->cachedVersion));
+
     /* finally, unlock all the dudes.  We can return success independent of the number of servers
      * that really unlock the dbase; the others will do it if/when they elect a new sync site.
      * The transaction is committed anyway, since we succeeded in contacting a quorum
@@ -891,12 +915,16 @@ ubik_EndTrans(register struct ubik_trans *transPtr)
 
   success:
     udisk_end(transPtr);
-    /* update version on successful EndTrans */
-    memcpy(&dbase->cachedVersion, &dbase->version,
-          sizeof(struct ubik_version));
-
+    /* don't update cachedVersion here; it should have been updated way back
+     * in ubik_CheckCache, and earlier in this function for writes */
     DBRELE(dbase);
     return 0;
+
+  error:
+    ObtainWriteLock(&dbase->cache_lock);
+    memset(&dbase->cachedVersion, 0, sizeof(struct ubik_version));
+    ReleaseWriteLock(&dbase->cache_lock);
+    return code;
 }
 
 /*!
@@ -1231,7 +1259,7 @@ ubik_GetVersion(register struct ubik_trans *atrans,
  * If return value is non-zero and the caller is a server caching part of the 
  * Ubik database, it should invalidate that cache.
  */
-int
+static int
 ubik_CacheUpdate(register struct ubik_trans *atrans)
 {
     if (!(atrans && atrans->dbase))
@@ -1239,6 +1267,73 @@ ubik_CacheUpdate(register struct ubik_trans *atrans)
     return vcmp(atrans->dbase->cachedVersion, atrans->dbase->version) != 0;
 }
 
+/**
+ * check and possibly update cache of ubik db.
+ *
+ * If the version of the cached db data is out of date, this calls (*check) to
+ * update the cache. If (*check) returns success, we update the version of the
+ * cached db data.
+ *
+ * Checking the version of the cached db data is done under a read lock;
+ * updating the cache (and thus calling (*check)) is done under a write lock
+ * so is guaranteed not to interfere with another thread's (*check). On
+ * successful return, a read lock on the cached db data is obtained, which
+ * will be released by ubik_EndTrans or ubik_AbortTrans.
+ *
+ * @param[in] atrans ubik transaction
+ * @param[in] check  function to call to check/update cache
+ * @param[in] rock   rock to pass to *check
+ *
+ * @return operation status
+ *   @retval 0       success
+ *   @retval nonzero error; cachedVersion not updated
+ *
+ * @post On success, application cache is read-locked, and cache data is
+ *       up-to-date
+ */
+int
+ubik_CheckCache(struct ubik_trans *atrans, ubik_updatecache_func cbf, void *rock)
+{
+    int ret = 0;
+
+    if (!(atrans && atrans->dbase))
+       return -1;
+
+    ObtainReadLock(&atrans->dbase->cache_lock);
+
+    while (ubik_CacheUpdate(atrans) != 0) {
+
+       ReleaseReadLock(&atrans->dbase->cache_lock);
+       ObtainSharedLock(&atrans->dbase->cache_lock);
+
+       if (ubik_CacheUpdate(atrans) != 0) {
+
+           BoostSharedLock(&atrans->dbase->cache_lock);
+
+           ret = (*cbf) (atrans, rock);
+           if (ret == 0) {
+               memcpy(&atrans->dbase->cachedVersion, &atrans->dbase->version,
+                      sizeof(atrans->dbase->cachedVersion));
+           }
+       }
+
+       /* It would be nice if we could convert from a shared lock to a read
+        * lock... instead, just release the shared and acquire the read */
+       ReleaseSharedLock(&atrans->dbase->cache_lock);
+
+       if (ret) {
+           /* if we have an error, don't retry, and don't hold any locks */
+           return ret;
+       }
+
+       ObtainReadLock(&atrans->dbase->cache_lock);
+    }
+
+    atrans->flags |= TRCACHELOCKED;
+
+    return 0;
+}
+
 /*!
  * "Who said anything about panicking?" snapped Arthur. 
  * "This is still just the culture shock. You wait till I've settled down