Windows: Track active RPCs per scache_t
authorJeffrey Altman <jaltman@your-file-system.com>
Sat, 12 Nov 2011 18:45:08 +0000 (13:45 -0500)
committerJeffrey Altman <jaltman@secure-endpoints.com>
Sun, 13 Nov 2011 00:16:32 +0000 (16:16 -0800)
It has been noticed that multiple RPCs can be active on
a cm_scache_t object at the same time.  This is especially
true of directory objects with the redirector.  Track the
number of active RPCs and use that number in cm_MergeStatus
when deciding whether or not to discard the cached data for
the object.

Change-Id: If291bb4c0e48d0ec087c3a7c2640e4598e7fd419
Reviewed-on: http://gerrit.openafs.org/6001
Tested-by: BuildBot <buildbot@rampaginggeek.com>
Reviewed-by: Jeffrey Altman <jaltman@secure-endpoints.com>
Tested-by: Jeffrey Altman <jaltman@secure-endpoints.com>

src/WINNT/afsd/cm_callback.c
src/WINNT/afsd/cm_dcache.c
src/WINNT/afsd/cm_scache.c
src/WINNT/afsd/cm_scache.h
src/WINNT/afsd/cm_vnodeops.c

index b85b4f8..aef6777 100644 (file)
@@ -1790,6 +1790,7 @@ long cm_GetCallback(cm_scache_t *scp, struct cm_user *userp,
         if (scp->dataVersion != cm_data.fakeDirVersion) {
             memset(&afsStatus, 0, sizeof(afsStatus));
             memset(&volSync, 0, sizeof(volSync));
+            InterlockedIncrement(&scp->activeRPCs);
 
             // Fetch the status info
             cm_MergeStatus(NULL, scp, &afsStatus, &volSync, userp, reqp, 0);
@@ -1820,6 +1821,7 @@ long cm_GetCallback(cm_scache_t *scp, struct cm_user *userp,
         lock_ReleaseWrite(&scp->rw);
 
         /* now make the RPC */
+        InterlockedIncrement(&scp->activeRPCs);
         osi_Log4(afsd_logp, "CALL FetchStatus scp 0x%p vol %u vn %u uniq %u",
                  scp, sfid.volume, sfid.vnode, sfid.unique);
         do {
@@ -1848,6 +1850,7 @@ long cm_GetCallback(cm_scache_t *scp, struct cm_user *userp,
             cm_MergeStatus(NULL, scp, &afsStatus, &volSync, userp, reqp, 0);
         } else {
             cm_EndCallbackGrantingCall(NULL, &cbr, NULL, NULL, 0);
+            InterlockedDecrement(&scp->activeRPCs);
         }
 
         /* if we got an error, return to caller */
index 2552f57..1598714 100644 (file)
@@ -152,6 +152,7 @@ long cm_BufWrite(void *vscp, osi_hyper_t *offsetp, long length, long flags,
         require_64bit_ops = 1;
     }
 
+    InterlockedIncrement(&scp->activeRPCs);
     lock_ReleaseWrite(&scp->rw);
 
     /* now we're ready to do the store operation */
@@ -417,6 +418,7 @@ long cm_BufWrite(void *vscp, osi_hyper_t *offsetp, long length, long flags,
 
         cm_MergeStatus(NULL, scp, &outStatus, &volSync, userp, reqp, CM_MERGEFLAG_STOREDATA);
     } else {
+        InterlockedDecrement(&scp->activeRPCs);
         if (code == CM_ERROR_SPACE)
             _InterlockedOr(&scp->flags, CM_SCACHEFLAG_OUTOFSPACE);
         else if (code == CM_ERROR_QUOTA)
@@ -478,6 +480,7 @@ long cm_StoreMini(cm_scache_t *scp, cm_user_t *userp, cm_req_t *reqp)
         require_64bit_ops = 1;
     }
 
+    InterlockedIncrement(&scp->activeRPCs);
     lock_ReleaseWrite(&scp->rw);
 
     cm_AFSFidFromFid(&tfid, &scp->fid);
@@ -570,6 +573,8 @@ long cm_StoreMini(cm_scache_t *scp, cm_user_t *userp, cm_req_t *reqp)
         if (LargeIntegerGreaterThanOrEqualTo(t, scp->length))
             _InterlockedAnd(&scp->mask, ~CM_SCACHEMASK_LENGTH);
         cm_MergeStatus(NULL, scp, &outStatus, &volSync, userp, reqp, CM_MERGEFLAG_STOREDATA);
+    } else {
+        InterlockedDecrement(&scp->activeRPCs);
     }
     cm_SyncOpDone(scp, NULL, CM_SCACHESYNC_STOREDATA_EXCL);
 
@@ -1702,6 +1707,7 @@ long cm_GetBuffer(cm_scache_t *scp, cm_buf_t *bufp, int *cpffp, cm_user_t *userp
         return 0;
     }
 
+    InterlockedIncrement(&scp->activeRPCs);
     lock_ReleaseWrite(&scp->rw);
     scp_locked = 0;
 
@@ -2145,6 +2151,8 @@ long cm_GetBuffer(cm_scache_t *scp, cm_buf_t *bufp, int *cpffp, cm_user_t *userp
 
     if (code == 0)
         cm_MergeStatus(NULL, scp, &afsStatus, &volSync, userp, reqp, CM_MERGEFLAG_FETCHDATA);
+    else
+        InterlockedDecrement(&scp->activeRPCs);
 
     return code;
 }
@@ -2206,6 +2214,7 @@ long cm_GetData(cm_scache_t *scp, osi_hyper_t *offsetp, char *datap, int data_le
         require_64bit_ops = 1;
     }
 
+    InterlockedIncrement(&scp->activeRPCs);
     osi_Log2(afsd_logp, "cm_GetData: fetching data scp %p DV 0x%x", scp, scp->dataVersion);
 
 #ifdef AFS_FREELANCE_CLIENT
@@ -2494,6 +2503,8 @@ long cm_GetData(cm_scache_t *scp, osi_hyper_t *offsetp, char *datap, int data_le
 
     if (code == 0)
         cm_MergeStatus(NULL, scp, &afsStatus, &volSync, userp, reqp, CM_MERGEFLAG_FETCHDATA);
+    else
+        InterlockedDecrement(&scp->activeRPCs);
 
     return code;
 }
index e107571..a2290b3 100644 (file)
@@ -596,6 +596,7 @@ void cm_InitSCache(int newFile, long maxSCaches)
                 scp->openShares = 0;
                 scp->openExcls = 0;
                 scp->waitCount = 0;
+                scp->activeRPCs = 0;
 #ifdef USE_BPLUS
                 scp->dirBplus = NULL;
                 scp->dirDataVersion = CM_SCACHE_VERSION_BAD;
@@ -1481,9 +1482,13 @@ void cm_MergeStatus(cm_scache_t *dscp,
     afs_uint64 dataVersion;
     struct cm_volume *volp = NULL;
     struct cm_cell *cellp = NULL;
+    int rdr_invalidate = 0;
+    afs_uint32 activeRPCs;
 
     lock_AssertWrite(&scp->rw);
 
+    activeRPCs = 1 + InterlockedDecrement(&scp->activeRPCs);
+
     // yj: i want to create some fake status for the /afs directory and the
     // entries under that directory
 #ifdef AFS_FREELANCE_CLIENT
@@ -1665,13 +1670,13 @@ void cm_MergeStatus(cm_scache_t *dscp,
 
     if (scp->dataVersion != 0 &&
         (!(flags & (CM_MERGEFLAG_DIROP|CM_MERGEFLAG_STOREDATA)) && dataVersion != scp->dataVersion ||
-         (flags & (CM_MERGEFLAG_DIROP|CM_MERGEFLAG_STOREDATA)) && dataVersion - scp->dataVersion > 1)) {
+         (flags & (CM_MERGEFLAG_DIROP|CM_MERGEFLAG_STOREDATA)) && dataVersion - scp->dataVersion > activeRPCs)) {
         /*
          * We now know that all of the data buffers that we have associated
          * with this scp are invalid.  Subsequent operations will go faster
          * if the buffers are removed from the hash tables.
          *
-         * We do not remove directory buffers if the dataVersion delta is 1 because
+         * We do not remove directory buffers if the dataVersion delta is 'activeRPCs' because
          * those version numbers will be updated as part of the directory operation.
          *
          * We do not remove storedata buffers because they will still be valid.
@@ -1754,22 +1759,18 @@ void cm_MergeStatus(cm_scache_t *dscp,
      * merge status no longer has performance characteristics derived from
      * the size of the file.
      */
-    if (((flags & CM_MERGEFLAG_STOREDATA) && dataVersion - scp->dataVersion > 1) ||
+    if (((flags & CM_MERGEFLAG_STOREDATA) && dataVersion - scp->dataVersion > activeRPCs) ||
          (!(flags & CM_MERGEFLAG_STOREDATA) && scp->dataVersion != dataVersion) ||
          scp->bufDataVersionLow == 0)
         scp->bufDataVersionLow = dataVersion;
 
     if (RDR_Initialized && scp->dataVersion != CM_SCACHE_VERSION_BAD) {
         if ( ( !(reqp->flags & CM_REQ_SOURCE_REDIR) || !(flags & (CM_MERGEFLAG_DIROP|CM_MERGEFLAG_STOREDATA))) &&
-             scp->dataVersion != dataVersion ) {
-            RDR_InvalidateObject(scp->fid.cell, scp->fid.volume, scp->fid.vnode,
-                                 scp->fid.unique, scp->fid.hash,
-                                 scp->fileType, AFS_INVALIDATE_DATA_VERSION);
+             scp->dataVersion != dataVersion && (dataVersion - scp->dataVersion > activeRPCs - 1)) {
+            rdr_invalidate = 1;
         } else if ( (reqp->flags & CM_REQ_SOURCE_REDIR) && (flags & (CM_MERGEFLAG_DIROP|CM_MERGEFLAG_STOREDATA)) &&
-                    dataVersion - scp->dataVersion > 1) {
-            RDR_InvalidateObject(scp->fid.cell, scp->fid.volume, scp->fid.vnode,
-                                 scp->fid.unique, scp->fid.hash,
-                                 scp->fileType, AFS_INVALIDATE_DATA_VERSION);
+                    dataVersion - scp->dataVersion > activeRPCs) {
+            rdr_invalidate = 1;
         }
     }
     scp->dataVersion = dataVersion;
@@ -1799,10 +1800,23 @@ void cm_MergeStatus(cm_scache_t *dscp,
             lock_ReleaseWrite(&volp->rw);
         }
     }
+
   done:
     if (volp)
         cm_PutVolume(volp);
 
+    /*
+     * The scache rw lock cannot be held across the invalidation.
+     * Doing so can result in deadlocks with other threads processing
+     * requests initiated by the afs redirector.
+     */
+    if (rdr_invalidate) {
+        lock_ReleaseWrite(&scp->rw);
+        RDR_InvalidateObject(scp->fid.cell, scp->fid.volume, scp->fid.vnode,
+                             scp->fid.unique, scp->fid.hash,
+                             scp->fileType, AFS_INVALIDATE_DATA_VERSION);
+        lock_ObtainWrite(&scp->rw);
+    }
 }
 
 /* note that our stat cache info is incorrect, so force us eventually
index 3829d6e..5f8e06f 100644 (file)
@@ -235,6 +235,8 @@ typedef struct cm_scache {
     osi_queue_t * redirQueueT;
     afs_uint32    redirBufCount;    /* Number of buffers held by the redirector */
     time_t        redirLastAccess;  /* last time redir accessed the vnode */
+
+    afs_uint32 activeRPCs;              /* atomic */
 } cm_scache_t;
 
 /* dataVersion */
index 365720e..c177fc5 100644 (file)
@@ -1661,6 +1661,8 @@ long cm_Unlink(cm_scache_t *dscp, fschar_t *fnamep, clientchar_t * cnamep,
     }
 
     /* make the RPC */
+    InterlockedIncrement(&dscp->activeRPCs);
+
     afsFid.Volume = dscp->fid.volume;
     afsFid.Vnode = dscp->fid.vnode;
     afsFid.Unique = dscp->fid.unique;
@@ -1697,12 +1699,15 @@ long cm_Unlink(cm_scache_t *dscp, fschar_t *fnamep, clientchar_t * cnamep,
             RDR_InvalidateObject(dscp->fid.cell, dscp->fid.volume, dscp->fid.vnode,
                                  dscp->fid.unique, dscp->fid.hash,
                                  dscp->fileType, AFS_INVALIDATE_DATA_VERSION);
-    } else if (code == CM_ERROR_NOSUCHFILE) {
-       /* windows would not have allowed the request to delete the file
-        * if it did not believe the file existed.  therefore, we must
-        * have an inconsistent view of the world.
-        */
-       dscp->cbServerp = NULL;
+    } else {
+        InterlockedDecrement(&scp->activeRPCs);
+        if (code == CM_ERROR_NOSUCHFILE) {
+            /* windows would not have allowed the request to delete the file
+             * if it did not believe the file existed.  therefore, we must
+             * have an inconsistent view of the world.
+             */
+            dscp->cbServerp = NULL;
+        }
     }
     cm_SyncOpDone(dscp, NULL, sflags);
     lock_ReleaseWrite(&dscp->rw);
@@ -2542,6 +2547,7 @@ cm_TryBulkStatRPC(cm_scache_t *dscp, cm_bulkStat_t *bbp, cm_user_t *userp, cm_re
                                                &bbp->callbacks[j],
                                                &volSync,
                                                CM_CALLBACK_MAINTAINCOUNT);
+                    InterlockedIncrement(&scp->activeRPCs);
                     cm_MergeStatus(dscp, scp, &bbp->stats[j], &volSync, userp, reqp, 0);
                     lock_ReleaseWrite(&scp->rw);
                 } else {
@@ -2777,6 +2783,8 @@ long cm_SetAttr(cm_scache_t *scp, cm_attr_t *attrp, cm_user_t *userp,
     lock_ReleaseRead(&scp->rw);
 
     /* now make the RPC */
+    InterlockedIncrement(&scp->activeRPCs);
+
     osi_Log1(afsd_logp, "CALL StoreStatus scp 0x%p", scp);
     do {
         code = cm_ConnFromFID(&scp->fid, userp, reqp, &connp);
@@ -2801,6 +2809,8 @@ long cm_SetAttr(cm_scache_t *scp, cm_attr_t *attrp, cm_user_t *userp,
     if (code == 0)
         cm_MergeStatus(NULL, scp, &afsOutStatus, &volSync, userp, reqp,
                         CM_MERGEFLAG_FORCE|CM_MERGEFLAG_STOREDATA);
+    else
+        InterlockedDecrement(&scp->activeRPCs);
     cm_SyncOpDone(scp, NULL, CM_SCACHESYNC_STORESTATUS);
 
     /* if we're changing the mode bits, discard the ACL cache,
@@ -2879,6 +2889,7 @@ long cm_Create(cm_scache_t *dscp, clientchar_t *cnamep, long flags, cm_attr_t *a
     cm_StatusFromAttr(&inStatus, NULL, attrp);
 
     /* try the RPC now */
+    InterlockedIncrement(&dscp->activeRPCs);
     osi_Log1(afsd_logp, "CALL CreateFile scp 0x%p", dscp);
     do {
         code = cm_ConnFromFID(&dscp->fid, userp, reqp, &connp);
@@ -2912,6 +2923,8 @@ long cm_Create(cm_scache_t *dscp, clientchar_t *cnamep, long flags, cm_attr_t *a
     lock_ObtainWrite(&dscp->rw);
     if (code == 0)
         cm_MergeStatus(NULL, dscp, &updatedDirStatus, &volSync, userp, reqp, CM_MERGEFLAG_DIROP);
+    else
+        InterlockedDecrement(&dscp->activeRPCs);
     cm_SyncOpDone(dscp, NULL, CM_SCACHESYNC_STOREDATA);
     lock_ReleaseWrite(&dscp->rw);
 
@@ -2929,6 +2942,7 @@ long cm_Create(cm_scache_t *dscp, clientchar_t *cnamep, long flags, cm_attr_t *a
             if (!cm_HaveCallback(scp)) {
                 cm_EndCallbackGrantingCall(scp, &cbReq,
                                            &newFileCallback, &volSync, 0);
+                InterlockedIncrement(&scp->activeRPCs);
                 cm_MergeStatus(dscp, scp, &newFileStatus, &volSync,
                                userp, reqp, 0);
                 didEnd = 1;
@@ -3062,6 +3076,7 @@ long cm_MakeDir(cm_scache_t *dscp, clientchar_t *cnamep, long flags, cm_attr_t *
     cm_StatusFromAttr(&inStatus, NULL, attrp);
 
     /* try the RPC now */
+    InterlockedIncrement(&dscp->activeRPCs);
     osi_Log1(afsd_logp, "CALL MakeDir scp 0x%p", dscp);
     do {
         code = cm_ConnFromFID(&dscp->fid, userp, reqp, &connp);
@@ -3095,6 +3110,8 @@ long cm_MakeDir(cm_scache_t *dscp, clientchar_t *cnamep, long flags, cm_attr_t *
     lock_ObtainWrite(&dscp->rw);
     if (code == 0)
         cm_MergeStatus(NULL, dscp, &updatedDirStatus, &volSync, userp, reqp, CM_MERGEFLAG_DIROP);
+    else
+        InterlockedDecrement(&dscp->activeRPCs);
     cm_SyncOpDone(dscp, NULL, CM_SCACHESYNC_STOREDATA);
     lock_ReleaseWrite(&dscp->rw);
 
@@ -3111,6 +3128,7 @@ long cm_MakeDir(cm_scache_t *dscp, clientchar_t *cnamep, long flags, cm_attr_t *
             if (!cm_HaveCallback(scp)) {
                 cm_EndCallbackGrantingCall(scp, &cbReq,
                                             &newDirCallback, &volSync, 0);
+                InterlockedIncrement(&scp->activeRPCs);
                 cm_MergeStatus(dscp, scp, &newDirStatus, &volSync,
                                 userp, reqp, 0);
                 didEnd = 1;
@@ -3183,6 +3201,7 @@ long cm_Link(cm_scache_t *dscp, clientchar_t *cnamep, cm_scache_t *sscp, long fl
     fnamep = cm_ClientStringToFsStringAlloc(cnamep, -1, NULL);
 
     /* try the RPC now */
+    InterlockedIncrement(&dscp->activeRPCs);
     osi_Log1(afsd_logp, "CALL Link scp 0x%p", dscp);
     do {
         code = cm_ConnFromFID(&dscp->fid, userp, reqp, &connp);
@@ -3223,6 +3242,8 @@ long cm_Link(cm_scache_t *dscp, clientchar_t *cnamep, cm_scache_t *sscp, long fl
             RDR_InvalidateObject(dscp->fid.cell, dscp->fid.volume, dscp->fid.vnode,
                                  dscp->fid.unique, dscp->fid.hash,
                                  dscp->fileType, AFS_INVALIDATE_DATA_VERSION);
+    } else {
+        InterlockedDecrement(&dscp->activeRPCs);
     }
     cm_SyncOpDone(dscp, NULL, CM_SCACHESYNC_STOREDATA);
     lock_ReleaseWrite(&dscp->rw);
@@ -3240,6 +3261,7 @@ long cm_Link(cm_scache_t *dscp, clientchar_t *cnamep, cm_scache_t *sscp, long fl
     /* Update the linked object status */
     if (code == 0) {
         lock_ObtainWrite(&sscp->rw);
+        InterlockedIncrement(&sscp->activeRPCs);
         cm_MergeStatus(NULL, sscp, &newLinkStatus, &volSync, userp, reqp, 0);
         lock_ReleaseWrite(&sscp->rw);
     }
@@ -3295,6 +3317,7 @@ long cm_SymLink(cm_scache_t *dscp, clientchar_t *cnamep, fschar_t *contentsp, lo
     cm_StatusFromAttr(&inStatus, NULL, attrp);
 
     /* try the RPC now */
+    InterlockedIncrement(&dscp->activeRPCs);
     osi_Log1(afsd_logp, "CALL Symlink scp 0x%p", dscp);
     do {
         code = cm_ConnFromFID(&dscp->fid, userp, reqp, &connp);
@@ -3327,6 +3350,8 @@ long cm_SymLink(cm_scache_t *dscp, clientchar_t *cnamep, fschar_t *contentsp, lo
     lock_ObtainWrite(&dscp->rw);
     if (code == 0)
         cm_MergeStatus(NULL, dscp, &updatedDirStatus, &volSync, userp, reqp, CM_MERGEFLAG_DIROP);
+    else
+        InterlockedDecrement(&dscp->activeRPCs);
     cm_SyncOpDone(dscp, NULL, CM_SCACHESYNC_STOREDATA);
     lock_ReleaseWrite(&dscp->rw);
 
@@ -3353,6 +3378,7 @@ long cm_SymLink(cm_scache_t *dscp, clientchar_t *cnamep, fschar_t *contentsp, lo
         if (code == 0) {
             lock_ObtainWrite(&scp->rw);
             if (!cm_HaveCallback(scp)) {
+                InterlockedIncrement(&scp->activeRPCs);
                 cm_MergeStatus(dscp, scp, &newLinkStatus, &volSync,
                                 userp, reqp, 0);
             }
@@ -3448,6 +3474,7 @@ long cm_RemoveDir(cm_scache_t *dscp, fschar_t *fnamep, clientchar_t *cnamep, cm_
     didEnd = 0;
 
     /* try the RPC now */
+    InterlockedIncrement(&dscp->activeRPCs);
     osi_Log1(afsd_logp, "CALL RemoveDir scp 0x%p", dscp);
     do {
         code = cm_ConnFromFID(&dscp->fid, userp, reqp, &connp);
@@ -3480,6 +3507,8 @@ long cm_RemoveDir(cm_scache_t *dscp, fschar_t *fnamep, clientchar_t *cnamep, cm_
     if (code == 0) {
         cm_dnlcRemove(dscp, cnamep);
         cm_MergeStatus(NULL, dscp, &updatedDirStatus, &volSync, userp, reqp, CM_MERGEFLAG_DIROP);
+    } else {
+        InterlockedDecrement(&dscp->activeRPCs);
     }
     cm_SyncOpDone(dscp, NULL, CM_SCACHESYNC_STOREDATA);
     lock_ReleaseWrite(&dscp->rw);
@@ -3779,6 +3808,9 @@ long cm_Rename(cm_scache_t *oldDscp, fschar_t *oldNamep, clientchar_t *cOldNamep
     newNamep = cm_ClientStringToFsStringAlloc(cNewNamep, -1, NULL);
 
     /* try the RPC now */
+    InterlockedIncrement(&oldDscp->activeRPCs);
+    if (!oneDir)
+        InterlockedIncrement(&newDscp->activeRPCs);
     osi_Log2(afsd_logp, "CALL Rename old scp 0x%p new scp 0x%p",
               oldDscp, newDscp);
     do {
@@ -3819,6 +3851,8 @@ long cm_Rename(cm_scache_t *oldDscp, fschar_t *oldNamep, clientchar_t *cOldNamep
     if (code == 0)
         cm_MergeStatus(NULL, oldDscp, &updatedOldDirStatus, &volSync,
                        userp, reqp, CM_MERGEFLAG_DIROP);
+    else
+        InterlockedDecrement(&oldDscp->activeRPCs);
     cm_SyncOpDone(oldDscp, NULL, CM_SCACHESYNC_STOREDATA);
     lock_ReleaseWrite(&oldDscp->rw);
 
@@ -3859,6 +3893,8 @@ long cm_Rename(cm_scache_t *oldDscp, fschar_t *oldNamep, clientchar_t *cOldNamep
         if (code == 0)
             cm_MergeStatus(NULL, newDscp, &updatedNewDirStatus, &volSync,
                             userp, reqp, CM_MERGEFLAG_DIROP);
+        else
+            InterlockedIncrement(&newDscp->activeRPCs);
         cm_SyncOpDone(newDscp, NULL, CM_SCACHESYNC_STOREDATA);
         lock_ReleaseWrite(&newDscp->rw);