Windows: Improvements to background fetch processing
authorJeffrey Altman <jaltman@secure-endpoints.com>
Sat, 14 Nov 2009 21:33:31 +0000 (16:33 -0500)
committerJeffrey Altman <jaltman|account-1000011@unknown>
Thu, 19 Nov 2009 16:55:26 +0000 (08:55 -0800)
Log offset and length in cm_BkgPrefetch()

Convert mxheld to rwheld in cm_BkgPrefetch() now that cm_scache_t
objects use rwlocks.

Do not clear CM_SCACHEFLAG_PREFETCHING from within the error
returns from cm_CheckFetchRange().  Let the caller decide if
that is appropriate.

Add CM_BUF_CMBKGFETCH cm_buf_t cmFlag to make it possible to
quickly detect if a background fetch operation has already
been queued for a particular cm_buf_t data range.

LICENSE MIT

Change-Id: I4ac9a2f84ddd64cba86612d7a2abe849bd0bec0b
Reviewed-on: http://gerrit.openafs.org/827
Reviewed-by: Derrick Brashear <shadow@dementia.org>
Tested-by: Jeffrey Altman <jaltman@openafs.org>
Reviewed-by: Jeffrey Altman <jaltman@openafs.org>

src/WINNT/afsd/cm_buf.h
src/WINNT/afsd/cm_dcache.c
src/WINNT/afsd/cm_dcache.h

