Windows: wait for I/O on buffers to complete in cm_SetupStoreBIOD
[openafs.git] / src / WINNT / afsd / cm_dcache.c
index c3b53d9..ba311d5 100644 (file)
@@ -79,6 +79,8 @@ long cm_BufWrite(void *vscp, osi_hyper_t *offsetp, long length, long flags,
     osi_assertx(userp != NULL, "null cm_user_t");
     osi_assertx(scp != NULL, "null cm_scache_t");
 
+    memset(&volSync, 0, sizeof(volSync));
+
     /* now, the buffer may or may not be filled with good data (buf_GetNew
      * drops lots of locks, and may indeed return a properly initialized
      * buffer, although more likely it will just return a new, empty, buffer.
@@ -347,6 +349,8 @@ long cm_StoreMini(cm_scache_t *scp, cm_user_t *userp, cm_req_t *reqp)
     int require_64bit_ops = 0;
     int call_was_64bit = 0;
 
+    memset(&volSync, 0, sizeof(volSync));
+
     /* Serialize StoreData RPC's; for rationale see cm_scache.c */
     (void) cm_SyncOp(scp, NULL, userp, reqp, 0,
                      CM_SCACHESYNC_STOREDATA_EXCL);
@@ -453,7 +457,7 @@ long cm_StoreMini(cm_scache_t *scp, cm_user_t *userp, cm_req_t *reqp)
 
 long cm_BufRead(cm_buf_t *bufp, long nbytes, long *bytesReadp, cm_user_t *userp)
 {
-    *bytesReadp = cm_data.buf_blockSize;
+    *bytesReadp = 0;
 
     /* now return a code that means that I/O is done */
     return 0;
@@ -531,6 +535,8 @@ int cm_HaveBuffer(cm_scache_t *scp, cm_buf_t *bufp, int isBufLocked)
         return 1;
     if (bufp->dataVersion <= scp->dataVersion && bufp->dataVersion >= scp->bufDataVersionLow)
         return 1;
+    if (bufp->offset.QuadPart >= scp->serverLength.QuadPart)
+        return 1;
     if (!isBufLocked) {
         code = lock_TryMutex(&bufp->mx);
         if (code == 0) {
@@ -555,8 +561,12 @@ int cm_HaveBuffer(cm_scache_t *scp, cm_buf_t *bufp, int isBufLocked)
         return 0;
 }
 
-/* used when deciding whether to do a prefetch or not */
-long cm_CheckFetchRange(cm_scache_t *scp, osi_hyper_t *startBasep, osi_hyper_t *length,
+/*
+ * used when deciding whether to do a background fetch or not.
+ * call with scp->rw write-locked.
+ */
+afs_int32
+cm_CheckFetchRange(cm_scache_t *scp, osi_hyper_t *startBasep, osi_hyper_t *length,
                         cm_user_t *userp, cm_req_t *reqp, osi_hyper_t *realBasep)
 {
     osi_hyper_t tbase;
@@ -573,16 +583,12 @@ long cm_CheckFetchRange(cm_scache_t *scp, osi_hyper_t *startBasep, osi_hyper_t *
     tlength = *length;
     tblocksize = ConvertLongToLargeInteger(cm_data.buf_blockSize);
     stop = 0;
-    lock_ObtainWrite(&scp->rw);
     while (LargeIntegerGreaterThanZero(tlength)) {
         /* get callback so we can do a meaningful dataVersion comparison */
         code = cm_SyncOp(scp, NULL, userp, reqp, 0,
                          CM_SCACHESYNC_NEEDCALLBACK | CM_SCACHESYNC_GETSTATUS);
-        if (code) {
-            scp->flags &= ~CM_SCACHEFLAG_PREFETCHING;
-            lock_ReleaseWrite(&scp->rw);
+        if (code)
             return code;
-        }
                 
         if (LargeIntegerGreaterThanOrEqualTo(tbase, scp->length)) {
             /* we're past the end of file */
@@ -592,7 +598,7 @@ long cm_CheckFetchRange(cm_scache_t *scp, osi_hyper_t *startBasep, osi_hyper_t *
         bp = buf_Find(scp, &tbase);
         /* We cheat slightly by not locking the bp mutex. */
         if (bp) {
-            if ((bp->cmFlags & (CM_BUF_CMFETCHING | CM_BUF_CMSTORING)) == 0
+            if ((bp->cmFlags & (CM_BUF_CMFETCHING | CM_BUF_CMSTORING | CM_BUF_CMBKGFETCH)) == 0
                  && (bp->dataVersion < scp->bufDataVersionLow || bp->dataVersion > scp->dataVersion))
                 stop = 1;
             buf_Release(bp);
@@ -616,7 +622,6 @@ long cm_CheckFetchRange(cm_scache_t *scp, osi_hyper_t *startBasep, osi_hyper_t *
      */
     if (stop == 0) {
         /* return non-zero code since realBasep won't be valid */
-        scp->flags &= ~CM_SCACHEFLAG_PREFETCHING;
         code = -1;
     }   
     else {
@@ -624,7 +629,6 @@ long cm_CheckFetchRange(cm_scache_t *scp, osi_hyper_t *startBasep, osi_hyper_t *
         *realBasep = tbase;
         code = 0;
     }
-    lock_ReleaseWrite(&scp->rw);
     return code;
 }
 
@@ -705,8 +709,8 @@ cm_BkgPrefetch(cm_scache_t *scp, afs_uint32 p1, afs_uint32 p2, afs_uint32 p3, af
     osi_hyper_t end;
     osi_hyper_t fetched;
     osi_hyper_t tblocksize;
-    long code;
-    int mxheld = 0;
+    afs_int32 code;
+    int rxheld = 0;
     cm_buf_t *bp = NULL;
     cm_req_t req;
 
@@ -725,15 +729,16 @@ cm_BkgPrefetch(cm_scache_t *scp, afs_uint32 p1, afs_uint32 p2, afs_uint32 p3, af
 
     end = LargeIntegerAdd(base, length);
         
-    osi_Log3(afsd_logp, "Starting BKG prefetch scp 0x%p, base 0x%x:%x", scp, p2, p1);
+    osi_Log5(afsd_logp, "Starting BKG prefetch scp 0x%p offset 0x%x:%x length 0x%x:%x",
+             scp, p2, p1, p4, p3);
 
     for ( code = 0, offset = base;
           code == 0 && LargeIntegerLessThan(offset, end); 
           offset = LargeIntegerAdd(offset, tblocksize) )
     {
-        if (mxheld) {
+        if (rxheld) {
             lock_ReleaseWrite(&scp->rw);
-            mxheld = 0;
+            rxheld = 0;
         }
 
         code = buf_Get(scp, &offset, &req, &bp);
@@ -742,32 +747,56 @@ cm_BkgPrefetch(cm_scache_t *scp, afs_uint32 p1, afs_uint32 p2, afs_uint32 p3, af
 
         if (bp->cmFlags & CM_BUF_CMFETCHING) {
             /* skip this buffer as another thread is already fetching it */
+            if (!rxheld) {
+                lock_ObtainWrite(&scp->rw);
+                rxheld = 1;
+            }
+            bp->cmFlags &= ~CM_BUF_CMBKGFETCH;
             buf_Release(bp);
             bp = NULL;
             continue;
         }
 
-        if (!mxheld) {
+        if (!rxheld) {
             lock_ObtainWrite(&scp->rw);
-            mxheld = 1;
+            rxheld = 1;
         }
 
         code = cm_GetBuffer(scp, bp, NULL, userp, &req);
         if (code == 0)
             fetched = LargeIntegerAdd(fetched, tblocksize); 
         buf_Release(bp);
+        bp->cmFlags &= ~CM_BUF_CMBKGFETCH;
     }
     
-    if (!mxheld) {
+    if (!rxheld) {
         lock_ObtainWrite(&scp->rw);
-        mxheld = 1;
+        rxheld = 1;
+    }
+
+    /* Clear flag from any remaining buffers */
+    for ( ;
+          LargeIntegerLessThan(offset, end);
+          offset = LargeIntegerAdd(offset, tblocksize) )
+    {
+        bp = buf_Find(scp, &offset);
+        if (bp) {
+            bp->cmFlags &= ~CM_BUF_CMBKGFETCH;
+            buf_Release(bp);
+        }
     }
     cm_ClearPrefetchFlag(LargeIntegerGreaterThanZero(fetched) ? 0 : code, 
                          scp, &base, &fetched);
+
+    /* wakeup anyone who is waiting */
+    if (scp->flags & CM_SCACHEFLAG_WAITING) {
+        osi_Log1(afsd_logp, "CM BkgPrefetch Waking scp 0x%p", scp);
+        osi_Wakeup((LONG_PTR) &scp->flags);
+    }
     lock_ReleaseWrite(&scp->rw);
 
-    osi_Log4(afsd_logp, "Ending BKG prefetch scp 0x%p, code %d bytes 0x%x:%x", 
-              scp, code, fetched.HighPart, fetched.LowPart);
+    osi_Log4(afsd_logp, "Ending BKG prefetch scp 0x%p code 0x%x fetched 0x%x:%x",
+             scp, code, fetched.HighPart, fetched.LowPart);
     return code;
 }
 
@@ -778,9 +807,16 @@ void cm_ConsiderPrefetch(cm_scache_t *scp, osi_hyper_t *offsetp, afs_uint32 coun
                          cm_user_t *userp, cm_req_t *reqp)
 {
     long code;
+    int  rwheld = 0;
     osi_hyper_t realBase;
     osi_hyper_t readBase;
     osi_hyper_t readLength;
+    osi_hyper_t readEnd;
+    osi_hyper_t offset;
+    osi_hyper_t tblocksize;            /* a long long temp variable */
+    cm_buf_t    *bp;
+
+    tblocksize = ConvertLongToLargeInteger(cm_data.buf_blockSize);
         
     readBase = *offsetp;
     /* round up to chunk boundary */
@@ -790,6 +826,7 @@ void cm_ConsiderPrefetch(cm_scache_t *scp, osi_hyper_t *offsetp, afs_uint32 coun
     readLength = ConvertLongToLargeInteger(count);
 
     lock_ObtainWrite(&scp->rw);
+    rwheld = 1;
     if ((scp->flags & CM_SCACHEFLAG_PREFETCHING)
          || LargeIntegerLessThanOrEqualTo(readBase, scp->prefetch.base)) {
         lock_ReleaseWrite(&scp->rw);
@@ -803,12 +840,44 @@ void cm_ConsiderPrefetch(cm_scache_t *scp, osi_hyper_t *offsetp, afs_uint32 coun
     if (LargeIntegerGreaterThan(scp->prefetch.end, readBase))
         readBase = scp->prefetch.end;
 
-    lock_ReleaseWrite(&scp->rw);
-
     code = cm_CheckFetchRange(scp, &readBase, &readLength, userp, reqp,
                               &realBase);
-    if (code) 
+    if (code) {
+        scp->flags &= ~CM_SCACHEFLAG_PREFETCHING;
+        lock_ReleaseWrite(&scp->rw);
         return;        /* can't find something to prefetch */
+    }
+
+    readEnd = LargeIntegerAdd(realBase, readLength);
+
+    /*
+     * Mark each buffer in the range as queued for a
+     * background fetch
+     */
+    for ( offset = realBase;
+          LargeIntegerLessThan(offset, readEnd);
+          offset = LargeIntegerAdd(offset, tblocksize) )
+    {
+        if (rwheld) {
+            lock_ReleaseWrite(&scp->rw);
+            rwheld = 0;
+        }
+
+        bp = buf_Find(scp, &offset);
+        if (!bp)
+            continue;
+
+        if (!rwheld) {
+            lock_ObtainWrite(&scp->rw);
+            rwheld = 1;
+        }
+
+        bp->cmFlags |= CM_BUF_CMBKGFETCH;
+        buf_Release(bp);
+    }
+
+    if (rwheld)
+        lock_ReleaseWrite(&scp->rw);
 
     osi_Log2(afsd_logp, "BKG Prefetch request scp 0x%p, base 0x%x",
              scp, realBase.LowPart);
@@ -867,8 +936,15 @@ long cm_SetupStoreBIOD(cm_scache_t *scp, osi_hyper_t *inOffsetp, long inSize,
             /* get buffer mutex and scp mutex safely */
             lock_ReleaseWrite(&scp->rw);
             lock_ObtainMutex(&bufp->mx);
-            lock_ObtainWrite(&scp->rw);
 
+            /*
+             * if the buffer is actively involved in I/O
+             * we wait for the I/O to complete.
+             */
+            if (bufp->flags & (CM_BUF_WRITING|CM_BUF_READING))
+                buf_WaitIO(scp, bufp);
+
+            lock_ObtainWrite(&scp->rw);
             flags = CM_SCACHESYNC_NEEDCALLBACK | CM_SCACHESYNC_GETSTATUS | CM_SCACHESYNC_STOREDATA | CM_SCACHESYNC_BUFLOCKED;
             code = cm_SyncOp(scp, bufp, userp, reqp, 0, flags); 
             if (code) {
@@ -1383,6 +1459,59 @@ void cm_ReleaseBIOD(cm_bulkIO_t *biop, int isStore, long code, int scp_locked)
     biop->bufListEndp = NULL;
 }   
 
+static int
+cm_CloneStatus(cm_scache_t *scp, cm_user_t *userp, int scp_locked,
+               AFSFetchStatus *afsStatusp, AFSVolSync *volSyncp)
+{
+    // setup the status based upon the scp data
+    afsStatusp->InterfaceVersion = 0x1;
+    switch (scp->fileType) {
+    case CM_SCACHETYPE_FILE:
+        afsStatusp->FileType = File;
+        break;
+    case CM_SCACHETYPE_DIRECTORY:
+        afsStatusp->FileType = Directory;
+        break;
+    case CM_SCACHETYPE_MOUNTPOINT:
+        afsStatusp->FileType = SymbolicLink;
+        break;
+    case CM_SCACHETYPE_SYMLINK:
+    case CM_SCACHETYPE_DFSLINK:
+        afsStatusp->FileType = SymbolicLink;
+        break;
+    default:
+        afsStatusp->FileType = -1;    /* an invalid value */
+    }
+    afsStatusp->LinkCount = scp->linkCount;
+    afsStatusp->Length = scp->length.LowPart;
+    afsStatusp->DataVersion = (afs_uint32)(scp->dataVersion & MAX_AFS_UINT32);
+    afsStatusp->Author = 0x1;
+    afsStatusp->Owner = scp->owner;
+    if (!scp_locked) {
+        lock_ObtainWrite(&scp->rw);
+        scp_locked = 1;
+    }
+    if (cm_FindACLCache(scp, userp, &afsStatusp->CallerAccess))
+        afsStatusp->CallerAccess = scp->anyAccess;
+    afsStatusp->AnonymousAccess = scp->anyAccess;
+    afsStatusp->UnixModeBits = scp->unixModeBits;
+    afsStatusp->ParentVnode = scp->parentVnode;
+    afsStatusp->ParentUnique = scp->parentUnique;
+    afsStatusp->ResidencyMask = 0;
+    afsStatusp->ClientModTime = scp->clientModTime;
+    afsStatusp->ServerModTime = scp->serverModTime;
+    afsStatusp->Group = scp->group;
+    afsStatusp->SyncCounter = 0;
+    afsStatusp->dataVersionHigh = (afs_uint32)(scp->dataVersion >> 32);
+    afsStatusp->lockCount = 0;
+    afsStatusp->Length_hi = scp->length.HighPart;
+    afsStatusp->errorCode = 0;
+
+    volSyncp->spare1 = scp->volumeCreationDate;
+
+    return scp_locked;
+}
+
 /* Fetch a buffer.  Called with scp locked.
  * The scp is locked on return.
  */
@@ -1414,6 +1543,8 @@ long cm_GetBuffer(cm_scache_t *scp, cm_buf_t *bufp, int *cpffp, cm_user_t *userp
     int first_read = 1;
     int scp_locked = 1;
 
+    memset(&volSync, 0, sizeof(volSync));
+
     /* now, the buffer may or may not be filled with good data (buf_GetNew
      * drops lots of locks, and may indeed return a properly initialized
      * buffer, although more likely it will just return a new, empty, buffer.
@@ -1515,7 +1646,8 @@ long cm_GetBuffer(cm_scache_t *scp, cm_buf_t *bufp, int *cpffp, cm_user_t *userp
         afsStatus.lockCount = 0;
         afsStatus.Length_hi = 0;
         afsStatus.errorCode = 0;
-       
+       memset(&volSync, 0, sizeof(volSync));
+
         // once we're done setting up the status info,
         // we just fill the buffer pages with fakedata
         // from cm_FakeRootDir. Extra pages are set to
@@ -1564,47 +1696,8 @@ long cm_GetBuffer(cm_scache_t *scp, cm_buf_t *bufp, int *cpffp, cm_user_t *userp
                  scp, biod.offset.HighPart, biod.offset.LowPart,
                  scp->length.HighPart, scp->length.LowPart);
 
-        // setup the status based upon the scp data
-        afsStatus.InterfaceVersion = 0x1;
-        switch (scp->fileType) {
-        case CM_SCACHETYPE_FILE:
-            afsStatus.FileType = File;
-            break;
-        case CM_SCACHETYPE_DIRECTORY:
-            afsStatus.FileType = Directory;
-            break;
-        case CM_SCACHETYPE_MOUNTPOINT:
-            afsStatus.FileType = SymbolicLink;
-            break;
-        case CM_SCACHETYPE_SYMLINK:
-        case CM_SCACHETYPE_DFSLINK:
-            afsStatus.FileType = SymbolicLink;
-            break;
-        default:
-            afsStatus.FileType = -1;    /* an invalid value */
-        }
-        afsStatus.LinkCount = scp->linkCount;
-        afsStatus.Length = scp->length.LowPart;
-        afsStatus.DataVersion = (afs_uint32)(scp->dataVersion & MAX_AFS_UINT32);
-        afsStatus.Author = 0x1;
-        afsStatus.Owner = scp->owner;
-        lock_ObtainWrite(&scp->rw);
-        scp_locked = 1;
-        if (cm_FindACLCache(scp, userp, &afsStatus.CallerAccess))
-             afsStatus.CallerAccess = scp->anyAccess;
-        afsStatus.AnonymousAccess = scp->anyAccess;
-        afsStatus.UnixModeBits = scp->unixModeBits;
-        afsStatus.ParentVnode = scp->parentVnode;
-        afsStatus.ParentUnique = scp->parentUnique;
-        afsStatus.ResidencyMask = 0;
-        afsStatus.ClientModTime = scp->clientModTime;
-        afsStatus.ServerModTime = scp->serverModTime;
-        afsStatus.Group = scp->group;
-        afsStatus.SyncCounter = 0;
-        afsStatus.dataVersionHigh = (afs_uint32)(scp->dataVersion >> 32);
-        afsStatus.lockCount = 0;
-        afsStatus.Length_hi = scp->length.HighPart;
-        afsStatus.errorCode = 0;
+        /* Clone the current status info */
+        scp_locked = cm_CloneStatus(scp, userp, scp_locked, &afsStatus, &volSync);
 
         /* status info complete, fill pages with zeros */
         for (qdp = biod.bufListEndp;
@@ -1619,6 +1712,11 @@ long cm_GetBuffer(cm_scache_t *scp, cm_buf_t *bufp, int *cpffp, cm_user_t *userp
         goto fetchingcompleted;
     }
 
+    if (scp_locked) {
+        lock_ReleaseWrite(&scp->rw);
+        scp_locked = 0;
+    }
+
     /* now make the call */
     do {
         code = cm_ConnFromFID(&scp->fid, userp, reqp, &connp);
@@ -1850,8 +1948,17 @@ long cm_GetBuffer(cm_scache_t *scp, cm_buf_t *bufp, int *cpffp, cm_user_t *userp
         if (code1 == RXKADUNKNOWNKEY)
             osi_Log0(afsd_logp, "CALL EndCall returns RXKADUNKNOWNKEY");
 
+        /* If we are avoiding a file server bug, ignore the error state */
+        if (fs_fetchdata_offset_bug && first_read && length_found == 0 && code == -451) {
+            /* Clone the current status info and clear the error state */
+            scp_locked = cm_CloneStatus(scp, userp, scp_locked, &afsStatus, &volSync);
+            if (scp_locked) {
+                lock_ReleaseWrite(&scp->rw);
+                scp_locked = 0;
+            }
+            code = 0;
         /* Prefer the error value from FetchData over rx_EndCall */
-        if (code == 0 && code1 != 0)
+        } else if (code == 0 && code1 != 0)
             code = code1;
         osi_Log0(afsd_logp, "CALL FetchData DONE");