Windows: cm_RemoveSCacheFromHashTable scp not found
[openafs.git] / src / WINNT / afsd / cm_scache.c
index 687b0cd..a2432b5 100644 (file)
@@ -23,6 +23,7 @@
 
 #include "afsd.h"
 #include "cm_btree.h"
+#include <afs/unified_afs.h>
 
 /*extern void afsi_log(char *pattern, ...);*/
 
@@ -65,37 +66,67 @@ cm_RootSCachep(cm_user_t *userp, cm_req_t *reqp)
 void cm_AdjustScacheLRU(cm_scache_t *scp)
 {
     lock_AssertWrite(&cm_scacheLock);
-    osi_QRemoveHT((osi_queue_t **) &cm_data.scacheLRUFirstp, (osi_queue_t **) &cm_data.scacheLRULastp, &scp->q);
-    if (scp->flags & CM_SCACHEFLAG_DELETED) {
-        /* Since it has been deleted make it the first to be recycled. */
-        osi_QAddT((osi_queue_t **) &cm_data.scacheLRUFirstp, (osi_queue_t **) &cm_data.scacheLRULastp, &scp->q);
-    } else {
+    if (!(scp->flags & CM_SCACHEFLAG_DELETED)) {
+        osi_QRemoveHT((osi_queue_t **) &cm_data.scacheLRUFirstp, (osi_queue_t **) &cm_data.scacheLRULastp, &scp->q);
         osi_QAddH((osi_queue_t **) &cm_data.scacheLRUFirstp, (osi_queue_t **) &cm_data.scacheLRULastp, &scp->q);
     }
 }
 
-/* call with cm_scacheLock write-locked and scp rw held */
-void cm_RemoveSCacheFromHashTable(cm_scache_t *scp)
+static int
+cm_RemoveSCacheFromHashChain(cm_scache_t *scp, int index)
 {
     cm_scache_t **lscpp;
     cm_scache_t *tscp;
-    int i;
+    int found = 0;
+
+    for (lscpp = &cm_data.scacheHashTablep[index], tscp = cm_data.scacheHashTablep[index];
+         tscp;
+         lscpp = &tscp->nextp, tscp = tscp->nextp) {
+       if (tscp == scp) {
+           *lscpp = scp->nextp;
+           scp->nextp = NULL;
+           found = 1;
+           break;
+       }
+    }
+
+    return found;
+}
 
+/* call with cm_scacheLock write-locked and scp rw held */
+void cm_RemoveSCacheFromHashTable(cm_scache_t *scp)
+{
     lock_AssertWrite(&cm_scacheLock);
     lock_AssertWrite(&scp->rw);
     if (scp->flags & CM_SCACHEFLAG_INHASH) {
+       int h,i;
+       int found = 0;
+
        /* hash it out first */
-       i = CM_SCACHE_HASH(&scp->fid);
-       for (lscpp = &cm_data.scacheHashTablep[i], tscp = cm_data.scacheHashTablep[i];
-            tscp;
-            lscpp = &tscp->nextp, tscp = tscp->nextp) {
-           if (tscp == scp) {
-               *lscpp = scp->nextp;
-                scp->nextp = NULL;
-               _InterlockedAnd(&scp->flags, ~CM_SCACHEFLAG_INHASH);
-               break;
+       h = CM_SCACHE_HASH(&scp->fid);
+       found = cm_RemoveSCacheFromHashChain(scp, h);
+
+       if (!found) {
+           /*
+            * The CM_SCACHEFLAG_INHASH is set on the cm_scache_t but
+            * we didn't find the entry in the expected hash chain.
+            * Did the fid change?
+            * In any case, we will search the entire hashtable for
+            * the object.  If we don't find it, then we know it is
+            * safe to remove the flag.
+            */
+           for (i=0; !found && i<cm_data.scacheHashTableSize; i++) {
+               if (i != h)
+                   found = cm_RemoveSCacheFromHashChain(scp, i);
            }
+
+           if (found)
+               osi_Log1(afsd_logp,"cm_RemoveSCacheFromHashTable scp 0x%p found in wrong hash chain", scp);
+           else
+               osi_Log1(afsd_logp,"cm_RemoveSCacheFromHashTable scp 0x%p not found in hash table", scp);
        }
+
+       _InterlockedAnd(&scp->flags, ~CM_SCACHEFLAG_INHASH);
     }
 }
 
@@ -139,6 +170,10 @@ void cm_ResetSCacheDirectory(cm_scache_t *scp, afs_int32 dirlock)
 /* called with cm_scacheLock and scp write-locked; recycles an existing scp. */
 long cm_RecycleSCache(cm_scache_t *scp, afs_int32 flags)
 {
+    cm_fid_t fid;
+    afs_uint32 fileType;
+    int callback;
+
     lock_AssertWrite(&cm_scacheLock);
     lock_AssertWrite(&scp->rw);
 
@@ -154,18 +189,38 @@ long cm_RecycleSCache(cm_scache_t *scp, afs_int32 flags)
        return -1;
     }
 
+    if (scp->redirBufCount != 0) {
+        return -1;
+    }
+
+    fid = scp->fid;
+    fileType = scp->fileType;
+    callback = scp->cbExpires ? 1 : 0;
+
+    cm_RemoveSCacheFromHashTable(scp);
+
+    if (scp->fileType == CM_SCACHETYPE_DIRECTORY &&
+         !cm_accessPerFileCheck) {
+        cm_volume_t *volp = cm_GetVolumeByFID(&scp->fid);
+
+        if (volp) {
+            if (!(volp->flags & CM_VOLUMEFLAG_DFS_VOLUME))
+                cm_EAccesClearParentEntries(&fid);
+
+            cm_PutVolume(volp);
+        }
+    }
 
     /* invalidate so next merge works fine;
      * also initialize some flags */
     scp->fileType = 0;
     _InterlockedAnd(&scp->flags,
-                    ~(CM_SCACHEFLAG_STATD
-                    | CM_SCACHEFLAG_DELETED
+                    ~( CM_SCACHEFLAG_DELETED
                     | CM_SCACHEFLAG_RO
                     | CM_SCACHEFLAG_PURERO
                     | CM_SCACHEFLAG_OVERQUOTA
                     | CM_SCACHEFLAG_OUTOFSPACE
-                    | CM_SCACHEFLAG_EACCESS));
+                     | CM_SCACHEFLAG_ASYNCSTORING));
     scp->serverModTime = 0;
     scp->dataVersion = CM_SCACHE_VERSION_BAD;
     scp->bufDataVersionLow = CM_SCACHE_VERSION_BAD;
@@ -178,6 +233,7 @@ long cm_RecycleSCache(cm_scache_t *scp, afs_int32 flags)
         scp->cbServerp = NULL;
     }
     scp->cbExpires = 0;
+    scp->cbIssued = 0;
     scp->volumeCreationDate = 0;
 
     scp->fid.vnode = 0;
@@ -195,6 +251,7 @@ long cm_RecycleSCache(cm_scache_t *scp, afs_int32 flags)
     scp->mask = 0;
 
     /* discard symlink info */
+    scp->mpDataVersion = CM_SCACHE_VERSION_BAD;
     scp->mountPointStringp[0] = '\0';
     memset(&scp->mountRootFid, 0, sizeof(cm_fid_t));
     memset(&scp->dotdotFid, 0, sizeof(cm_fid_t));
@@ -214,6 +271,23 @@ long cm_RecycleSCache(cm_scache_t *scp, afs_int32 flags)
     cm_FreeAllACLEnts(scp);
 
     cm_ResetSCacheDirectory(scp, 0);
+
+    if (RDR_Initialized && callback) {
+        /*
+        * We drop the cm_scacheLock because it may be required to
+        * satisfy an ioctl request from the redirector.  It should
+        * be safe to hold the scp->rw lock here because at this
+        * point (a) the object has just been recycled so the fid
+        * is nul and there are no requests that could possibly
+        * be issued by the redirector that would depend upon it.
+        */
+        lock_ReleaseWrite(&cm_scacheLock);
+        RDR_InvalidateObject( fid.cell, fid.volume, fid.vnode,
+                              fid.unique, fid.hash,
+                              fileType, AFS_INVALIDATE_EXPIRED);
+        lock_ObtainWrite(&cm_scacheLock);
+    }
+
     return 0;
 }
 
@@ -223,53 +297,131 @@ long cm_RecycleSCache(cm_scache_t *scp, afs_int32 flags)
  * Can allocate a new one if desperate, or if below quota (cm_data.maxSCaches).
  * returns scp->rw write-locked.
  */
