windows-buf-scache-interlock-20080222
[openafs.git] / src / WINNT / afsd / cm_buf.c
index 3396139..ccc5098 100644 (file)
@@ -90,30 +90,51 @@ static int buf_ShutdownFlag = 0;
 void buf_HoldLocked(cm_buf_t *bp)
 {
     osi_assertx(bp->magic == CM_BUF_MAGIC,"incorrect cm_buf_t magic");
-    bp->refCount++;
+    InterlockedIncrement(&bp->refCount);
 }
 
 /* hold a reference to an already held buffer */
 void buf_Hold(cm_buf_t *bp)
 {
-    lock_ObtainWrite(&buf_globalLock);
+    lock_ObtainRead(&buf_globalLock);
     buf_HoldLocked(bp);
-    lock_ReleaseWrite(&buf_globalLock);
+    lock_ReleaseRead(&buf_globalLock);
 }
 
 /* code to drop reference count while holding buf_globalLock */
-void buf_ReleaseLocked(cm_buf_t *bp)
+void buf_ReleaseLocked(cm_buf_t *bp, afs_uint32 writeLocked)
 {
+    afs_int32 refCount;
+
+    if (writeLocked)
+        lock_AssertWrite(&buf_globalLock);
+    else
+        lock_AssertRead(&buf_globalLock);
+
     /* ensure that we're in the LRU queue if our ref count is 0 */
     osi_assertx(bp->magic == CM_BUF_MAGIC,"incorrect cm_buf_t magic");
+
+    refCount = InterlockedDecrement(&bp->refCount);
 #ifdef DEBUG
-    if (bp->refCount == 0)
+    if (refCount < 0)
        osi_panic("buf refcount 0",__FILE__,__LINE__);;
 #else
-    osi_assertx(bp->refCount > 0, "cm_buf_t refCount == 0");
+    osi_assertx(refCount >= 0, "cm_buf_t refCount == 0");
 #endif
-    if (--bp->refCount == 0) {
-        if (!(bp->flags & CM_BUF_INLRU)) {
+    if (refCount == 0) {
+        /* 
+         * If we are read locked there could be a race condition
+         * with buf_Find() so we must obtain a write lock and
+         * double check that the refCount is actually zero
+         * before we remove the buffer from the LRU queue.
+         */
+        if (!writeLocked) {
+            lock_ReleaseRead(&buf_globalLock);
+            lock_ObtainWrite(&buf_globalLock);
+        }
+
+        if (bp->refCount == 0 &&
+            !(bp->flags & CM_BUF_INLRU)) {
             osi_QAdd((osi_queue_t **) &cm_data.buf_freeListp, &bp->q);
 
             /* watch for transition from empty to one element */
@@ -121,15 +142,42 @@ void buf_ReleaseLocked(cm_buf_t *bp)
                 cm_data.buf_freeListEndp = cm_data.buf_freeListp;
             bp->flags |= CM_BUF_INLRU;
         }
+
+        if (!writeLocked) {
+            lock_ReleaseWrite(&buf_globalLock);
+            lock_ObtainRead(&buf_globalLock);
+        }
     }
 }       
 
 /* release a buffer.  Buffer must be referenced, but unlocked. */
 void buf_Release(cm_buf_t *bp)
 {
-    lock_ObtainWrite(&buf_globalLock);
-    buf_ReleaseLocked(bp);
-    lock_ReleaseWrite(&buf_globalLock);
+    afs_int32 refCount;
+
+    /* ensure that we're in the LRU queue if our ref count is 0 */
+    osi_assertx(bp->magic == CM_BUF_MAGIC,"incorrect cm_buf_t magic");
+
+    refCount = InterlockedDecrement(&bp->refCount);
+#ifdef DEBUG
+    if (refCount < 0)
+       osi_panic("buf refcount 0",__FILE__,__LINE__);;
+#else
+    osi_assertx(refCount >= 0, "cm_buf_t refCount == 0");
+#endif
+    if (refCount == 0) {
+        lock_ObtainWrite(&buf_globalLock);
+        if (bp->refCount == 0 && 
+            !(bp->flags & CM_BUF_INLRU)) {
+            osi_QAdd((osi_queue_t **) &cm_data.buf_freeListp, &bp->q);
+
+            /* watch for transition from empty to one element */
+            if (!cm_data.buf_freeListEndp)
+                cm_data.buf_freeListEndp = cm_data.buf_freeListp;
+            bp->flags |= CM_BUF_INLRU;
+        }
+        lock_ReleaseWrite(&buf_globalLock);
+    }
 }
 
 /* incremental sync daemon.  Writes all dirty buffers every 5000 ms */
@@ -178,7 +226,7 @@ void buf_IncrSyncer(long parm)
                    bp->dirtyp = NULL;
                    if (cm_data.buf_dirtyListp == NULL)
                        cm_data.buf_dirtyListEndp = NULL;
-                   buf_ReleaseLocked(bp);
+                   buf_ReleaseLocked(bp, TRUE);
                    lock_ReleaseWrite(&buf_globalLock);
                } else {
                    /* advance the pointer so we don't loop forever */
@@ -528,9 +576,9 @@ cm_buf_t *buf_Find(struct cm_scache *scp, osi_hyper_t *offsetp)
 {
     cm_buf_t *bp;
 
-    lock_ObtainWrite(&buf_globalLock);
+    lock_ObtainRead(&buf_globalLock);
     bp = buf_FindLocked(scp, offsetp);
-    lock_ReleaseWrite(&buf_globalLock);
+    lock_ReleaseRead(&buf_globalLock);
 
     return bp;
 }       
@@ -626,7 +674,7 @@ long buf_CleanAsyncLocked(cm_buf_t *bp, cm_req_t *reqp)
  */
 void buf_Recycle(cm_buf_t *bp)
 {
-    int i;
+    afs_uint32 i;
     cm_buf_t **lbpp;
     cm_buf_t *tbp;
     cm_buf_t *prevBp, *nextBp;
@@ -1138,7 +1186,6 @@ void buf_SetDirty(cm_buf_t *bp, afs_uint32 offset, afs_uint32 length)
     osi_assertx(bp->magic == CM_BUF_MAGIC, "invalid cm_buf_t magic");
     osi_assertx(bp->refCount > 0, "cm_buf_t refcount 0");
 
-    lock_ObtainWrite(&buf_globalLock);
     if (bp->flags & CM_BUF_DIRTY) {
 
        osi_Log1(buf_logp, "buf_SetDirty 0x%p already dirty", bp);
@@ -1179,6 +1226,7 @@ void buf_SetDirty(cm_buf_t *bp, afs_uint32 offset, afs_uint32 length)
          * elsewhere, never add to the dirty list if the buffer is 
          * already there.
          */
+        lock_ObtainWrite(&buf_globalLock);
         if (bp->dirtyp == NULL && cm_data.buf_dirtyListEndp != bp) {
             buf_HoldLocked(bp);
             if (!cm_data.buf_dirtyListp) {
@@ -1189,8 +1237,8 @@ void buf_SetDirty(cm_buf_t *bp, afs_uint32 offset, afs_uint32 length)
             }
             bp->dirtyp = NULL;
         }
+        lock_ReleaseWrite(&buf_globalLock);
     }
-    lock_ReleaseWrite(&buf_globalLock);
 }
 
 /* clean all buffers, reset log pointers and invalidate all buffers.
@@ -1219,12 +1267,12 @@ long buf_CleanAndReset(void)
     cm_buf_t *bp;
     cm_req_t req;
 
-    lock_ObtainWrite(&buf_globalLock);
+    lock_ObtainRead(&buf_globalLock);
     for(i=0; i<cm_data.buf_hashSize; i++) {
         for(bp = cm_data.buf_scacheHashTablepp[i]; bp; bp = bp->hashp) {
             if ((bp->flags & CM_BUF_DIRTY) == CM_BUF_DIRTY) {
                 buf_HoldLocked(bp);
-                lock_ReleaseWrite(&buf_globalLock);
+                lock_ReleaseRead(&buf_globalLock);
 
                 /* now no locks are held; clean buffer and go on */
                 cm_InitReq(&req);
@@ -1234,14 +1282,14 @@ long buf_CleanAndReset(void)
                buf_CleanWait(NULL, bp);
 
                 /* relock and release buffer */
-                lock_ObtainWrite(&buf_globalLock);
-                buf_ReleaseLocked(bp);
+                lock_ObtainRead(&buf_globalLock);
+                buf_ReleaseLocked(bp, FALSE);
             } /* dirty */
         } /* over one bucket */
     }  /* for loop over all hash buckets */
 
     /* release locks */
