Protect ubik cache accesses
[openafs.git] / src / ubik / ubik.c
index 4187ac0..b86a17b 100644 (file)
@@ -415,7 +415,12 @@ ubik_ServerInitCommon(afs_int32 myHost, short myPort,
     tdb->activeTrans = (struct ubik_trans *)0;
     memset(&tdb->version, 0, sizeof(struct ubik_version));
     memset(&tdb->cachedVersion, 0, sizeof(struct ubik_version));
+#ifdef AFS_PTHREAD_ENV
+    assert(pthread_mutex_init(&tdb->versionLock, NULL) == 0);
+#else
     Lock_Init(&tdb->versionLock);
+#endif
+    Lock_Init(&tdb->cache_lock);
     tdb->flags = 0;
     tdb->read = uphys_read;
     tdb->write = uphys_write;
@@ -434,8 +439,6 @@ ubik_ServerInitCommon(afs_int32 myHost, short myPort,
 #ifdef AFS_PTHREAD_ENV
     assert(pthread_cond_init(&tdb->version_cond, NULL) == 0);
     assert(pthread_cond_init(&tdb->flags_cond, NULL) == 0);
-    assert(pthread_mutex_init(&tdb->version_mutex, NULL) == 0);
-    assert(pthread_mutex_init(&tdb->flags_mutex, NULL) == 0);
 #endif /* AFS_PTHREAD_ENV */
 
     /* initialize RX */
@@ -662,16 +665,15 @@ BeginTrans(register struct ubik_dbase *dbase, afs_int32 transMode,
     if (transMode == UBIK_WRITETRANS) {
        /* if we're writing already, wait */
        while (dbase->flags & DBWRITING) {
-           DBRELE(dbase);
 #ifdef AFS_PTHREAD_ENV
-           assert(pthread_mutex_lock(&dbase->flags_mutex) == 0);
-           assert(pthread_cond_wait(&dbase->flags_cond, &dbase->flags_mutex) == 0);
-           assert(pthread_mutex_unlock(&dbase->flags_mutex) == 0);
+           assert(pthread_cond_wait(&dbase->flags_cond, &dbase->versionLock) == 0);
 #else
+           DBRELE(dbase);
            LWP_WaitProcess(&dbase->flags);
-#endif
            DBHOLD(dbase);
+#endif
        }
+
        if (!ubeacon_AmSyncSite()) {
            DBRELE(dbase);
            return UNOTSYNC;
@@ -748,8 +750,19 @@ ubik_AbortTrans(register struct ubik_trans *transPtr)
     register struct ubik_dbase *dbase;
 
     dbase = transPtr->dbase;
+
+    if (transPtr->flags & TRCACHELOCKED) {
+       ReleaseReadLock(&dbase->cache_lock);
+       transPtr->flags &= ~TRCACHELOCKED;
+    }
+
+    ObtainWriteLock(&dbase->cache_lock);
+
     DBHOLD(dbase);
     memset(&dbase->cachedVersion, 0, sizeof(struct ubik_version));
+
+    ReleaseWriteLock(&dbase->cache_lock);
+
     /* see if we're still up-to-date */
     if (!urecovery_AllBetter(dbase, transPtr->flags & TRREADANY)) {
        udisk_abort(transPtr);
@@ -804,15 +817,20 @@ ubik_EndTrans(register struct ubik_trans *transPtr)
     }
 
     dbase = transPtr->dbase;
+
+    if (transPtr->flags & TRCACHELOCKED) {
+       ReleaseReadLock(&dbase->cache_lock);
+       transPtr->flags &= ~TRCACHELOCKED;
+    }
     DBHOLD(dbase);
-    memset(&dbase->cachedVersion, 0, sizeof(struct ubik_version));
 
     /* give up if no longer current */
     if (!urecovery_AllBetter(dbase, transPtr->flags & TRREADANY)) {
        udisk_abort(transPtr);
        udisk_end(transPtr);
        DBRELE(dbase);
-       return UNOQUORUM;
+       code = UNOQUORUM;
+       goto error;
     }
 
     if (transPtr->type == UBIK_READTRANS) {    /* reads are easy */
@@ -821,14 +839,15 @@ ubik_EndTrans(register struct ubik_trans *transPtr)
            goto success;       /* update cachedVersion correctly */
        udisk_end(transPtr);
        DBRELE(dbase);
-       return code;
+       goto error;
     }
 
     if (!ubeacon_AmSyncSite()) {       /* no longer sync site */
        udisk_abort(transPtr);
        udisk_end(transPtr);
        DBRELE(dbase);
-       return UNOTSYNC;
+       code = UNOTSYNC;
+       goto error;
     }
 
     /* now it is safe to do commit */
@@ -845,7 +864,7 @@ ubik_EndTrans(register struct ubik_trans *transPtr)
        ContactQuorum_NoArguments(DISK_ReleaseLocks, transPtr, 0);
        udisk_end(transPtr);
        DBRELE(dbase);
-       return code;
+       goto error;
     }
     /* before we can start sending unlock messages, we must wait until all servers
      * that are possibly still functioning on the other side of a network partition
@@ -881,6 +900,12 @@ ubik_EndTrans(register struct ubik_trans *transPtr)
            break;              /* no down ones still pseudo-active */
     }
 
+    /* the commit bumped the dbase version, and since the write was local
+     * our cache should still be up to date, so make sure to update
+     * cachedVersion, too */
+    memcpy(&dbase->cachedVersion, &dbase->version,
+           sizeof(dbase->cachedVersion));
+
     /* finally, unlock all the dudes.  We can return success independent of the number of servers
      * that really unlock the dbase; the others will do it if/when they elect a new sync site.
      * The transaction is committed anyway, since we succeeded in contacting a quorum
@@ -890,12 +915,16 @@ ubik_EndTrans(register struct ubik_trans *transPtr)
 
   success:
     udisk_end(transPtr);
-    /* update version on successful EndTrans */
-    memcpy(&dbase->cachedVersion, &dbase->version,
-          sizeof(struct ubik_version));
-
+    /* don't update cachedVersion here; it should have been updated way back
+     * in ubik_CheckCache, and earlier in this function for writes */
     DBRELE(dbase);
     return 0;
+
+  error:
+    ObtainWriteLock(&dbase->cache_lock);
+    memset(&dbase->cachedVersion, 0, sizeof(struct ubik_version));
+    ReleaseWriteLock(&dbase->cache_lock);
+    return code;
 }
 
 /*!
@@ -1194,16 +1223,19 @@ int
 ubik_WaitVersion(register struct ubik_dbase *adatabase,
                 register struct ubik_version *aversion)
 {
+    DBHOLD(adatabase);
     while (1) {
        /* wait until version # changes, and then return */
-       if (vcmp(*aversion, adatabase->version) != 0)
+       if (vcmp(*aversion, adatabase->version) != 0) {
+           DBRELE(adatabase);
            return 0;
+       }
 #ifdef AFS_PTHREAD_ENV
-       assert(pthread_mutex_lock(&adatabase->version_mutex) == 0);
-       assert(pthread_cond_wait(&adatabase->version_cond,&adatabase->version_mutex) == 0);
-       assert(pthread_mutex_unlock(&adatabase->version_mutex) == 0);
+       assert(pthread_cond_wait(&adatabase->version_cond, &adatabase->versionLock) == 0);
 #else
+       DBRELE(adatabase);
        LWP_WaitProcess(&adatabase->version);   /* same vers, just wait */
+       DBHOLD(adatabase);
 #endif
     }
 }