-cm_scache_t *cm_GetNewSCache(void)
+cm_scache_t *
+cm_GetNewSCache(afs_uint32 locked)
 {
-    cm_scache_t *scp;
-    int retry = 0;
+    cm_scache_t *scp = NULL;
+    cm_scache_t *scp_prev = NULL;
+    cm_scache_t *scp_next = NULL;
+    int attempt = 0;
+
+    if (locked)
+        lock_AssertWrite(&cm_scacheLock);
+    else
+        lock_ObtainWrite(&cm_scacheLock);
 
-    lock_AssertWrite(&cm_scacheLock);
     if (cm_data.currentSCaches >= cm_data.maxSCaches) {
        /* There were no deleted scache objects that we could use.  Try to find
         * one that simply hasn't been used in a while.
         */
-        for ( scp = cm_data.scacheLRULastp;
-              scp;
-              scp = (cm_scache_t *) osi_QPrev(&scp->q))
-        {
-            /* It is possible for the refCount to be zero and for there still
-             * to be outstanding dirty buffers.  If there are dirty buffers,
-             * we must not recycle the scp. */
-            if (scp->refCount == 0 && scp->bufReadsp == NULL && scp->bufWritesp == NULL) {
-                if (!buf_DirtyBuffersExist(&scp->fid)) {
-                    if (!lock_TryWrite(&scp->rw))
-                        continue;
-
-                    if (!cm_RecycleSCache(scp, 0)) {
-                        /* we found an entry, so return it */
-                        /* now remove from the LRU queue and put it back at the
-                         * head of the LRU queue.
-                         */
-                        cm_AdjustScacheLRU(scp);
-
-                        /* and we're done */
-                        return scp;
+        for (attempt = 0 ; attempt < 128; attempt++) {
+            afs_uint32 count = 0;
+
+            for ( scp = cm_data.scacheLRULastp;
+                  scp;
+                  scp = (cm_scache_t *) osi_QPrev(&scp->q))
+            {
+                /*
+                 * We save the prev and next pointers in the
+                 * LRU because we are going to drop the cm_scacheLock and
+                 * the order of the list could change out from beneath us.
+                 * If both changed, it means that this entry has been moved
+                 * within the LRU and it should no longer be recycled.
+                 */
+                scp_prev = (cm_scache_t *) osi_QPrev(&scp->q);
+                scp_next = (cm_scache_t *) osi_QNext(&scp->q);
+                count++;
+
+                /* It is possible for the refCount to be zero and for there still
+                 * to be outstanding dirty buffers.  If there are dirty buffers,
+                 * we must not recycle the scp.
+                 *
+                 * If the object is in use by the redirector, then avoid recycling
+                 * it unless we have to.
+                 */
+                if (scp->refCount == 0 && scp->bufReadsp == NULL && scp->bufWritesp == NULL) {
+                    afs_uint32 buf_dirty = 0;
+                    afs_uint32 buf_rdr = 0;
+
+                    lock_ReleaseWrite(&cm_scacheLock);
+                    buf_dirty = buf_DirtyBuffersExist(&scp->fid);
+                    if (!buf_dirty)
+                        buf_rdr = buf_RDRBuffersExist(&scp->fid);
+
+                    if (!buf_dirty && !buf_rdr) {
+                        cm_fid_t   fid;
+                        afs_uint32 fileType;
+                        int        success;
+
+                        success = lock_TryWrite(&scp->rw);
+
+                        lock_ObtainWrite(&cm_scacheLock);
+                        if (scp_prev != (cm_scache_t *) osi_QPrev(&scp->q) &&
+                            scp_next != (cm_scache_t *) osi_QNext(&scp->q))
+                        {
+                            osi_Log1(afsd_logp, "GetNewSCache scp 0x%p; LRU order changed", scp);
+                            if (success)
+                                lock_ReleaseWrite(&scp->rw);
+                            break;
+                        } else if (!success) {
+                                osi_Log1(afsd_logp, "GetNewSCache failed to obtain lock scp 0x%p", scp);
+                                continue;
+                        }
+
+                        /* Found a likely candidate.  Save type and fid in case we succeed */
+                        fid = scp->fid;
+                        fileType = scp->fileType;
+
+                        if (!cm_RecycleSCache(scp, 0)) {
+                            /* we found an entry, so return it.
+                             * remove from the LRU queue and put it back at the
+                             * head of the LRU queue.
+                             */
+                            cm_AdjustScacheLRU(scp);
+
+                            /* and we're done - SUCCESS */
+                            osi_assertx(!(scp->flags & CM_SCACHEFLAG_INHASH), "CM_SCACHEFLAG_INHASH set");
+                            goto done;
+                        }
+                        lock_ReleaseWrite(&scp->rw);
+                    } else {
+                        if (buf_rdr)
+                            osi_Log1(afsd_logp,"GetNewSCache redirector is holding extents scp 0x%p", scp);
+                        else
+                            osi_Log1(afsd_logp, "GetNewSCache dirty buffers scp 0x%p", scp);
+
+                        lock_ObtainWrite(&cm_scacheLock);
+                        if (scp_prev != (cm_scache_t *) osi_QPrev(&scp->q) &&
+                            scp_next != (cm_scache_t *) osi_QNext(&scp->q))
+                        {
+                            osi_Log1(afsd_logp, "GetNewSCache scp 0x%p; LRU order changed", scp);
+                            break;
+                        }
                     }
-                    lock_ReleaseWrite(&scp->rw);
-                } else {
-                    osi_Log1(afsd_logp,"GetNewSCache dirty buffers exist scp 0x%x", scp);
                 }
+            } /* for */
+
+            osi_Log2(afsd_logp, "GetNewSCache all scache entries in use (attempt = %d, count = %u)", attempt, count);
+            if (scp == NULL) {
+                /*
+                * The entire LRU queue was walked and no available cm_scache_t was
+                * found.  Drop the cm_scacheLock and sleep for a moment to give a
+                * chance for cm_scache_t objects to be released.
+                */
+                lock_ReleaseWrite(&cm_scacheLock);
+                Sleep(50);
+                lock_ObtainWrite(&cm_scacheLock);
             }
         }
-        osi_Log1(afsd_logp, "GetNewSCache all scache entries in use (retry = %d)", retry);
-
-        return NULL;
+        /* FAILURE */
+        scp = NULL;
+        goto done;
     }
 
     /* if we get here, we should allocate a new scache entry.  We either are below
      * quota or we have a leak and need to allocate a new one to avoid panicing.
      */
-    scp = cm_data.scacheBaseAddress + cm_data.currentSCaches;
+    scp = cm_data.scacheBaseAddress + InterlockedIncrement(&cm_data.currentSCaches) - 1;
     osi_assertx(scp >= cm_data.scacheBaseAddress && scp < (cm_scache_t *)cm_data.scacheHashTablep,
                 "invalid cm_scache_t address");
     memset(scp, 0, sizeof(cm_scache_t));
@@ -280,17 +432,24 @@ cm_scache_t *cm_GetNewSCache(void)
 #ifdef USE_BPLUS
     lock_InitializeRWLock(&scp->dirlock, "cm_scache_t dirlock", LOCK_HIERARCHY_SCACHE_DIRLOCK);
 #endif
+    lock_InitializeMutex(&scp->redirMx, "cm_scache_t redirMx", LOCK_HIERARCHY_SCACHE_REDIRMX);
     scp->serverLock = -1;
+    scp->dataVersion = CM_SCACHE_VERSION_BAD;
+    scp->bufDataVersionLow = CM_SCACHE_VERSION_BAD;
+    scp->lockDataVersion = CM_SCACHE_VERSION_BAD;
+    scp->mpDataVersion = CM_SCACHE_VERSION_BAD;
 
     /* and put it in the LRU queue */
-    osi_QAdd((osi_queue_t **) &cm_data.scacheLRUFirstp, &scp->q);
-    if (!cm_data.scacheLRULastp)
-        cm_data.scacheLRULastp = scp;
-    cm_data.currentSCaches++;
+    osi_QAddH((osi_queue_t **) &cm_data.scacheLRUFirstp, (osi_queue_t **)&cm_data.scacheLRULastp, &scp->q);
     cm_dnlcPurgedp(scp); /* make doubly sure that this is not in dnlc */
     cm_dnlcPurgevp(scp);
     scp->allNextp = cm_data.allSCachesp;
     cm_data.allSCachesp = scp;
+
+  done:
+    if (!locked)
+        lock_ReleaseWrite(&cm_scacheLock);
+
     return scp;
 }
 
@@ -300,7 +459,7 @@ void cm_SetFid(cm_fid_t *fidp, afs_uint32 cell, afs_uint32 volume, afs_uint32 vn
     fidp->volume = volume;
     fidp->vnode = vnode;
     fidp->unique = unique;
-    fidp->hash = ((cell & 0xF) << 28) | ((volume & 0x3F) << 22) | ((vnode & 0x7FF) << 11) | (unique & 0x7FF);
+    CM_FID_GEN_HASH(fidp);
 }
 
 /* like strcmp, only for fids */
@@ -326,6 +485,7 @@ void cm_fakeSCacheInit(int newFile)
         cm_data.fakeSCache.magic = CM_SCACHE_MAGIC;
         cm_data.fakeSCache.cbServerp = (struct cm_server *)(-1);
         cm_data.fakeSCache.cbExpires = (time_t)-1;
+        cm_data.fakeSCache.cbExpires = time(NULL);
         /* can leave clientModTime at 0 */
         cm_data.fakeSCache.fileType = CM_SCACHETYPE_FILE;
         cm_data.fakeSCache.unixModeBits = 0777;
@@ -338,6 +498,7 @@ void cm_fakeSCacheInit(int newFile)
     lock_InitializeRWLock(&cm_data.fakeSCache.rw, "cm_scache_t rw", LOCK_HIERARCHY_SCACHE);
     lock_InitializeRWLock(&cm_data.fakeSCache.bufCreateLock, "cm_scache_t bufCreateLock", LOCK_HIERARCHY_SCACHE_BUFCREATE);
     lock_InitializeRWLock(&cm_data.fakeSCache.dirlock, "cm_scache_t dirlock", LOCK_HIERARCHY_SCACHE_DIRLOCK);
+    lock_InitializeMutex(&cm_data.fakeSCache.redirMx, "cm_scache_t redirMx", LOCK_HIERARCHY_SCACHE_REDIRMX);
 }
 
 long
@@ -356,16 +517,40 @@ cm_ValidateSCache(void)
     for ( scp = cm_data.scacheLRUFirstp, lscp = NULL, i = 0;
           scp;
           lscp = scp, scp = (cm_scache_t *) osi_QNext(&scp->q), i++ ) {
+
+       if ( scp < (cm_scache_t *)cm_data.scacheBaseAddress ||
+            scp >= (cm_scache_t *)cm_data.dnlcBaseAddress) {
+           afsi_log("cm_ValidateSCache failure: out of range cm_scache_t pointers");
+           fprintf(stderr, "cm_ValidateSCache failure: out of range cm_scache_t pointers\n");
+           return -18;
+       }
+
         if (scp->magic != CM_SCACHE_MAGIC) {
             afsi_log("cm_ValidateSCache failure: scp->magic != CM_SCACHE_MAGIC");
             fprintf(stderr, "cm_ValidateSCache failure: scp->magic != CM_SCACHE_MAGIC\n");
             return -1;
         }
+
+       if ( scp->nextp < (cm_scache_t *)cm_data.scacheBaseAddress ||
+            scp->nextp >= (cm_scache_t *)cm_data.dnlcBaseAddress) {
+           afsi_log("cm_ValidateSCache failure: out of range cm_scache_t pointers");
+           fprintf(stderr, "cm_ValidateSCache failure: out of range cm_scache_t pointers\n");
+           return -21;
+       }
+
         if (scp->nextp && scp->nextp->magic != CM_SCACHE_MAGIC) {
             afsi_log("cm_ValidateSCache failure: scp->nextp->magic != CM_SCACHE_MAGIC");
             fprintf(stderr, "cm_ValidateSCache failure: scp->nextp->magic != CM_SCACHE_MAGIC\n");
             return -2;
         }
+
+       if ( scp->randomACLp < (cm_aclent_t *)cm_data.aclBaseAddress ||
+            scp->randomACLp >= (cm_aclent_t *)cm_data.scacheBaseAddress) {
+           afsi_log("cm_ValidateSCache failure: out of range cm_aclent_t pointers");
+           fprintf(stderr, "cm_ValidateSCache failure: out of range cm_aclent_t pointers\n");
+           return -32;
+       }
+
         if (scp->randomACLp && scp->randomACLp->magic != CM_ACLENT_MAGIC) {
             afsi_log("cm_ValidateSCache failure: scp->randomACLp->magic != CM_ACLENT_MAGIC");
             fprintf(stderr, "cm_ValidateSCache failure: scp->randomACLp->magic != CM_ACLENT_MAGIC\n");
@@ -385,16 +570,40 @@ cm_ValidateSCache(void)
 
     for ( scp = cm_data.scacheLRULastp, lscp = NULL, i = 0; scp;
           lscp = scp, scp = (cm_scache_t *) osi_QPrev(&scp->q), i++ ) {
+
+       if ( scp < (cm_scache_t *)cm_data.scacheBaseAddress ||
+            scp >= (cm_scache_t *)cm_data.dnlcBaseAddress) {
+           afsi_log("cm_ValidateSCache failure: out of range cm_scache_t pointers");
+           fprintf(stderr, "cm_ValidateSCache failure: out of range cm_scache_t pointers\n");
+           return -19;
+       }
+
         if (scp->magic != CM_SCACHE_MAGIC) {
             afsi_log("cm_ValidateSCache failure: scp->magic != CM_SCACHE_MAGIC");
             fprintf(stderr, "cm_ValidateSCache failure: scp->magic != CM_SCACHE_MAGIC\n");
             return -5;
         }
+
+       if ( scp->nextp < (cm_scache_t *)cm_data.scacheBaseAddress ||
+            scp->nextp >= (cm_scache_t *)cm_data.dnlcBaseAddress) {
+           afsi_log("cm_ValidateSCache failure: out of range cm_scache_t pointers");
+           fprintf(stderr, "cm_ValidateSCache failure: out of range cm_scache_t pointers\n");
+           return -22;
+       }
+
         if (scp->nextp && scp->nextp->magic != CM_SCACHE_MAGIC) {
             afsi_log("cm_ValidateSCache failure: scp->nextp->magic != CM_SCACHE_MAGIC");
             fprintf(stderr, "cm_ValidateSCache failure: scp->nextp->magic != CM_SCACHE_MAGIC\n");
             return -6;
         }
+
+       if ( scp->randomACLp < (cm_aclent_t *)cm_data.aclBaseAddress ||
+            scp->randomACLp >= (cm_aclent_t *)cm_data.scacheBaseAddress) {
+           afsi_log("cm_ValidateSCache failure: out of range cm_aclent_t pointers");
+           fprintf(stderr, "cm_ValidateSCache failure: out of range cm_aclent_t pointers\n");
+           return -31;
+       }
+
         if (scp->randomACLp && scp->randomACLp->magic != CM_ACLENT_MAGIC) {
             afsi_log("cm_ValidateSCache failure: scp->randomACLp->magic != CM_ACLENT_MAGIC");
             fprintf(stderr, "cm_ValidateSCache failure: scp->randomACLp->magic != CM_ACLENT_MAGIC\n");
@@ -415,17 +624,42 @@ cm_ValidateSCache(void)
     for ( i=0; i < cm_data.scacheHashTableSize; i++ ) {
         for ( scp = cm_data.scacheHashTablep[i]; scp; scp = scp->nextp ) {
             afs_uint32 hash;
+
+           if ( scp < (cm_scache_t *)cm_data.scacheBaseAddress ||
+                scp >= (cm_scache_t *)cm_data.dnlcBaseAddress) {
+               afsi_log("cm_ValidateSCache failure: out of range cm_scache_t pointers");
+               fprintf(stderr, "cm_ValidateSCache failure: out of range cm_scache_t pointers\n");
+               return -20;
+           }
+
             hash = CM_SCACHE_HASH(&scp->fid);
+
             if (scp->magic != CM_SCACHE_MAGIC) {
                 afsi_log("cm_ValidateSCache failure: scp->magic != CM_SCACHE_MAGIC");
                 fprintf(stderr, "cm_ValidateSCache failure: scp->magic != CM_SCACHE_MAGIC\n");
                 return -9;
             }
+
+           if ( scp->nextp < (cm_scache_t *)cm_data.scacheBaseAddress ||
+                scp->nextp >= (cm_scache_t *)cm_data.dnlcBaseAddress) {
+               afsi_log("cm_ValidateSCache failure: out of range cm_scache_t pointers");
+               fprintf(stderr, "cm_ValidateSCache failure: out of range cm_scache_t pointers\n");
+               return -23;
+           }
+
             if (scp->nextp && scp->nextp->magic != CM_SCACHE_MAGIC) {
                 afsi_log("cm_ValidateSCache failure: scp->nextp->magic != CM_SCACHE_MAGIC");
                 fprintf(stderr, "cm_ValidateSCache failure: scp->nextp->magic != CM_SCACHE_MAGIC\n");
                 return -10;
             }
+
+           if ( scp->randomACLp < (cm_aclent_t *)cm_data.aclBaseAddress ||
+                scp->randomACLp >= (cm_aclent_t *)cm_data.scacheBaseAddress) {
+               afsi_log("cm_ValidateSCache failure: out of range cm_aclent_t pointers");
+               fprintf(stderr, "cm_ValidateSCache failure: out of range cm_aclent_t pointers\n");
+               return -30;
+           }
+
             if (scp->randomACLp && scp->randomACLp->magic != CM_ACLENT_MAGIC) {
                 afsi_log("cm_ValidateSCache failure: scp->randomACLp->magic != CM_ACLENT_MAGIC");
                 fprintf(stderr, "cm_ValidateSCache failure: scp->randomACLp->magic != CM_ACLENT_MAGIC\n");
@@ -507,7 +741,7 @@ cm_ShutdownSCache(void)
             scp->cbServerp = NULL;
         }
         scp->cbExpires = 0;
-        _InterlockedAnd(&scp->flags, ~CM_SCACHEFLAG_CALLBACK);
+        scp->cbIssued = 0;
         lock_ReleaseWrite(&scp->rw);
 
 #ifdef USE_BPLUS
@@ -520,6 +754,7 @@ cm_ShutdownSCache(void)
 #endif
         lock_FinalizeRWLock(&scp->rw);
         lock_FinalizeRWLock(&scp->bufCreateLock);
+        lock_FinalizeMutex(&scp->redirMx);
     }
     lock_ReleaseWrite(&cm_scacheLock);
 
@@ -550,6 +785,7 @@ void cm_InitSCache(int newFile, long maxSCaches)
 #endif
                 scp->cbServerp = NULL;
                 scp->cbExpires = 0;
+                scp->cbIssued = 0;
                 scp->volumeCreationDate = 0;
                 scp->fileLocksH = NULL;
                 scp->fileLocksT = NULL;
@@ -562,12 +798,18 @@ void cm_InitSCache(int newFile, long maxSCaches)
                 scp->openShares = 0;
                 scp->openExcls = 0;
                 scp->waitCount = 0;
+                scp->activeRPCs = 0;
 #ifdef USE_BPLUS
                 scp->dirBplus = NULL;
                 scp->dirDataVersion = CM_SCACHE_VERSION_BAD;
 #endif
                 scp->waitQueueT = NULL;
-                _InterlockedAnd(&scp->flags, ~CM_SCACHEFLAG_WAITING);
+                _InterlockedAnd(&scp->flags, ~(CM_SCACHEFLAG_WAITING | CM_SCACHEFLAG_RDR_IN_USE));
+
+                scp->redirBufCount = 0;
+                scp->redirQueueT = NULL;
+                scp->redirQueueH = NULL;
+                lock_InitializeMutex(&scp->redirMx, "cm_scache_t redirMx", LOCK_HIERARCHY_SCACHE_REDIRMX);
             }
         }
         cm_allFileLocks = NULL;
@@ -607,21 +849,23 @@ cm_scache_t *cm_FindSCache(cm_fid_t *fidp)
 }
 
 #ifdef DEBUG_REFCOUNT
-long cm_GetSCacheDbg(cm_fid_t *fidp, cm_scache_t **outScpp, cm_user_t *userp,
+long cm_GetSCacheDbg(cm_fid_t *fidp, cm_fid_t *parentFidp, cm_scache_t **outScpp, cm_user_t *userp,
                   cm_req_t *reqp, char * file, long line)
 #else
-long cm_GetSCache(cm_fid_t *fidp, cm_scache_t **outScpp, cm_user_t *userp,
+long cm_GetSCache(cm_fid_t *fidp, cm_fid_t *parentFidp, cm_scache_t **outScpp, cm_user_t *userp,
                   cm_req_t *reqp)
 #endif
 {
     long hash;
     cm_scache_t *scp = NULL;
+    cm_scache_t *newScp = NULL;
     long code;
     cm_volume_t *volp = NULL;
     cm_cell_t *cellp;
     int special = 0; // yj: boolean variable to test if file is on root.afs
     int isRoot = 0;
     extern cm_fid_t cm_rootFid;
+    afs_int32 refCount;
 
     hash = CM_SCACHE_HASH(fidp);
 
@@ -639,7 +883,7 @@ long cm_GetSCache(cm_fid_t *fidp, cm_scache_t **outScpp, cm_user_t *userp,
 
     // yj: check if we have the scp, if so, we don't need
     // to do anything else
-    lock_ObtainWrite(&cm_scacheLock);
+    lock_ObtainRead(&cm_scacheLock);
     for (scp=cm_data.scacheHashTablep[hash]; scp; scp=scp->nextp) {
         if (cm_FidCmp(fidp, &scp->fid) == 0) {
 #ifdef DEBUG_REFCOUNT
@@ -651,13 +895,19 @@ long cm_GetSCache(cm_fid_t *fidp, cm_scache_t **outScpp, cm_user_t *userp,
                 cm_data.fakeDirVersion != scp->dataVersion)
                 break;
 #endif
+            if (parentFidp && scp->parentVnode == 0) {
+                scp->parentVnode = parentFidp->vnode;
+                scp->parentUnique = parentFidp->unique;
+            }
             cm_HoldSCacheNoLock(scp);
             *outScpp = scp;
+            lock_ConvertRToW(&cm_scacheLock);
             cm_AdjustScacheLRU(scp);
             lock_ReleaseWrite(&cm_scacheLock);
             return 0;
         }
     }
+    lock_ReleaseRead(&cm_scacheLock);
 
     // yj: when we get here, it means we don't have an scp
     // so we need to either load it or fake it, depending
@@ -677,7 +927,6 @@ long cm_GetSCache(cm_fid_t *fidp, cm_scache_t **outScpp, cm_user_t *userp,
     }
 
     if (cm_freelanceEnabled && special) {
-        lock_ReleaseWrite(&cm_scacheLock);
         osi_Log0(afsd_logp,"cm_GetSCache Freelance and special");
 
         if (cm_getLocalMountPointChange()) {
@@ -685,32 +934,31 @@ long cm_GetSCache(cm_fid_t *fidp, cm_scache_t **outScpp, cm_user_t *userp,
             cm_reInitLocalMountPoints();
         }
 
-        lock_ObtainWrite(&cm_scacheLock);
         if (scp == NULL) {
-            scp = cm_GetNewSCache();    /* returns scp->rw held */
+            scp = cm_GetNewSCache(FALSE);    /* returns scp->rw held */
             if (scp == NULL) {
                 osi_Log0(afsd_logp,"cm_GetSCache unable to obtain *new* scache entry");
-                lock_ReleaseWrite(&cm_scacheLock);
                 return CM_ERROR_WOULDBLOCK;
             }
         } else {
-            lock_ReleaseWrite(&cm_scacheLock);
             lock_ObtainWrite(&scp->rw);
-            lock_ObtainWrite(&cm_scacheLock);
         }
         scp->fid = *fidp;
-        scp->dotdotFid.cell=AFS_FAKE_ROOT_CELL_ID;
-        scp->dotdotFid.volume=AFS_FAKE_ROOT_VOL_ID;
-        scp->dotdotFid.unique=1;
-        scp->dotdotFid.vnode=1;
+        cm_SetFid(&scp->dotdotFid,AFS_FAKE_ROOT_CELL_ID,AFS_FAKE_ROOT_VOL_ID,1,1);
+        if (parentFidp) {
+            scp->parentVnode = parentFidp->vnode;
+            scp->parentUnique = parentFidp->unique;
+        }
         _InterlockedOr(&scp->flags, (CM_SCACHEFLAG_PURERO | CM_SCACHEFLAG_RO));
+        lock_ObtainWrite(&cm_scacheLock);
         if (!(scp->flags & CM_SCACHEFLAG_INHASH)) {
             scp->nextp = cm_data.scacheHashTablep[hash];
             cm_data.scacheHashTablep[hash] = scp;
             _InterlockedOr(&scp->flags, CM_SCACHEFLAG_INHASH);
         }
-        scp->refCount = 1;
-       osi_Log1(afsd_logp,"cm_GetSCache (freelance) sets refCount to 1 scp 0x%p", scp);
+        refCount = InterlockedIncrement(&scp->refCount);
+       osi_Log2(afsd_logp,"cm_GetSCache (freelance) sets refCount to 1 scp 0x%p refCount %d", scp, refCount);
+        lock_ReleaseWrite(&cm_scacheLock);
 
         /* must be called after the scp->fid is set */
         cm_FreelanceFetchMountPointString(scp);
@@ -730,7 +978,6 @@ long cm_GetSCache(cm_fid_t *fidp, cm_scache_t **outScpp, cm_user_t *userp,
         scp->lockDataVersion=CM_SCACHE_VERSION_BAD; /* no lock yet */
         scp->fsLockCount=0;
         lock_ReleaseWrite(&scp->rw);
-        lock_ReleaseWrite(&cm_scacheLock);
        *outScpp = scp;
 #ifdef DEBUG_REFCOUNT
        afsi_log("%s:%d cm_GetSCache (2) scp 0x%p ref %d", file, line, scp, scp->refCount);
@@ -741,58 +988,80 @@ long cm_GetSCache(cm_fid_t *fidp, cm_scache_t **outScpp, cm_user_t *userp,
     // end of yj code
 #endif /* AFS_FREELANCE_CLIENT */
 
+    /* we don't have the fid, recycle something */
+    newScp = cm_GetNewSCache(FALSE);    /* returns scp->rw held */
+    if (newScp == NULL) {
+       osi_Log0(afsd_logp,"cm_GetNewSCache unable to obtain *new* scache entry");
+       return CM_ERROR_WOULDBLOCK;
+    }
+#ifdef DEBUG_REFCOUNT
+    afsi_log("%s:%d cm_GetNewSCache returns scp 0x%p flags 0x%x", file, line, newScp, newScp->flags);
+#endif
+    osi_Log2(afsd_logp,"cm_GetNewSCache returns scp 0x%p flags 0x%x", newScp, newScp->flags);
+
     /* otherwise, we need to find the volume */
     if (!cm_freelanceEnabled || !isRoot) {
-        lock_ReleaseWrite(&cm_scacheLock);     /* for perf. reasons */
         cellp = cm_FindCellByID(fidp->cell, 0);
-        if (!cellp)
+        if (!cellp) {
+            /* put back newScp so it can be reused */
+            lock_ObtainWrite(&cm_scacheLock);
+           _InterlockedOr(&newScp->flags, CM_SCACHEFLAG_DELETED);
+            cm_AdjustScacheLRU(newScp);
+            lock_ReleaseWrite(&newScp->rw);
+            lock_ReleaseWrite(&cm_scacheLock);
             return CM_ERROR_NOSUCHCELL;
+        }
 
         code = cm_FindVolumeByID(cellp, fidp->volume, userp, reqp, CM_GETVOL_FLAG_CREATE, &volp);
-        if (code)
+        if (code) {
+            /* put back newScp so it can be reused */
+            lock_ObtainWrite(&cm_scacheLock);
+           _InterlockedOr(&newScp->flags, CM_SCACHEFLAG_DELETED);
+            cm_AdjustScacheLRU(newScp);
+            lock_ReleaseWrite(&newScp->rw);
+            lock_ReleaseWrite(&cm_scacheLock);
             return code;
-        lock_ObtainWrite(&cm_scacheLock);
+        }
     }
 
-    /* otherwise, we have the volume, now reverify that the scp doesn't
-     * exist, and proceed.
+    /*
+     * otherwise, we have the volume, now reverify that the scp doesn't
+     * exist, and proceed.  make sure that we hold the cm_scacheLock
+     * write-locked until the scp is put into the hash table in order
+     * to avoid a race.
      */
+    lock_ObtainWrite(&cm_scacheLock);
     for (scp=cm_data.scacheHashTablep[hash]; scp; scp=scp->nextp) {
         if (cm_FidCmp(fidp, &scp->fid) == 0) {
 #ifdef DEBUG_REFCOUNT
            afsi_log("%s:%d cm_GetSCache (3) scp 0x%p ref %d", file, line, scp, scp->refCount);
            osi_Log1(afsd_logp,"cm_GetSCache (3) scp 0x%p", scp);
 #endif
+            if (parentFidp && scp->parentVnode == 0) {
+                scp->parentVnode = parentFidp->vnode;
+                scp->parentUnique = parentFidp->unique;
+            }
+            if (volp)
+                cm_PutVolume(volp);
             cm_HoldSCacheNoLock(scp);
             cm_AdjustScacheLRU(scp);
+
+            /* put back newScp so it can be reused */
+           _InterlockedOr(&newScp->flags, CM_SCACHEFLAG_DELETED);
+            cm_AdjustScacheLRU(newScp);
+            lock_ReleaseWrite(&newScp->rw);
             lock_ReleaseWrite(&cm_scacheLock);
-            if (volp)
-                cm_PutVolume(volp);
+
             *outScpp = scp;
             return 0;
         }
     }
 
-    /* now, if we don't have the fid, recycle something */
-    scp = cm_GetNewSCache();    /* returns scp->rw held */
-    if (scp == NULL) {
-       osi_Log0(afsd_logp,"cm_GetNewSCache unable to obtain *new* scache entry");
-       lock_ReleaseWrite(&cm_scacheLock);
-       if (volp)
-           cm_PutVolume(volp);
-       return CM_ERROR_WOULDBLOCK;
-    }
-#ifdef DEBUG_REFCOUNT
-    afsi_log("%s:%d cm_GetNewSCache returns scp 0x%p flags 0x%x", file, line, scp, scp->flags);
-#endif
-    osi_Log2(afsd_logp,"cm_GetNewSCache returns scp 0x%p flags 0x%x", scp, scp->flags);
-
-    osi_assertx(!(scp->flags & CM_SCACHEFLAG_INHASH), "CM_SCACHEFLAG_INHASH set");
-
+    scp = newScp;
     scp->fid = *fidp;
     if (!cm_freelanceEnabled || !isRoot) {
         /* if this scache entry represents a volume root then we need
-         * to copy the dotdotFipd from the volume structure where the
+         * to copy the dotdotFid from the volume structure where the
          * "master" copy is stored (defect 11489)
          */
         if (volp->vol[ROVOL].ID == fidp->volume) {
@@ -808,17 +1077,23 @@ long cm_GetSCache(cm_fid_t *fidp, cm_scache_t **outScpp, cm_user_t *userp,
                 scp->dotdotFid = cm_VolumeStateByType(volp, RWVOL)->dotdotFid;
         }
     }
+    if (parentFidp) {
+        scp->parentVnode = parentFidp->vnode;
+        scp->parentUnique = parentFidp->unique;
+    }
     if (volp)
         cm_PutVolume(volp);
+
     scp->nextp = cm_data.scacheHashTablep[hash];
     cm_data.scacheHashTablep[hash] = scp;
     _InterlockedOr(&scp->flags, CM_SCACHEFLAG_INHASH);
+    refCount = InterlockedIncrement(&scp->refCount);
+    lock_ReleaseWrite(&cm_scacheLock);
     lock_ReleaseWrite(&scp->rw);
-    scp->refCount = 1;
 #ifdef DEBUG_REFCOUNT
-    afsi_log("%s:%d cm_GetSCache sets refCount to 1 scp 0x%p", file, line, scp);
+    afsi_log("%s:%d cm_GetSCache sets refCount to 1 scp 0x%p refCount %d", file, line, scp, refCount);
 #endif
-    osi_Log1(afsd_logp,"cm_GetSCache sets refCount to 1 scp 0x%p", scp);
+    osi_Log2(afsd_logp,"cm_GetSCache sets refCount to 1 scp 0x%p refCount %d", scp, refCount);
 
     /* XXX - The following fields in the cm_scache are
      * uninitialized:
@@ -833,7 +1108,6 @@ long cm_GetSCache(cm_fid_t *fidp, cm_scache_t **outScpp, cm_user_t *userp,
     afsi_log("%s:%d cm_GetSCache (4) scp 0x%p ref %d", file, line, scp, scp->refCount);
     osi_Log1(afsd_logp,"cm_GetSCache (4) scp 0x%p", scp);
 #endif
-    lock_ReleaseWrite(&cm_scacheLock);
     return 0;
 }
 
@@ -846,6 +1120,9 @@ cm_scache_t * cm_FindSCacheParent(cm_scache_t * scp)
     cm_fid_t    parent_fid;
     cm_scache_t * pscp = NULL;
 
+    if (scp->parentVnode == 0)
+        return NULL;
+
     lock_ObtainWrite(&cm_scacheLock);
     cm_SetFid(&parent_fid, scp->fid.cell, scp->fid.volume, scp->parentVnode, scp->parentUnique);
 
@@ -981,6 +1258,9 @@ int cm_SyncOpCheckContinue(cm_scache_t * scp, afs_int32 flags, cm_buf_t * bufp)
  * possibly resulting in a bogus truncation.  The simplest way to avoid this
  * is to serialize all StoreData RPC's.  This is the reason we defined
  * CM_SCACHESYNC_STOREDATA_EXCL and CM_SCACHEFLAG_DATASTORING.
+ *
+ * CM_SCACHESYNC_BULKREAD is used to permit synchronization of multiple bulk
+ * readers which may be requesting overlapping ranges.
  */
 long cm_SyncOp(cm_scache_t *scp, cm_buf_t *bufp, cm_user_t *userp, cm_req_t *reqp,
                afs_uint32 rights, afs_uint32 flags)
@@ -994,6 +1274,8 @@ long cm_SyncOp(cm_scache_t *scp, cm_buf_t *bufp, cm_user_t *userp, cm_req_t *req
     afs_uint32 sleep_buf_cmflags = 0;
     afs_uint32 sleep_scp_bufs = 0;
     int wakeupCycle;
+    afs_int32 waitCount;
+    afs_int32 waitRequests;
 
     lock_AssertWrite(&scp->rw);
 
@@ -1167,6 +1449,12 @@ long cm_SyncOp(cm_scache_t *scp, cm_buf_t *bufp, cm_user_t *userp, cm_req_t *req
             if ((flags & CM_SCACHESYNC_FORCECB) || !cm_HaveCallback(scp)) {
                 osi_Log1(afsd_logp, "CM SyncOp getting callback on scp 0x%p",
                           scp);
+
+               if (cm_EAccesFindEntry(userp, &scp->fid)) {
+                   code = CM_ERROR_NOACCESS;
+                   goto on_error;
+               }
+
                 if (bufLocked)
                    lock_ReleaseMutex(&bufp->mx);
                 code = cm_GetCallback(scp, userp, reqp, (flags & CM_SCACHESYNC_FORCECB)?1:0);
@@ -1176,7 +1464,8 @@ long cm_SyncOp(cm_scache_t *scp, cm_buf_t *bufp, cm_user_t *userp, cm_req_t *req
                     lock_ObtainWrite(&scp->rw);
                 }
                 if (code)
-                    return code;
+                   goto on_error;
+
                flags &= ~CM_SCACHESYNC_FORCECB;        /* only force once */
                 continue;
             }
@@ -1186,12 +1475,16 @@ long cm_SyncOp(cm_scache_t *scp, cm_buf_t *bufp, cm_user_t *userp, cm_req_t *req
             /* can't check access rights without a callback */
             osi_assertx(flags & CM_SCACHESYNC_NEEDCALLBACK, "!CM_SCACHESYNC_NEEDCALLBACK");
 
-            if ((rights & (PRSFS_WRITE|PRSFS_DELETE)) && (scp->flags & CM_SCACHEFLAG_RO))
-                return CM_ERROR_READONLY;
+           if ((rights & (PRSFS_WRITE|PRSFS_DELETE)) && (scp->flags & CM_SCACHEFLAG_RO)) {
+               code = CM_ERROR_READONLY;
+               goto on_error;
+           }
 
             if (cm_HaveAccessRights(scp, userp, reqp, rights, &outRights)) {
-                if (~outRights & rights)
-                   return CM_ERROR_NOACCESS;
+               if (~outRights & rights) {
+                   code = CM_ERROR_NOACCESS;
+                   goto on_error;
+               }
             }
             else {
                 /* we don't know the required access rights */
@@ -1203,11 +1496,19 @@ long cm_SyncOp(cm_scache_t *scp, cm_buf_t *bufp, cm_user_t *userp, cm_req_t *req
                     lock_ObtainWrite(&scp->rw);
                 }
                 if (code)
-                    return code;
+                   goto on_error;
                 continue;
             }
         }
 
+        if (flags & CM_SCACHESYNC_BULKREAD) {
+            /* Don't allow concurrent fiddling with lock lists */
+            if (scp->flags & CM_SCACHEFLAG_BULKREADING) {
+                osi_Log1(afsd_logp, "CM SyncOp scp 0x%p is BULKREADING want BULKREAD", scp);
+                goto sleep;
+            }
+        }
+
         /* if we get here, we're happy */
         break;
 
@@ -1215,8 +1516,10 @@ long cm_SyncOp(cm_scache_t *scp, cm_buf_t *bufp, cm_user_t *userp, cm_req_t *req
         /* first check if we're not supposed to wait: fail
          * in this case, returning with everything still locked.
          */
-        if (flags & CM_SCACHESYNC_NOWAIT)
-            return CM_ERROR_WOULDBLOCK;
+       if (flags & CM_SCACHESYNC_NOWAIT) {
+           code = CM_ERROR_WOULDBLOCK;
+           goto on_error;
+       }
 
         /* These are used for minidump debugging */
        sleep_scp_flags = scp->flags;           /* so we know why we slept */
@@ -1225,15 +1528,15 @@ long cm_SyncOp(cm_scache_t *scp, cm_buf_t *bufp, cm_user_t *userp, cm_req_t *req
 
         /* wait here, then try again */
         osi_Log1(afsd_logp, "CM SyncOp sleeping scp 0x%p", scp);
-        if ( scp->flags & CM_SCACHEFLAG_WAITING ) {
-            scp->waitCount++;
-            scp->waitRequests++;
+
+        waitCount = InterlockedIncrement(&scp->waitCount);
+        waitRequests = InterlockedIncrement(&scp->waitRequests);
+        if (waitCount > 1) {
             osi_Log3(afsd_logp, "CM SyncOp CM_SCACHEFLAG_WAITING already set for 0x%p; %d threads; %d requests",
-                     scp, scp->waitCount, scp->waitRequests);
+                     scp, waitCount, waitRequests);
         } else {
             osi_Log1(afsd_logp, "CM SyncOp CM_SCACHEFLAG_WAITING set for 0x%p", scp);
             _InterlockedOr(&scp->flags, CM_SCACHEFLAG_WAITING);
-            scp->waitCount = scp->waitRequests = 1;
         }
 
         cm_SyncOpAddToWaitQueue(scp, flags, bufp);
@@ -1249,10 +1552,10 @@ long cm_SyncOp(cm_scache_t *scp, cm_buf_t *bufp, cm_user_t *userp, cm_req_t *req
 
        cm_UpdateServerPriority();
 
-        scp->waitCount--;
+        waitCount = InterlockedDecrement(&scp->waitCount);
         osi_Log3(afsd_logp, "CM SyncOp woke! scp 0x%p; still waiting %d threads of %d requests",
-                 scp, scp->waitCount, scp->waitRequests);
-        if (scp->waitCount == 0) {
+                 scp, waitCount, scp->waitRequests);
+        if (waitCount == 0) {
             osi_Log1(afsd_logp, "CM SyncOp CM_SCACHEFLAG_WAITING reset for 0x%p", scp);
             _InterlockedAnd(&scp->flags, ~CM_SCACHEFLAG_WAITING);
             scp->waitRequests = 0;
@@ -1276,6 +1579,8 @@ long cm_SyncOp(cm_scache_t *scp, cm_buf_t *bufp, cm_user_t *userp, cm_req_t *req
         _InterlockedOr(&scp->flags, CM_SCACHEFLAG_ASYNCSTORING);
     if (flags & CM_SCACHESYNC_LOCK)
         _InterlockedOr(&scp->flags, CM_SCACHEFLAG_LOCKING);
+    if (flags & CM_SCACHESYNC_BULKREAD)
+        _InterlockedOr(&scp->flags, CM_SCACHEFLAG_BULKREADING);
 
     /* now update the buffer pointer */
     if (bufp && (flags & CM_SCACHESYNC_FETCHDATA)) {
@@ -1317,7 +1622,23 @@ long cm_SyncOp(cm_scache_t *scp, cm_buf_t *bufp, cm_user_t *userp, cm_req_t *req
         _InterlockedOr(&bufp->cmFlags, CM_BUF_CMWRITING);
     }
 
-    return 0;
+    return 0;   /* Success */
+
+  on_error:
+    /*
+     * This thread may have been a waiter that was woken up.
+     * If cm_SyncOp completes due to an error, cm_SyncOpDone() will
+     * never be called.  If there are additional threads waiting on
+     * scp those threads will never be woken.  Make sure we wake the
+     * next waiting thread before we leave.
+     */
+    if ((scp->flags & CM_SCACHEFLAG_WAITING) ||
+        !osi_QIsEmpty(&scp->waitQueueH)) {
+       osi_Log3(afsd_logp, "CM SyncOp 0x%x Waking scp 0x%p bufp 0x%p",
+                flags, scp, bufp);
+       osi_Wakeup((LONG_PTR) &scp->flags);
+    }
+    return code;
 }
 
 /* for those syncops that setup for RPCs.
@@ -1347,6 +1668,8 @@ void cm_SyncOpDone(cm_scache_t *scp, cm_buf_t *bufp, afs_uint32 flags)
         _InterlockedAnd(&scp->flags, ~CM_SCACHEFLAG_ASYNCSTORING);
     if (flags & CM_SCACHESYNC_LOCK)
         _InterlockedAnd(&scp->flags, ~CM_SCACHEFLAG_LOCKING);
+    if (flags & CM_SCACHESYNC_BULKREAD)
+        _InterlockedAnd(&scp->flags, ~CM_SCACHEFLAG_BULKREADING);
 
     /* now update the buffer pointer */
     if (bufp && (flags & CM_SCACHESYNC_FETCHDATA)) {
@@ -1401,12 +1724,33 @@ void cm_SyncOpDone(cm_scache_t *scp, cm_buf_t *bufp, afs_uint32 flags)
     }
 
     /* and wakeup anyone who is waiting */
-    if (scp->flags & CM_SCACHEFLAG_WAITING) {
+    if ((scp->flags & CM_SCACHEFLAG_WAITING) ||
+        !osi_QIsEmpty(&scp->waitQueueH)) {
         osi_Log3(afsd_logp, "CM SyncOpDone 0x%x Waking scp 0x%p bufp 0x%p", flags, scp, bufp);
         osi_Wakeup((LONG_PTR) &scp->flags);
     }
 }
 
+static afs_uint32
+dv_diff(afs_uint64 dv1, afs_uint64 dv2)
+{
+    if ( dv1 - dv2 > 0x7FFFFFFF )
+        return (afs_uint32)(dv2 - dv1);
+    else
+        return (afs_uint32)(dv1 - dv2);
+}
+
+long
+cm_IsStatusValid(AFSFetchStatus *statusp)
+{
+    if (statusp->InterfaceVersion != 0x1 ||
+        !(statusp->FileType > 0 && statusp->FileType <= SymbolicLink)) {
+        return 0;
+    }
+
+    return 1;
+}
+
 /* merge in a response from an RPC.  The scp must be locked, and the callback
  * is optional.
  *
@@ -1420,7 +1764,7 @@ void cm_SyncOpDone(cm_scache_t *scp, cm_buf_t *bufp, afs_uint32 flags)
  * handled after the callback breaking is done, but only one of whose calls
  * started before that, can cause old info to be merged from the first call.
  */
-void cm_MergeStatus(cm_scache_t *dscp,
+long cm_MergeStatus(cm_scache_t *dscp,
                    cm_scache_t *scp, AFSFetchStatus *statusp,
                    AFSVolSync *volsyncp,
                     cm_user_t *userp, cm_req_t *reqp, afs_uint32 flags)
@@ -1428,9 +1772,13 @@ void cm_MergeStatus(cm_scache_t *dscp,
     afs_uint64 dataVersion;
     struct cm_volume *volp = NULL;
     struct cm_cell *cellp = NULL;
+    int rdr_invalidate = 0;
+    afs_uint32 activeRPCs;
 
     lock_AssertWrite(&scp->rw);
 
+    activeRPCs = 1 + InterlockedDecrement(&scp->activeRPCs);
+
     // yj: i want to create some fake status for the /afs directory and the
     // entries under that directory
 #ifdef AFS_FREELANCE_CLIENT
@@ -1467,11 +1815,26 @@ void cm_MergeStatus(cm_scache_t *dscp,
     }
 #endif /* AFS_FREELANCE_CLIENT */
 
+    if (!cm_IsStatusValid(statusp)) {
+        osi_Log3(afsd_logp, "Merge: Bad Status scp 0x%p Invalid InterfaceVersion %d FileType %d",
+                 scp, statusp->InterfaceVersion, statusp->FileType);
+        return CM_ERROR_INVAL;
+    }
+
     if (statusp->errorCode != 0) {
-        _InterlockedOr(&scp->flags, CM_SCACHEFLAG_EACCESS);
-       osi_Log2(afsd_logp, "Merge, Failure scp 0x%p code 0x%x", scp, statusp->errorCode);
+        switch (statusp->errorCode) {
+        case EACCES:
+        case UAEACCES:
+        case EPERM:
+        case UAEPERM:
+            cm_EAccesAddEntry(userp, &scp->fid, &dscp->fid);
+        }
+        osi_Log2(afsd_logp, "Merge, Failure scp 0x%p code 0x%x", scp, statusp->errorCode);
 
-       scp->fileType = 0;      /* unknown */
+        if (scp->fid.vnode & 0x1)
+            scp->fileType = CM_SCACHETYPE_DIRECTORY;
+        else
+            scp->fileType = CM_SCACHETYPE_UNKNOWN;
 
        scp->serverModTime = 0;
        scp->clientModTime = 0;
@@ -1488,16 +1851,16 @@ void cm_MergeStatus(cm_scache_t *dscp,
         scp->bufDataVersionLow = CM_SCACHE_VERSION_BAD;
         scp->fsLockCount = 0;
 
-       if (dscp) {
+       if (dscp && dscp != scp) {
             scp->parentVnode = dscp->fid.vnode;
             scp->parentUnique = dscp->fid.unique;
        } else {
             scp->parentVnode = 0;
             scp->parentUnique = 0;
        }
-       goto done;
-    } else {
-       _InterlockedAnd(&scp->flags, ~CM_SCACHEFLAG_EACCESS);
+
+        if (RDR_Initialized)
+            rdr_invalidate = 1;
     }
 
     dataVersion = statusp->dataVersionHigh;
@@ -1516,6 +1879,7 @@ void cm_MergeStatus(cm_scache_t *dscp,
                       scp->cbServerp->addr.sin_addr.s_addr,
                       volp ? volp->namep : "(unknown)");
         }
+
         osi_Log3(afsd_logp, "Bad merge, scp 0x%p, scp dv %d, RPC dv %d",
                   scp, scp->dataVersion, dataVersion);
         /* we have a number of data fetch/store operations running
@@ -1549,8 +1913,19 @@ void cm_MergeStatus(cm_scache_t *dscp,
             goto done;
     }
 
-    if (cm_readonlyVolumeVersioning)
+    /*
+     * The first field of the volsync parameter is supposed to be the
+     * volume creation date.  Unfortunately, pre-OpenAFS 1.4.11 and 1.6.0
+     * file servers do not populate the VolSync structure for BulkStat and
+     * InlineBulkStat RPCs.  As a result, the volume creation date is not
+     * trustworthy when status is obtained via [Inline]BulkStatus RPCs.
+     * If cm_readonlyVolumeVersioning is set, it is assumed that all file
+     * servers populate the VolSync structure at all times.
+     */
+    if (cm_readonlyVolumeVersioning || !(flags & CM_MERGEFLAG_BULKSTAT))
         scp->volumeCreationDate = volsyncp->spare1;       /* volume creation date */
+    else
+        scp->volumeCreationDate = 0;
 
     scp->serverModTime = statusp->ServerModTime;
 
@@ -1600,15 +1975,16 @@ void cm_MergeStatus(cm_scache_t *dscp,
         cm_AddACLCache(scp, userp, statusp->CallerAccess);
     }
 
-    if (scp->dataVersion != 0 &&
-        (!(flags & (CM_MERGEFLAG_DIROP|CM_MERGEFLAG_STOREDATA)) && dataVersion != scp->dataVersion ||
-         (flags & (CM_MERGEFLAG_DIROP|CM_MERGEFLAG_STOREDATA)) && dataVersion - scp->dataVersion > 1)) {
+    if (dataVersion != 0 && scp->dataVersion != CM_SCACHE_VERSION_BAD &&
+        (!(flags & (CM_MERGEFLAG_DIROP|CM_MERGEFLAG_STOREDATA)) && (dataVersion != scp->dataVersion) ||
+         (flags & (CM_MERGEFLAG_DIROP|CM_MERGEFLAG_STOREDATA)) &&
+         (dv_diff(dataVersion, scp->dataVersion) > activeRPCs))) {
         /*
          * We now know that all of the data buffers that we have associated
          * with this scp are invalid.  Subsequent operations will go faster
          * if the buffers are removed from the hash tables.
          *
-         * We do not remove directory buffers if the dataVersion delta is 1 because
+         * We do not remove directory buffers if the dataVersion delta is 'activeRPCs' because
          * those version numbers will be updated as part of the directory operation.
          *
          * We do not remove storedata buffers because they will still be valid.
@@ -1632,9 +2008,11 @@ void cm_MergeStatus(cm_scache_t *dscp,
              * so leave it in place.
              */
             if (cm_FidCmp(&scp->fid, &bp->fid) == 0 &&
-                 lock_TryMutex(&bp->mx)) {
+                bp->refCount == 0 &&
+                lock_TryMutex(&bp->mx)) {
                 if (bp->refCount == 0 &&
-                    !(bp->flags & CM_BUF_READING | CM_BUF_WRITING | CM_BUF_DIRTY)) {
+                    !(bp->flags & (CM_BUF_READING | CM_BUF_WRITING | CM_BUF_DIRTY)) &&
+                    !(bp->qFlags & CM_BUF_QREDIR)) {
                     prevBp = bp->fileHashBackp;
                     bp->fileHashBackp = bp->fileHashp = NULL;
                     if (prevBp)
@@ -1665,26 +2043,64 @@ void cm_MergeStatus(cm_scache_t *dscp,
         lock_ReleaseWrite(&buf_globalLock);
     }
 
-    /*
-     * If the dataVersion has changed, the mountPointStringp must be cleared
-     * in order to force a re-evaluation by cm_HandleLink().  The Windows CM
-     * does not update a mountpoint or symlink by altering the contents of
-     * the file data; but the Unix CM does.
-     */
-    if (scp->dataVersion != dataVersion && !(flags & CM_MERGEFLAG_FETCHDATA))
-        scp->mountPointStringp[0] = '\0';
+    if (scp->dataVersion != dataVersion && !(flags & CM_MERGEFLAG_FETCHDATA)) {
+        osi_Log5(afsd_logp, "cm_MergeStatus data version change scp 0x%p cell %u vol %u vn %u uniq %u",
+                 scp, scp->fid.cell, scp->fid.volume, scp->fid.vnode, scp->fid.unique);
+
+        osi_Log4(afsd_logp, ".... oldDV 0x%x:%x -> newDV 0x%x:%x",
+                 (afs_uint32)((scp->dataVersion >> 32) & 0xFFFFFFFF),
+                 (afs_uint32)(scp->dataVersion & 0xFFFFFFFF),
+                 (afs_uint32)((dataVersion >> 32) & 0xFFFFFFFF),
+                 (afs_uint32)(dataVersion & 0xFFFFFFFF));
+    }
 
     /* We maintain a range of buffer dataVersion values which are considered
      * valid.  This avoids the need to update the dataVersion on each buffer
      * object during an uncontested storeData operation.  As a result this
      * merge status no longer has performance characteristics derived from
      * the size of the file.
+     *
+     * For directory buffers, only current dataVersion values are up to date.
      */
-    if (((flags & CM_MERGEFLAG_STOREDATA) && dataVersion - scp->dataVersion > 1) ||
-         (!(flags & CM_MERGEFLAG_STOREDATA) && scp->dataVersion != dataVersion) ||
-         scp->bufDataVersionLow == 0)
+    if (((flags & (CM_MERGEFLAG_STOREDATA|CM_MERGEFLAG_DIROP)) && (dv_diff(dataVersion, scp->dataVersion) > activeRPCs)) ||
+         (!(flags & (CM_MERGEFLAG_STOREDATA|CM_MERGEFLAG_DIROP)) && (scp->dataVersion != dataVersion)) ||
+         scp->bufDataVersionLow == CM_SCACHE_VERSION_BAD ||
+         scp->fileType == CM_SCACHETYPE_DIRECTORY ||
+         flags & CM_MERGEFLAG_CACHE_BYPASS) {
         scp->bufDataVersionLow = dataVersion;
+    }
+
+    if (RDR_Initialized) {
+        /*
+         * The redirector maintains its own cached status information which
+         * must be updated when a DV change occurs that is not the result
+         * of a redirector initiated data change.
+         *
+         * If the current old DV is BAD, send a DV change notification.
+         *
+         * If the DV has changed and request was not initiated by the
+         * redirector, send a DV change notification.
+         *
+         * If the request was initiated by the redirector, send a notification
+         * for store and directory operations that result in a DV change greater
+         * than the number of active RPCs or any other operation that results
+         * in an unexpected DV change such as FetchStatus.
+         */
 
+        if (scp->dataVersion == CM_SCACHE_VERSION_BAD && dataVersion != 0) {
+            rdr_invalidate = 1;
+        } else if (!(reqp->flags & CM_REQ_SOURCE_REDIR) && scp->dataVersion != dataVersion) {
+            rdr_invalidate = 1;
+        } else if (reqp->flags & CM_REQ_SOURCE_REDIR) {
+            if (!(flags & (CM_MERGEFLAG_DIROP|CM_MERGEFLAG_STOREDATA)) &&
+                (dv_diff(dataVersion, scp->dataVersion) > activeRPCs - 1)) {
+                rdr_invalidate = 1;
+            } else if ((flags & (CM_MERGEFLAG_DIROP|CM_MERGEFLAG_STOREDATA)) &&
+                       dv_diff(dataVersion, scp->dataVersion) > activeRPCs) {
+                rdr_invalidate = 1;
+            }
+        }
+    }
     scp->dataVersion = dataVersion;
 
     /*
@@ -1712,10 +2128,33 @@ void cm_MergeStatus(cm_scache_t *dscp,
             lock_ReleaseWrite(&volp->rw);
         }
     }
+
+    /* Remove cached EACCES / EPERM errors if the file is a directory */
+    if (scp->fileType == CM_SCACHETYPE_DIRECTORY &&
+        !(volp && (volp->flags & CM_VOLUMEFLAG_DFS_VOLUME)) &&
+        !cm_accessPerFileCheck)
+    {
+        cm_EAccesClearParentEntries(&scp->fid);
+    }
+
   done:
     if (volp)
         cm_PutVolume(volp);
 
+    /*
+     * The scache rw lock cannot be held across the invalidation.
+     * Doing so can result in deadlocks with other threads processing
+     * requests initiated by the afs redirector.
+     */
+    if (rdr_invalidate) {
+        lock_ReleaseWrite(&scp->rw);
+        RDR_InvalidateObject(scp->fid.cell, scp->fid.volume, scp->fid.vnode,
+                             scp->fid.unique, scp->fid.hash,
+                             scp->fileType, AFS_INVALIDATE_DATA_VERSION);
+        lock_ObtainWrite(&scp->rw);
+    }
+
+    return 0;
 }
 
 /* note that our stat cache info is incorrect, so force us eventually
@@ -1734,17 +2173,14 @@ void cm_DiscardSCache(cm_scache_t *scp)
        scp->cbServerp = NULL;
     }
     scp->cbExpires = 0;
-    scp->volumeCreationDate = 0;
-    _InterlockedAnd(&scp->flags, ~(CM_SCACHEFLAG_CALLBACK | CM_SCACHEFLAG_LOCAL));
+    scp->cbIssued = 0;
+    _InterlockedAnd(&scp->flags, ~(CM_SCACHEFLAG_LOCAL | CM_SCACHEFLAG_RDR_IN_USE));
     cm_dnlcPurgedp(scp);
     cm_dnlcPurgevp(scp);
     cm_FreeAllACLEnts(scp);
 
     if (scp->fileType == CM_SCACHETYPE_DFSLINK)
         cm_VolStatus_Invalidate_DFS_Mapping(scp);
-
-    /* Force mount points and symlinks to be re-evaluated */
-    scp->mountPointStringp[0] = '\0';
 }
 
 void cm_AFSFidFromFid(AFSFid *afsFidp, cm_fid_t *fidp)
@@ -1810,34 +2246,6 @@ void cm_ReleaseSCacheNoLock(cm_scache_t *scp)
     osi_Log2(afsd_logp,"cm_ReleaseSCacheNoLock scp 0x%p ref %d",scp, refCount);
     afsi_log("%s:%d cm_ReleaseSCacheNoLock scp 0x%p ref %d", file, line, scp, refCount);
 #endif
-
-    if (refCount == 0 && (scp->flags & CM_SCACHEFLAG_DELETED)) {
-        int deleted = 0;
-        long      lockstate;
-
-        lockstate = lock_GetRWLockState(&cm_scacheLock);
-        if (lockstate != OSI_RWLOCK_WRITEHELD)
-            lock_ReleaseRead(&cm_scacheLock);
-        else
-            lock_ReleaseWrite(&cm_scacheLock);
-
-        lock_ObtainWrite(&scp->rw);
-        if (scp->flags & CM_SCACHEFLAG_DELETED)
-            deleted = 1;
-
-        if (refCount == 0 && deleted) {
-            lock_ObtainWrite(&cm_scacheLock);
-            cm_RecycleSCache(scp, 0);
-            if (lockstate != OSI_RWLOCK_WRITEHELD)
-                lock_ConvertWToR(&cm_scacheLock);
-        } else {
-            if (lockstate != OSI_RWLOCK_WRITEHELD)
-                lock_ObtainRead(&cm_scacheLock);
-            else
-                lock_ObtainWrite(&cm_scacheLock);
-        }
-        lock_ReleaseWrite(&scp->rw);
-    }
 }
 
 #ifdef DEBUG_REFCOUNT
@@ -1861,19 +2269,6 @@ void cm_ReleaseSCache(cm_scache_t *scp)
     afsi_log("%s:%d cm_ReleaseSCache scp 0x%p ref %d", file, line, scp, refCount);
 #endif
     lock_ReleaseRead(&cm_scacheLock);
-
-    if (scp->flags & CM_SCACHEFLAG_DELETED) {
-        int deleted = 0;
-        lock_ObtainWrite(&scp->rw);
-        if (scp->flags & CM_SCACHEFLAG_DELETED)
-            deleted = 1;
-        if (deleted) {
-            lock_ObtainWrite(&cm_scacheLock);
-            cm_RecycleSCache(scp, 0);
-            lock_ReleaseWrite(&cm_scacheLock);
-        }
-        lock_ReleaseWrite(&scp->rw);
-    }
 }
 
 /* just look for the scp entry to get filetype */
@@ -1951,10 +2346,10 @@ int cm_DumpSCache(FILE *outputFile, char *cookie, int lock)
         }
         sprintf(output,
                 "%s scp=0x%p, fid (cell=%d, volume=%d, vnode=%d, unique=%d) type=%d dv=%I64d len=0x%I64x "
-                "mp='%s' Locks (server=0x%x shared=%d excl=%d clnt=%d) fsLockCount=%d linkCount=%d anyAccess=0x%x "
+                "mpDV=%I64d mp='%s' Locks (server=0x%x shared=%d excl=%d clnt=%d) fsLockCount=%d linkCount=%d anyAccess=0x%x "
                 "flags=0x%x cbServer='%s' cbExpires='%s' volumeCreationDate='%s' refCount=%u\r\n",
                 cookie, scp, scp->fid.cell, scp->fid.volume, scp->fid.vnode, scp->fid.unique,
-                scp->fileType, scp->dataVersion, scp->length.QuadPart, scp->mountPointStringp,
+                scp->fileType, scp->dataVersion, scp->length.QuadPart, scp->mpDataVersion, scp->mountPointStringp,
                 scp->serverLock, scp->sharedLocks, scp->exclusiveLocks, scp->clientLocks, scp->fsLockCount,
                 scp->linkCount, scp->anyAccess, scp->flags, srvStr ? srvStr : "<none>", cbt ? cbt : "<none>",
                 cdrot ? cdrot : "<none>", scp->refCount);
@@ -1965,7 +2360,7 @@ int cm_DumpSCache(FILE *outputFile, char *cookie, int lock)
             WriteFile(outputFile, output, (DWORD)strlen(output), &zilch, NULL);
 
             for (q = scp->fileLocksH; q; q = osi_QNext(q)) {
-                cm_file_lock_t * lockp = (cm_file_lock_t *)((char *) q - offsetof(cm_file_lock_t, fileq));
+                cm_file_lock_t * lockp = fileq_to_cm_file_lock_t(q);
                 sprintf(output, "  %s lockp=0x%p scp=0x%p, cm_userp=0x%p offset=0x%I64x len=0x%08I64x type=0x%x "
                         "key=0x%I64x flags=0x%x update=0x%I64u\r\n",
                         cookie, lockp, lockp->scp, lockp->userp, lockp->range.offset, lockp->range.length,