-    lock_ReleaseWrite(&buf_globalLock);
+    lock_ReleaseRead(&buf_globalLock);
 
 #ifdef TESTING
     buf_ValidateBufQueues();
@@ -1315,22 +1363,22 @@ long buf_Truncate(cm_scache_t *scp, cm_user_t *userp, cm_req_t *reqp,
     osi_hyper_t bufEnd;
     long code;
     long bufferPos;
-    long i;
+    afs_uint32 i;
 
     /* assert that cm_bufCreateLock is held in write mode */
     lock_AssertWrite(&scp->bufCreateLock);
 
     i = BUF_FILEHASH(&scp->fid);
 
-    lock_ObtainWrite(&buf_globalLock);
+    lock_ObtainRead(&buf_globalLock);
     bufp = cm_data.buf_fileHashTablepp[i];
     if (bufp == NULL) {
-        lock_ReleaseWrite(&buf_globalLock);
+        lock_ReleaseRead(&buf_globalLock);
         return 0;
     }
 
     buf_HoldLocked(bufp);
-    lock_ReleaseWrite(&buf_globalLock);
+    lock_ReleaseRead(&buf_globalLock);
     while (bufp) {
         lock_ObtainMutex(&bufp->mx);
 
@@ -1354,7 +1402,6 @@ long buf_Truncate(cm_scache_t *scp, cm_user_t *userp, cm_req_t *reqp,
                           | CM_SCACHESYNC_BUFLOCKED);
 
        
-       lock_ObtainWrite(&buf_globalLock);
        /* if we succeeded in our locking, and this applies to the right
          * file, and the truncate request overlaps the buffer either
          * totally or partially, then do something.
@@ -1402,14 +1449,13 @@ long buf_Truncate(cm_scache_t *scp, cm_user_t *userp, cm_req_t *reqp,
        if (!code) {
            nbufp = bufp->fileHashp;
            if (nbufp) 
-               buf_HoldLocked(nbufp);
+               buf_Hold(nbufp);
        } else {
            /* This forces the loop to end and the error code
             * to be returned. */
            nbufp = NULL;
        }
-       buf_ReleaseLocked(bufp);
-       lock_ReleaseWrite(&buf_globalLock);
+       buf_Release(bufp);
        bufp = nbufp;
     }
 