@@ -1227,7 +1259,7 @@ ubik_GetVersion(register struct ubik_trans *atrans,
  * If return value is non-zero and the caller is a server caching part of the 
  * Ubik database, it should invalidate that cache.
  */
-int
+static int
 ubik_CacheUpdate(register struct ubik_trans *atrans)
 {
     if (!(atrans && atrans->dbase))
@@ -1235,6 +1267,73 @@ ubik_CacheUpdate(register struct ubik_trans *atrans)
     return vcmp(atrans->dbase->cachedVersion, atrans->dbase->version) != 0;
 }
 
+/**
+ * check and possibly update cache of ubik db.
+ *
+ * If the version of the cached db data is out of date, this calls (*check) to
+ * update the cache. If (*check) returns success, we update the version of the
+ * cached db data.
+ *
+ * Checking the version of the cached db data is done under a read lock;
+ * updating the cache (and thus calling (*check)) is done under a write lock
+ * so is guaranteed not to interfere with another thread's (*check). On
+ * successful return, a read lock on the cached db data is obtained, which
+ * will be released by ubik_EndTrans or ubik_AbortTrans.
+ *
+ * @param[in] atrans ubik transaction
+ * @param[in] check  function to call to check/update cache
+ * @param[in] rock   rock to pass to *check
+ *
+ * @return operation status
+ *   @retval 0       success
+ *   @retval nonzero error; cachedVersion not updated
+ *
+ * @post On success, application cache is read-locked, and cache data is
+ *       up-to-date
+ */
+int
+ubik_CheckCache(struct ubik_trans *atrans, ubik_updatecache_func cbf, void *rock)
+{
+    int ret = 0;
+
+    if (!(atrans && atrans->dbase))
+       return -1;
+
+    ObtainReadLock(&atrans->dbase->cache_lock);
+
+    while (ubik_CacheUpdate(atrans) != 0) {
+
+       ReleaseReadLock(&atrans->dbase->cache_lock);
+       ObtainSharedLock(&atrans->dbase->cache_lock);
+
+       if (ubik_CacheUpdate(atrans) != 0) {
+
+           BoostSharedLock(&atrans->dbase->cache_lock);
+
+           ret = (*cbf) (atrans, rock);
+           if (ret == 0) {
+               memcpy(&atrans->dbase->cachedVersion, &atrans->dbase->version,
+                      sizeof(atrans->dbase->cachedVersion));
+           }
+       }
+
+       /* It would be nice if we could convert from a shared lock to a read
+        * lock... instead, just release the shared and acquire the read */
+       ReleaseSharedLock(&atrans->dbase->cache_lock);
+
+       if (ret) {
+           /* if we have an error, don't retry, and don't hold any locks */
+           return ret;
+       }
+
+       ObtainReadLock(&atrans->dbase->cache_lock);
+    }
+
+    atrans->flags |= TRCACHELOCKED;
+
+    return 0;
+}
+
 /*!
  * "Who said anything about panicking?" snapped Arthur. 
  * "This is still just the culture shock. You wait till I've settled down