ubik: add interface for reading during write locks
authorAndrew Deason <adeason@sinenomine.net>
Thu, 20 May 2010 20:22:11 +0000 (15:22 -0500)
committerDerrick Brashear <shadow@dementia.org>
Mon, 23 Aug 2010 16:32:13 +0000 (09:32 -0700)
Add ubik_BeginTransReadAnyWrite, which allows for reading from the
database, even while there is a conflicting ubik write lock. Reads are
still blocked while the local database is updating due to a write
transaction commit.

Change-Id: I025e595ad699d5a969a0676691530d90c65f1920
Reviewed-on: http://gerrit.openafs.org/2592
Reviewed-by: Derrick Brashear <shadow@dementia.org>
Tested-by: Derrick Brashear <shadow@dementia.org>

src/ubik/lock.c
src/ubik/remote.c
src/ubik/ubik.c
src/ubik/ubik.p.h

index 6c609e5..b8a7a27 100644 (file)
@@ -80,6 +80,10 @@ ulock_getLock(struct ubik_trans *atrans, int atype, int await)
     if (atrans->flags & TRDONE)
        return UDONE;
 
+    if (atype != LOCKREAD && (atrans->flags & TRREADWRITE)) {
+       return EINVAL;
+    }
+
     if (atrans->locktype != 0) {
        ubik_print("Ubik: Internal Error: attempted to take lock twice\n");
        abort();
@@ -91,7 +95,7 @@ ulock_getLock(struct ubik_trans *atrans, int atype, int await)
  */
 
     /* Check if the lock would would block */
-    if (!await) {
+    if (!await && !(atrans->flags & TRREADWRITE)) {
        if (atype == LOCKREAD) {
            if (WouldReadBlock(&rwlock))
                return EAGAIN;
@@ -120,7 +124,9 @@ ulock_getLock(struct ubik_trans *atrans, int atype, int await)
     atrans->locktype = LOCKWAIT;
 #endif /* UBIK_PAUSE */
     DBRELE(dbase);
-    if (atype == LOCKREAD) {
+    if (atrans->flags & TRREADWRITE) {
+       /* noop; don't actually lock anything for TRREADWRITE */
+    } else if (atype == LOCKREAD) {
        ObtainReadLock(&rwlock);
     } else {
        ObtainWriteLock(&rwlock);
@@ -156,7 +162,15 @@ ulock_relLock(struct ubik_trans *atrans)
     if (rwlockinit)
        return;
 
-    if (atrans->locktype == LOCKREAD) {
+    if (atrans->locktype == LOCKWRITE && (atrans->flags & TRREADWRITE)) {
+       ubik_print("Ubik: Internal Error: unlocking write lock with "
+                  "TRREADWRITE?\n");
+       abort();
+    }
+
+    if (atrans->flags & TRREADWRITE) {
+       /* noop, TRREADWRITE means we don't actually lock anything */
+    } else if (atrans->locktype == LOCKREAD) {
        ReleaseReadLock(&rwlock);
     } else if (atrans->locktype == LOCKWRITE) {
        ReleaseWriteLock(&rwlock);
index 3654b77..45404ec 100644 (file)
@@ -114,10 +114,15 @@ SDISK_Commit(struct rx_call *rxcall, struct ubik_tid *atid)
     }
 
     dbase = ubik_currentTrans->dbase;
+
+    ObtainWriteLock(&dbase->cache_lock);
+
     DBHOLD(dbase);
+
     urecovery_CheckTid(atid);
     if (!ubik_currentTrans) {
        DBRELE(dbase);
+       ReleaseWriteLock(&dbase->cache_lock);
        return USYNC;
     }
 
@@ -127,6 +132,7 @@ SDISK_Commit(struct rx_call *rxcall, struct ubik_tid *atid)
        ubik_dbVersion = ubik_dbase->version;
     }
     DBRELE(dbase);
+    ReleaseWriteLock(&dbase->cache_lock);
     return code;
 }
 
index fd8caa6..b8088a5 100644 (file)
@@ -83,6 +83,7 @@ afs_int32 ubik_epochTime = 0;
 afs_int32 urecovery_state = 0;
 int (*ubik_SRXSecurityProc) (void *, struct rx_securityClass **, afs_int32 *);
 void *ubik_SRXSecurityRock;
+int (*ubik_SyncWriterCacheProc) (void);
 struct ubik_server *ubik_servers;
 short ubik_callPortal;
 
@@ -602,8 +603,9 @@ ubik_ServerInit(afs_uint32 myHost, short myPort, afs_uint32 serverList[],
  * An open mode of ubik_READTRANS identifies this as a read transaction,
  * while a mode of ubik_WRITETRANS identifies this as a write transaction.
  * transPtr is set to the returned transaction control block.
- * The readAny flag is set to 0 or 1 by the wrapper functions ubik_BeginTrans() or
- * ubik_BeginTransReadAny() below.
+ * The readAny flag is set to 0 or 1 or 2 by the wrapper functions
+ * ubik_BeginTrans() or ubik_BeginTransReadAny() or
+ * ubik_BeginTransReadAnyWrite() below.
  *
  * \note We can only begin transaction when we have an up-to-date database.
  */
@@ -618,6 +620,16 @@ BeginTrans(struct ubik_dbase *dbase, afs_int32 transMode,
     int count;
 #endif /* UBIK_PAUSE */
 
+    if (readAny > 1 && ubik_SyncWriterCacheProc == NULL) {
+       /* it's not safe to use ubik_BeginTransReadAnyWrite without a
+        * cache-syncing function; fall back to ubik_BeginTransReadAny,
+        * which is safe but slower */
+       ubik_print("ubik_BeginTransReadAnyWrite called, but "
+                  "ubik_SyncWriterCacheProc not set; pretending "
+                  "ubik_BeginTransReadAny was called instead\n");
+       readAny = 1;
+    }
+
     if ((transMode != UBIK_READTRANS) && readAny)
        return UBADTYPE;
     DBHOLD(dbase);
@@ -687,8 +699,12 @@ BeginTrans(struct ubik_dbase *dbase, afs_int32 transMode,
        DBRELE(dbase);
        return code;
     }
-    if (readAny)
+    if (readAny) {
        tt->flags |= TRREADANY;
+       if (readAny > 1) {
+           tt->flags |= TRREADWRITE;
+       }
+    }
     /* label trans and dbase with new tid */
     tt->tid.epoch = ubik_epochTime;
     /* bump by two, since tidCounter+1 means trans id'd by tidCounter has finished */
@@ -740,6 +756,16 @@ ubik_BeginTransReadAny(struct ubik_dbase *dbase, afs_int32 transMode,
 }
 
 /*!
+ * \see BeginTrans()
+ */
+int
+ubik_BeginTransReadAnyWrite(struct ubik_dbase *dbase, afs_int32 transMode,
+                            struct ubik_trans **transPtr)
+{
+    return BeginTrans(dbase, transMode, transPtr, 2);
+}
+
+/*!
  * \brief This routine ends a read or write transaction by aborting it.
  */
 int
@@ -794,6 +820,23 @@ ubik_AbortTrans(struct ubik_trans *transPtr)
     return (code ? code : code2);
 }
 
+static void
+WritebackApplicationCache(struct ubik_dbase *dbase)
+{
+    int code = 0;
+    if (ubik_SyncWriterCacheProc) {
+       code = ubik_SyncWriterCacheProc();
+    }
+    if (code) {
+       /* we failed to sync the local cache, so just invalidate the cache;
+        * we'll try to read the cache in again on the next read */
+       memset(&dbase->cachedVersion, 0, sizeof(dbase->cachedVersion));
+    } else {
+       memcpy(&dbase->cachedVersion, &dbase->version,
+              sizeof(dbase->cachedVersion));
+    }
+}
+
 /*!
  * \brief This routine ends a read or write transaction on the open transaction identified by transPtr.
  * \return an error code.
@@ -806,6 +849,7 @@ ubik_EndTrans(struct ubik_trans *transPtr)
     afs_int32 realStart;
     struct ubik_server *ts;
     afs_int32 now;
+    int cachelocked = 0;
     struct ubik_dbase *dbase;
 
     if (transPtr->type == UBIK_WRITETRANS) {
@@ -822,6 +866,13 @@ ubik_EndTrans(struct ubik_trans *transPtr)
        ReleaseReadLock(&dbase->cache_lock);
        transPtr->flags &= ~TRCACHELOCKED;
     }
+
+    if (transPtr->type != UBIK_READTRANS) {
+       /* must hold cache_lock before DBHOLD'ing */
+       ObtainWriteLock(&dbase->cache_lock);
+       cachelocked = 1;
+    }
+
     DBHOLD(dbase);
 
     /* give up if no longer current */
@@ -852,8 +903,20 @@ ubik_EndTrans(struct ubik_trans *transPtr)
 
     /* now it is safe to do commit */
     code = udisk_commit(transPtr);
-    if (code == 0)
+    if (code == 0) {
+       /* db data has been committed locally; update the local cache so
+        * readers can get at it */
+       WritebackApplicationCache(dbase);
+
+       ReleaseWriteLock(&dbase->cache_lock);
+
        code = ContactQuorum_NoArguments(DISK_Commit, transPtr, CStampVersion);
+
+    } else {
+       memset(&dbase->cachedVersion, 0, sizeof(struct ubik_version));
+       ReleaseWriteLock(&dbase->cache_lock);
+    }
+    cachelocked = 0;
     if (code) {
        /* failed to commit, so must return failure.  Try to clear locks first, just for fun
         * Note that we don't know if this transaction will eventually commit at this point.
@@ -900,12 +963,6 @@ ubik_EndTrans(struct ubik_trans *transPtr)
            break;              /* no down ones still pseudo-active */
     }
 
-    /* the commit bumped the dbase version, and since the write was local
-     * our cache should still be up to date, so make sure to update
-     * cachedVersion, too */
-    memcpy(&dbase->cachedVersion, &dbase->version,
-           sizeof(dbase->cachedVersion));
-
     /* finally, unlock all the dudes.  We can return success independent of the number of servers
      * that really unlock the dbase; the others will do it if/when they elect a new sync site.
      * The transaction is committed anyway, since we succeeded in contacting a quorum
@@ -918,10 +975,15 @@ ubik_EndTrans(struct ubik_trans *transPtr)
     /* don't update cachedVersion here; it should have been updated way back
      * in ubik_CheckCache, and earlier in this function for writes */
     DBRELE(dbase);
+    if (cachelocked) {
+       ReleaseWriteLock(&dbase->cache_lock);
+    }
     return 0;
 
   error:
-    ObtainWriteLock(&dbase->cache_lock);
+    if (!cachelocked) {
+       ObtainWriteLock(&dbase->cache_lock);
+    }
     memset(&dbase->cachedVersion, 0, sizeof(struct ubik_version));
     ReleaseWriteLock(&dbase->cache_lock);
     return code;
index 939501f..17622be 100644 (file)
@@ -223,6 +223,22 @@ extern int (*ubik_CheckRXSecurityProc) (void *, struct rx_call *);
 extern void *ubik_CheckRXSecurityRock;
 /*\}*/
 
+/*
+ * For applications that make use of ubik_BeginTransReadAnyWrite, writing
+ * processes must not update the application-level cache as they write,
+ * or else readers can read the new cache before the data is committed to
+ * the db. So, when a commit occurs, the cache must be updated right then.
+ * If set, this function will be called during commits of write transactions,
+ * to update the application-level cache after a write. This will be called
+ * immediately after the local disk commit succeeds, and it will be called
+ * with a lock held that prevents other threads from reading from the cache
+ * or the db in general.
+ *
+ * Note that this function MUST be set in order to make use of
+ * ubik_BeginTransReadAnyWrite.
+ */
+extern int (*ubik_SyncWriterCacheProc) (void);
+
 /****************INTERNALS BELOW ****************/
 
 #ifdef UBIK_INTERNALS
@@ -251,6 +267,8 @@ extern void *ubik_CheckRXSecurityRock;
 #define TRCACHELOCKED       32  /*!< this trans has locked dbase->cache_lock
                                  *   (meaning, this trans has called
                                  *   ubik_CheckCache at some point */
+#define TRREADWRITE         64  /*!< read even if there's a conflicting ubik-
+                                 *   level write lock */
 /*\}*/
 
 /*! \name ubik_lock flags */
@@ -493,6 +511,9 @@ extern int ubik_BeginTrans(struct ubik_dbase *dbase,
 extern int ubik_BeginTransReadAny(struct ubik_dbase *dbase,
                                  afs_int32 transMode,
                                  struct ubik_trans **transPtr);
+extern int ubik_BeginTransReadAnyWrite(struct ubik_dbase *dbase,
+                                       afs_int32 transMode,
+                                       struct ubik_trans **transPtr);
 extern int ubik_AbortTrans(struct ubik_trans *transPtr);
 
 extern int ubik_EndTrans(struct ubik_trans *transPtr);