@@ -1427,16 +1473,16 @@ long buf_FlushCleanPages(cm_scache_t *scp, cm_user_t *userp, cm_req_t *reqp)
     cm_buf_t *bp;              /* buffer we're hacking on */
     cm_buf_t *nbp;
     int didRelease;
-    long i;
+    afs_uint32 i;
 
     i = BUF_FILEHASH(&scp->fid);
 
     code = 0;
-    lock_ObtainWrite(&buf_globalLock);
+    lock_ObtainRead(&buf_globalLock);
     bp = cm_data.buf_fileHashTablepp[i];
     if (bp) 
         buf_HoldLocked(bp);
-    lock_ReleaseWrite(&buf_globalLock);
+    lock_ReleaseRead(&buf_globalLock);
     
     for (; bp; bp = nbp) {
         didRelease = 0;        /* haven't released this buffer yet */
@@ -1468,21 +1514,21 @@ long buf_FlushCleanPages(cm_scache_t *scp, cm_user_t *userp, cm_req_t *reqp)
             bp->dirtyCounter++;
            lock_ReleaseMutex(&bp->mx);
 
-            lock_ObtainWrite(&buf_globalLock);
             /* actually, we only know that buffer is clean if ref
              * count is 1, since we don't have buffer itself locked.
              */
             if (!(bp->flags & CM_BUF_DIRTY)) {
+                lock_ObtainWrite(&buf_globalLock);
                 if (bp->refCount == 1) {       /* bp is held above */
                     nbp = bp->fileHashp;
                     if (nbp) 
                         buf_HoldLocked(nbp);
-                    buf_ReleaseLocked(bp);
+                    buf_ReleaseLocked(bp, TRUE);
                     didRelease = 1;
                     buf_Recycle(bp);
                 }
+                lock_ReleaseWrite(&buf_globalLock);
             }
-            lock_ReleaseWrite(&buf_globalLock);
 
            if (code != CM_ERROR_BADFD)
                (*cm_buf_opsp->Unstabilizep)(scp, userp);
@@ -1490,12 +1536,12 @@ long buf_FlushCleanPages(cm_scache_t *scp, cm_user_t *userp, cm_req_t *reqp)
 
       skip:
         if (!didRelease) {
-            lock_ObtainWrite(&buf_globalLock);
+            lock_ObtainRead(&buf_globalLock);
             nbp = bp->fileHashp;
            if (nbp)
                 buf_HoldLocked(nbp);
-            buf_ReleaseLocked(bp);
-            lock_ReleaseWrite(&buf_globalLock);
+            buf_ReleaseLocked(bp, FALSE);
+            lock_ReleaseRead(&buf_globalLock);
         }
     }  /* for loop over a bunch of buffers */
 
@@ -1546,11 +1592,11 @@ long buf_CleanVnode(struct cm_scache *scp, cm_user_t *userp, cm_req_t *reqp)
 
     i = BUF_FILEHASH(&scp->fid);
 
-    lock_ObtainWrite(&buf_globalLock);
+    lock_ObtainRead(&buf_globalLock);
     bp = cm_data.buf_fileHashTablepp[i];
     if (bp) 
         buf_HoldLocked(bp);
-    lock_ReleaseWrite(&buf_globalLock);
+    lock_ReleaseRead(&buf_globalLock);
     for (; bp; bp = nbp) {
         /* clean buffer synchronously */
         if (cm_FidCmp(&bp->fid, &scp->fid) == 0) {
@@ -1573,12 +1619,12 @@ long buf_CleanVnode(struct cm_scache *scp, cm_user_t *userp, cm_req_t *reqp)
             lock_ReleaseMutex(&bp->mx);
         }
 
-        lock_ObtainWrite(&buf_globalLock);
+        lock_ObtainRead(&buf_globalLock);
         nbp = bp->fileHashp;
         if (nbp) 
             buf_HoldLocked(nbp);
-        buf_ReleaseLocked(bp);
-        lock_ReleaseWrite(&buf_globalLock);
+        buf_ReleaseLocked(bp, FALSE);
+        lock_ReleaseRead(&buf_globalLock);
     }  /* for loop over a bunch of buffers */
 
 #ifdef TESTING