index 886ec36..6ca613c 100644 (file)
@@ -94,6 +94,8 @@ typedef struct cm_buf {
 #define CM_BUF_CMSTORING       2       /* storing this buffer */
 #define CM_BUF_CMFULLYFETCHED  4       /* read-while-fetching optimization */
 #define CM_BUF_CMWRITING        8       /* writing to this buffer */
+#define CM_BUF_CMBKGFETCH      16       /* background fetch queued by
+                                         * prefetch or redirector */
 /* waiting is done based on scp->flags.  Removing bits from cmFlags
    should be followed by waking the scp. */
 
@@ -101,7 +103,7 @@ typedef struct cm_buf {
 #define CM_BUF_WRITING 2       /* now writing buffer to the disk */
 #define CM_BUF_INHASH  4       /* in the hash table */
 #define CM_BUF_DIRTY   8       /* buffer is dirty */
-#define CM_BUF_INLRU   0x10    /* in lru queue */
+#define CM_BUF_INLRU   0x10    /* in lru queue (aka free list) */
 #define CM_BUF_ERROR   0x20    /* something went wrong on delayed write */
 #define CM_BUF_WAITING 0x40    /* someone's waiting for a flag to change */
 #define CM_BUF_INDL     0x80    /* in the dirty list */
index 340eb67..1f47843 100644 (file)
@@ -559,8 +559,12 @@ int cm_HaveBuffer(cm_scache_t *scp, cm_buf_t *bufp, int isBufLocked)
         return 0;
 }
 
-/* used when deciding whether to do a prefetch or not */
-long cm_CheckFetchRange(cm_scache_t *scp, osi_hyper_t *startBasep, osi_hyper_t *length,
+/*
+ * used when deciding whether to do a background fetch or not.
+ * call with scp->rw write-locked.
+ */
+afs_int32
+cm_CheckFetchRange(cm_scache_t *scp, osi_hyper_t *startBasep, osi_hyper_t *length,
                         cm_user_t *userp, cm_req_t *reqp, osi_hyper_t *realBasep)
 {
     osi_hyper_t tbase;
@@ -577,16 +581,12 @@ long cm_CheckFetchRange(cm_scache_t *scp, osi_hyper_t *startBasep, osi_hyper_t *
     tlength = *length;
     tblocksize = ConvertLongToLargeInteger(cm_data.buf_blockSize);
     stop = 0;
-    lock_ObtainWrite(&scp->rw);
     while (LargeIntegerGreaterThanZero(tlength)) {
         /* get callback so we can do a meaningful dataVersion comparison */
         code = cm_SyncOp(scp, NULL, userp, reqp, 0,
                          CM_SCACHESYNC_NEEDCALLBACK | CM_SCACHESYNC_GETSTATUS);
-        if (code) {
-            scp->flags &= ~CM_SCACHEFLAG_PREFETCHING;
-            lock_ReleaseWrite(&scp->rw);
+        if (code)
             return code;
-        }
                 
         if (LargeIntegerGreaterThanOrEqualTo(tbase, scp->length)) {
             /* we're past the end of file */
@@ -596,7 +596,7 @@ long cm_CheckFetchRange(cm_scache_t *scp, osi_hyper_t *startBasep, osi_hyper_t *
         bp = buf_Find(scp, &tbase);
         /* We cheat slightly by not locking the bp mutex. */
         if (bp) {
-            if ((bp->cmFlags & (CM_BUF_CMFETCHING | CM_BUF_CMSTORING)) == 0
+            if ((bp->cmFlags & (CM_BUF_CMFETCHING | CM_BUF_CMSTORING | CM_BUF_CMBKGFETCH)) == 0
                  && (bp->dataVersion < scp->bufDataVersionLow || bp->dataVersion > scp->dataVersion))
                 stop = 1;
             buf_Release(bp);
@@ -620,7 +620,6 @@ long cm_CheckFetchRange(cm_scache_t *scp, osi_hyper_t *startBasep, osi_hyper_t *
      */
     if (stop == 0) {
         /* return non-zero code since realBasep won't be valid */
-        scp->flags &= ~CM_SCACHEFLAG_PREFETCHING;
         code = -1;
     }   
     else {
@@ -628,7 +627,6 @@ long cm_CheckFetchRange(cm_scache_t *scp, osi_hyper_t *startBasep, osi_hyper_t *
         *realBasep = tbase;
         code = 0;
     }
-    lock_ReleaseWrite(&scp->rw);
     return code;
 }
 
@@ -709,8 +707,8 @@ cm_BkgPrefetch(cm_scache_t *scp, afs_uint32 p1, afs_uint32 p2, afs_uint32 p3, af
     osi_hyper_t end;
     osi_hyper_t fetched;
     osi_hyper_t tblocksize;
-    long code;
-    int mxheld = 0;
+    afs_int32 code;
+    int rxheld = 0;
     cm_buf_t *bp = NULL;
     cm_req_t req;
 
@@ -729,15 +727,16 @@ cm_BkgPrefetch(cm_scache_t *scp, afs_uint32 p1, afs_uint32 p2, afs_uint32 p3, af
 
     end = LargeIntegerAdd(base, length);
         
-    osi_Log3(afsd_logp, "Starting BKG prefetch scp 0x%p, base 0x%x:%x", scp, p2, p1);
+    osi_Log5(afsd_logp, "Starting BKG prefetch scp 0x%p offset 0x%x:%x length 0x%x:%x",
+             scp, p2, p1, p4, p3);
 
     for ( code = 0, offset = base;
           code == 0 && LargeIntegerLessThan(offset, end); 
           offset = LargeIntegerAdd(offset, tblocksize) )
     {
-        if (mxheld) {
+        if (rxheld) {
             lock_ReleaseWrite(&scp->rw);
-            mxheld = 0;
+            rxheld = 0;
         }
 
         code = buf_Get(scp, &offset, &req, &bp);
@@ -746,32 +745,56 @@ cm_BkgPrefetch(cm_scache_t *scp, afs_uint32 p1, afs_uint32 p2, afs_uint32 p3, af
 
         if (bp->cmFlags & CM_BUF_CMFETCHING) {
             /* skip this buffer as another thread is already fetching it */
+            if (!rxheld) {
+                lock_ObtainWrite(&scp->rw);
+                rxheld = 1;
+            }
+            bp->cmFlags &= ~CM_BUF_CMBKGFETCH;
             buf_Release(bp);
             bp = NULL;
             continue;
         }
 
-        if (!mxheld) {
+        if (!rxheld) {
             lock_ObtainWrite(&scp->rw);
-            mxheld = 1;
+            rxheld = 1;
         }
 
         code = cm_GetBuffer(scp, bp, NULL, userp, &req);
         if (code == 0)
             fetched = LargeIntegerAdd(fetched, tblocksize); 
         buf_Release(bp);
+        bp->cmFlags &= ~CM_BUF_CMBKGFETCH;
     }
     
-    if (!mxheld) {
+    if (!rxheld) {
         lock_ObtainWrite(&scp->rw);
-        mxheld = 1;
+        rxheld = 1;
+    }
+
+    /* Clear flag from any remaining buffers */
+    for ( ;
+          LargeIntegerLessThan(offset, end);
+          offset = LargeIntegerAdd(offset, tblocksize) )
+    {
+        bp = buf_Find(scp, &offset);
+        if (bp) {
+            bp->cmFlags &= ~CM_BUF_CMBKGFETCH;
+            buf_Release(bp);
+        }
     }
     cm_ClearPrefetchFlag(LargeIntegerGreaterThanZero(fetched) ? 0 : code, 
                          scp, &base, &fetched);
+
+    /* wakeup anyone who is waiting */
+    if (scp->flags & CM_SCACHEFLAG_WAITING) {
+        osi_Log1(afsd_logp, "CM BkgPrefetch Waking scp 0x%p", scp);
+        osi_Wakeup((LONG_PTR) &scp->flags);
+    }
     lock_ReleaseWrite(&scp->rw);
 
-    osi_Log4(afsd_logp, "Ending BKG prefetch scp 0x%p, code %d bytes 0x%x:%x", 
-              scp, code, fetched.HighPart, fetched.LowPart);
+    osi_Log4(afsd_logp, "Ending BKG prefetch scp 0x%p code 0x%x fetched 0x%x:%x",
+             scp, code, fetched.HighPart, fetched.LowPart);
     return code;
 }
 
@@ -782,9 +805,16 @@ void cm_ConsiderPrefetch(cm_scache_t *scp, osi_hyper_t *offsetp, afs_uint32 coun
                          cm_user_t *userp, cm_req_t *reqp)
 {
     long code;
+    int  rwheld = 0;
     osi_hyper_t realBase;
     osi_hyper_t readBase;
     osi_hyper_t readLength;
+    osi_hyper_t readEnd;
+    osi_hyper_t offset;
+    osi_hyper_t tblocksize;            /* a long long temp variable */
+    cm_buf_t    *bp;
+
+    tblocksize = ConvertLongToLargeInteger(cm_data.buf_blockSize);
         
     readBase = *offsetp;
     /* round up to chunk boundary */
@@ -794,6 +824,7 @@ void cm_ConsiderPrefetch(cm_scache_t *scp, osi_hyper_t *offsetp, afs_uint32 coun
     readLength = ConvertLongToLargeInteger(count);
 
     lock_ObtainWrite(&scp->rw);
+    rwheld = 1;
     if ((scp->flags & CM_SCACHEFLAG_PREFETCHING)
          || LargeIntegerLessThanOrEqualTo(readBase, scp->prefetch.base)) {
         lock_ReleaseWrite(&scp->rw);
@@ -807,12 +838,44 @@ void cm_ConsiderPrefetch(cm_scache_t *scp, osi_hyper_t *offsetp, afs_uint32 coun
     if (LargeIntegerGreaterThan(scp->prefetch.end, readBase))
         readBase = scp->prefetch.end;
 
-    lock_ReleaseWrite(&scp->rw);
-
     code = cm_CheckFetchRange(scp, &readBase, &readLength, userp, reqp,
                               &realBase);
-    if (code) 
+    if (code) {
+        scp->flags &= ~CM_SCACHEFLAG_PREFETCHING;
+        lock_ReleaseWrite(&scp->rw);
         return;        /* can't find something to prefetch */
+    }
+
+    readEnd = LargeIntegerAdd(realBase, readLength);
+
+    /*
+     * Mark each buffer in the range as queued for a
+     * background fetch
+     */
+    for ( offset = realBase;
+          LargeIntegerLessThan(offset, readEnd);
+          offset = LargeIntegerAdd(offset, tblocksize) )
+    {
+        if (rwheld) {
+            lock_ReleaseWrite(&scp->rw);
+            rwheld = 0;
+        }
+
+        bp = buf_Find(scp, &offset);
+        if (!bp)
+            continue;
+
+        if (!rwheld) {
+            lock_ObtainWrite(&scp->rw);
+            rwheld = 1;
+        }
+
+        bp->cmFlags |= CM_BUF_CMBKGFETCH;
+        buf_Release(bp);
+    }
+
+    if (rwheld)
+        lock_ReleaseWrite(&scp->rw);
 
     osi_Log2(afsd_logp, "BKG Prefetch request scp 0x%p, base 0x%x",
              scp, realBase.LowPart);
index 2d7e058..cb3e747 100644 (file)
@@ -31,9 +31,9 @@ extern int cm_HaveBuffer(struct cm_scache *, struct cm_buf *, int haveBufLocked)
 extern long cm_GetBuffer(struct cm_scache *, struct cm_buf *, int *,
        struct cm_user *, struct cm_req *);
 
-extern long cm_CheckFetchRange(cm_scache_t *scp, osi_hyper_t *startBasep,
-                               osi_hyper_t *length, cm_user_t *up, 
-                               cm_req_t *reqp, osi_hyper_t *realBasep);
+extern afs_int32 cm_CheckFetchRange(cm_scache_t *scp, osi_hyper_t *startBasep,
+                                    osi_hyper_t *length, cm_user_t *up,
+                                    cm_req_t *reqp, osi_hyper_t *realBasep);
 
 extern long cm_SetupFetchBIOD(cm_scache_t *scp, osi_hyper_t *offsetp,
        cm_bulkIO_t *biop, cm_user_t *up, cm_req_t *reqp);