Windows: Direct IO Support for Service
author Jeffrey Altman <jaltman@your-file-system.com>
Sun, 16 Dec 2012 17:42:17 +0000 (12:42 -0500)
committer Jeffrey Altman <jaltman@your-file-system.com>
Sat, 23 Feb 2013 08:22:04 +0000 (00:22 -0800)
This patchset implements and enables by default the new
Direct IO pathway between the AFS redirector and the afsd_service.exe.
When Direct IO is enabled all reads and writes are performed by the
AFS redirector locking memory allocated by the kernel and mapping it into
the service's memory address space.

The service supports cache bypass in this mode when the
AFS_REQUEST_FLAG_CACHE_BYPASS flag is set in the request from the
redirector.  When cache bypass is active, the AFSCache file is ignored and
data is either directly fetched from or stored to the file server.  Cache
bypass is enabled by IIS and other applications that request no
intermediate buffering when opening file handles.   This is often done
because the application implements its own data caching.  All cache bypass
store operations are synchronous.

When cache bypass is not enabled, the memory region provided by the AFS
redirector is either used to populate the cm_buf_t objects or is populated
by them.  When cache bypass is not enabled, one outstanding store
operation can be in flight asynchronously to improve performance.

Direct IO is enabled by default and can be disabled by creating the
registry value.

  ..\Services\TransarcAFSDaemon\Parameters
  "DirectIO"  DWORD  0x0

Change-Id: I3cac3660c8b8eded58226ba4a819692c454704a8
Reviewed-on: http://gerrit.openafs.org/9211
Tested-by: BuildBot <buildbot@rampaginggeek.com>
Reviewed-by: Jeffrey Altman <jaltman@your-file-system.com>

doc/xml/ReleaseNotesWindows/relnotes.xml
src/WINNT/afsd/NTMakefile
src/WINNT/afsd/afsd.h
src/WINNT/afsd/afsd_init.c
src/WINNT/afsd/cm_daemon.c
src/WINNT/afsd/cm_dcache.c
src/WINNT/afsd/cm_direct.c [new file with mode: 0644]
src/WINNT/afsd/cm_direct.h [new file with mode: 0644]
src/WINNT/afsrdr/user/RDRFunction.c
src/WINNT/afsrdr/user/RDRInit.cpp
src/WINNT/afsrdr/user/RDRPrototypes.h

index 1f8f378..d3cce35 100644 (file)
@@ -4303,6 +4303,25 @@ Default: 0</para>
           <para>0: do not generate 8.3 compatible short names.</para>
           <para>1: generate 8.3 compatible short names.</para>
         </section>
+        <section>
+          <title id="Regkey_TransarcAFSDaemon_Parameters_DirectIO">Value: DirectIO</title>
+          <indexterm significance="normal">
+            <primary>DirectIO</primary>
+          </indexterm>
+          <para>Regkey: [HKLM\SYSTEM\CurrentControlSet\Services\TransarcAFSDaemon\Parameters]</para>
+          <para>Type: DWORD {0, 1}
+          </para>
+          <para>
+            Default: 1</para>
+          <para>The AFS redirector as of 1.7.22 supports two I/O processing mechanisms. The new
+            implementation is referred to as Direct I/O.  Direct I/O provides two benefits over the
+            prior implementation.  First, it is faster.  Second, it provides support for
+            CreateFile(FILE_FLAG_NO_BUFFERING).  When a file is opened with the
+            FILE_FLAG_NO_BUFFERING flag set, the AFSCache is bypassed and all I/O operations on the
+            file are performed directly to and from the file server.</para>
+          <para>0: use the older I/O processing mechanism.</para>
+          <para>1: use the new Direct I/O processing mechanism.</para>
+        </section>
       </section>
       <section>
         <title id="Regkey_TransarcAFSDaemon_Parameters_GlobalAutoMapper">Regkey:
index 21c5e6e..a234022 100644 (file)
@@ -129,6 +129,7 @@ AFSDOBJS=\
        $(OUT)\cm_buf.obj \
        $(OUT)\cm_scache.obj \
        $(OUT)\cm_dcache.obj \
+        $(OUT)\cm_direct.obj \
        $(OUT)\cm_access.obj \
        $(OUT)\cm_eacces.obj \
        $(OUT)\cm_callback.obj \
index 695973a..f583124 100644 (file)
@@ -47,6 +47,7 @@ BOOL APIENTRY About(HWND, unsigned int, unsigned int, long);
 #include "cm_volstat.h"
 #include "cm_volume.h"
 #include "cm_dcache.h"
+#include "cm_direct.h"
 #include "cm_access.h"
 #include "cm_eacces.h"
 #include "cm_dir.h"
@@ -62,6 +63,7 @@ BOOL APIENTRY About(HWND, unsigned int, unsigned int, long);
 #include "cm_freelance.h"
 #include "cm_performance.h"
 #include "cm_rdr.h"
+#include "rawops.h"
 #include "afsd_init.h"
 #include "afsd_eventlog.h"
 
@@ -115,6 +117,7 @@ extern int cm_fakeGettingCallback;                  // 1 if currently updating the fake root.af
 extern int cm_dnsEnabled;
 extern int cm_readonlyVolumeVersioning;
 extern int cm_shortNames;
+extern int cm_directIO;
 
 extern long rx_mtu;
 
index 4d523b6..c966486 100644 (file)
@@ -78,6 +78,7 @@ int cm_chunkSize;
 int cm_virtualCache = 0;
 afs_int32 cm_verifyData = 0;
 int cm_shortNames = 1;
+int cm_directIO = 1;
 
 int smb_UseV3 = 1;
 afs_uint32 smb_Enabled = 1;
@@ -1386,6 +1387,16 @@ afsd_InitCM(char **reasonP)
     }
     afsi_log("CM ShortNames is %u", cm_shortNames);
 
+    dummyLen = sizeof(DWORD);
+    code = RegQueryValueEx(parmKey, "DirectIO", NULL, NULL,
+                           (BYTE *) &dwValue, &dummyLen);
+    if (code == ERROR_SUCCESS) {
+        cm_directIO = (unsigned short) dwValue;
+    } else {
+        cm_directIO = 1;
+    }
+    afsi_log("CM DirectIO is %u", cm_directIO);
+
     RegCloseKey (parmKey);
 
     cacheBlocks = ((afs_uint64)cacheSize * 1024) / blockSize;
index c18cee5..4cf4a61 100644 (file)
@@ -214,6 +214,9 @@ void * cm_BkgDaemon(void * vparm)
         if (rp->scp->flags & CM_SCACHEFLAG_DELETED) {
             osi_Log2(afsd_logp,"cm_BkgDaemon[%u] DELETED scp 0x%x", daemonID, rp->scp);
             code = CM_ERROR_BADFD;
+            if (rp->procp == cm_BkgDirectWrite) {
+                cm_BkgDirectWriteDone(rp->scp, rp->rockp, code);
+            }
         } else {
 #ifdef DEBUG_REFCOUNT
             osi_Log3(afsd_logp,"cm_BkgDaemon[%u] (before) scp 0x%x ref %d", daemonID, rp->scp, rp->scp->refCount);
@@ -239,6 +242,7 @@ void * cm_BkgDaemon(void * vparm)
         case CM_ERROR_ALLOFFLINE:
         case CM_ERROR_PARTIALWRITE:
             if (rp->procp == cm_BkgStore ||
+                rp->procp == cm_BkgDirectWrite ||
                 rp->procp == RDR_BkgFetch) {
                 osi_Log3(afsd_logp,
                          "cm_BkgDaemon[%u] re-queueing failed request 0x%p code 0x%x",
@@ -292,7 +296,8 @@ int cm_QueueBKGRequest(cm_scache_t *scp, cm_bkgProc_t *procp, void *rockp,
 
     /* Use separate queues for fetch and store operations */
     daemonID = scp->fid.hash % (cm_nDaemons/2) * 2;
-    if (procp == cm_BkgStore)
+    if (procp == cm_BkgStore ||
+        procp == cm_BkgDirectWrite)
         daemonID++;
 
     /* Check to see if this is a duplicate request */
index aff45e5..5ced48f 100644 (file)
@@ -2818,6 +2818,8 @@ cm_VerifyStoreData(cm_bulkIO_t *biod, cm_scache_t *savedScp)
                     buf_offset = 0;
                 }
                 cmp_length =  cm_data.buf_blockSize - buf_offset;
+                if (cmp_length > biod->length - bytes_compared)
+                    cmp_length = biod->length - bytes_compared;
 
                 osi_assertx(qdp != NULL, "null osi_queueData_t");
                 bufp = osi_GetQData(qdp);
diff --git a/src/WINNT/afsd/cm_direct.c b/src/WINNT/afsd/cm_direct.c
new file mode 100644 (file)
index 0000000..62b37c0
--- /dev/null
@@ -0,0 +1,575 @@
+/*
+ * Copyright (c) 2012 Your File System, Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * - Redistributions of source code must retain the above copyright notice,
+ *   this list of conditions and the following disclaimer.
+ * - Redistributions in binary form must reproduce the above copyright
+ *   notice, this list of conditions and the following disclaimer in the
+ *   documentation and/or other materials provided with the distribution.
+ * - Neither the name of Your File System, Inc nor the names of its
+ *   contributors may be used to endorse or promote products derived
+ *   from this software without specific prior written permission from
+ *   Your File System, Inc.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
+ * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
+ * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <afsconfig.h>
+#include <afs/param.h>
+#include <roken.h>
+
+#include <afs/stds.h>
+
+#include <windows.h>
+#include <winsock2.h>
+#include <nb30.h>
+#include <string.h>
+#include <stdlib.h>
+#include <osi.h>
+
+#include "afsd.h"
+
+/*
+ * int_DirectWrite stores one contiguous memory region to the file server
+ * with a StoreData or StoreData64 RPC.  The region must not be a
+ * cm_buf_t.data field; no buffers are locked.  scp->rw is expected to be
+ * held on entry only when CM_DIRECT_SCP_LOCKED is set in flags, and is
+ * left in that same state on return.  CM_SCACHESYNC_STOREDATA_EXCL is
+ * released via cm_SyncOpDone() before returning and, on success, the
+ * AFSFetchStatus from the server is merged.  bytesWritten may be NULL.
+ */
+static afs_int32
+int_DirectWrite( IN cm_scache_t *scp,
+                 IN cm_bulkIO_t *biodp,
+                 IN osi_hyper_t *offsetp,
+                 IN afs_uint32   length,
+                 IN afs_uint32   flags,
+                 IN cm_user_t   *userp,
+                 IN cm_req_t    *reqp,
+                 IN void        *memoryRegionp,
+                 OUT afs_uint32 *bytesWritten)
+{
+    long code, code1;
+    long temp;
+    AFSFetchStatus outStatus;
+    AFSStoreStatus inStatus;
+    AFSVolSync volSync;
+    AFSFid tfid;
+    struct rx_call *rxcallp;
+    struct rx_connection *rxconnp;
+    cm_conn_t *connp;
+    osi_hyper_t truncPos;
+    int require_64bit_ops = 0;
+    int call_was_64bit = 0;
+    int scp_locked = !!(flags & CM_DIRECT_SCP_LOCKED);
+    afs_uint32 written = 0;
+
+    osi_assertx(userp != NULL, "null cm_user_t");
+    osi_assertx(biodp != NULL, "null cm_bulkIO_t");
+    osi_assertx(biodp->scp == scp, "cm_bulkIO_t.scp != scp");
+
+    memset(&volSync, 0, sizeof(volSync));
+    if (bytesWritten)
+        *bytesWritten = 0;
+
+    cm_AFSFidFromFid(&tfid, &scp->fid);
+
+    if (!scp_locked)
+        lock_ObtainWrite(&scp->rw);
+
+    /* prepare the output status for the store */
+    _InterlockedOr(&scp->mask, CM_SCACHEMASK_CLIENTMODTIME);
+    cm_StatusFromAttr(&inStatus, scp, NULL);
+    truncPos = scp->length;
+    if ((scp->mask & CM_SCACHEMASK_TRUNCPOS)
+         && LargeIntegerLessThan(scp->truncPos, truncPos)) {
+        truncPos = scp->truncPos;
+        _InterlockedAnd(&scp->mask, ~CM_SCACHEMASK_TRUNCPOS);
+    }
+
+    InterlockedIncrement(&scp->activeRPCs);
+    lock_ReleaseWrite(&scp->rw);
+
+    /* now we're ready to do the store operation */
+    do {
+        code = cm_ConnFromFID(&scp->fid, userp, reqp, &connp);
+        if (code)
+            continue;
+
+    retry:
+        rxconnp = cm_GetRxConn(connp);
+        rxcallp = rx_NewCall(rxconnp);
+        rx_PutConnection(rxconnp);
+
+        if (SERVERHAS64BIT(connp)) {
+            call_was_64bit = 1;
+
+            osi_Log4(afsd_logp, "CALL StartRXAFS_StoreData64 scp 0x%p, offset 0x%x:%08x, length 0x%x",
+                     scp, offsetp->HighPart, offsetp->LowPart, length);
+            osi_Log2(afsd_logp, "... truncPos 0x%x:%08x",  truncPos.HighPart, truncPos.LowPart);
+
+            code = StartRXAFS_StoreData64(rxcallp, &tfid, &inStatus,
+                                          offsetp->QuadPart,
+                                          length,
+                                          truncPos.QuadPart);
+            if (code)
+                osi_Log1(afsd_logp, "CALL StartRXAFS_StoreData64 FAILURE, code 0x%x", code);
+            else
+                osi_Log0(afsd_logp, "CALL StartRXAFS_StoreData64 SUCCESS");
+        } else {
+            call_was_64bit = 0;
+
+            if (require_64bit_ops) {
+                osi_Log0(afsd_logp, "Skipping StartRXAFS_StoreData.  The operation requires large file support in the server.");
+                code = CM_ERROR_TOOBIG;
+            } else {
+                osi_Log4(afsd_logp, "CALL StartRXAFS_StoreData scp 0x%p, offset 0x%x:%08x, length 0x%x",
+                         scp, offsetp->HighPart, offsetp->LowPart, length);
+                osi_Log1(afsd_logp, "... truncPos 0x%08x",  truncPos.LowPart);
+
+                code = StartRXAFS_StoreData(rxcallp, &tfid, &inStatus,
+                                            offsetp->LowPart, length, truncPos.LowPart);
+                if (code)
+                    osi_Log1(afsd_logp, "CALL StartRXAFS_StoreData FAILURE, code 0x%x", code);
+                else
+                    osi_Log0(afsd_logp, "CALL StartRXAFS_StoreData SUCCESS");
+            }
+        }
+
+        /* Write the data */
+        if (code == 0) {
+            temp = rx_Write(rxcallp, memoryRegionp, length);
+            if (temp != length) {
+                osi_Log2(afsd_logp, "rx_Write failed %d != %d", temp, length);
+                code = (rx_Error(rxcallp) < 0) ? rx_Error(rxcallp) : RX_PROTOCOL_ERROR;
+                /* no break: rx_EndCall() below must still end this call */
+            } else {
+                osi_Log1(afsd_logp, "rx_Write succeeded written %d", temp);
+                written = temp;     /* not +=: a retry resends the whole region */
+            }
+        }
+
+        /* End the call */
+        if (code == 0) {
+            if (call_was_64bit) {
+                code = EndRXAFS_StoreData64(rxcallp, &outStatus, &volSync);
+                if (code)
+                    osi_Log2(afsd_logp, "EndRXAFS_StoreData64 FAILURE scp 0x%p code %lX", scp, code);
+                else
+                    osi_Log0(afsd_logp, "EndRXAFS_StoreData64 SUCCESS");
+            } else {
+                code = EndRXAFS_StoreData(rxcallp, &outStatus, &volSync);
+                if (code)
+                    osi_Log2(afsd_logp, "EndRXAFS_StoreData FAILURE scp 0x%p code %lX",scp,code);
+                else
+                    osi_Log0(afsd_logp, "EndRXAFS_StoreData SUCCESS");
+            }
+        }
+
+        code1 = rx_EndCall(rxcallp, code);
+
+        if ((code == RXGEN_OPCODE || code1 == RXGEN_OPCODE) && SERVERHAS64BIT(connp)) {
+            SET_SERVERHASNO64BIT(connp);
+            goto retry;
+        }
+
+        /* NOTE(review): a non-zero rx_EndCall code replaces the StoreData code */
+        if (code1 != 0)
+            code = code1;
+    } while (cm_Analyze(connp, userp, reqp, &scp->fid, NULL, 1, &outStatus, &volSync, NULL, NULL, code));
+
+    code = cm_MapRPCError(code, reqp);
+
+    if (code)
+        osi_Log2(afsd_logp, "CALL StoreData FAILURE scp 0x%p, code 0x%x", scp, code);
+    else
+        osi_Log1(afsd_logp, "CALL StoreData SUCCESS scp 0x%p", scp);
+
+    /* now, clean up our state */
+    lock_ObtainWrite(&scp->rw);
+
+    if (code == 0) {
+        osi_hyper_t t;
+
+        /* now, here's something a little tricky: in AFS 3, a dirty
+         * length can't be directly stored, instead, a dirty chunk is
+         * stored that sets the file's size (by writing and by using
+         * the truncate-first option in the store call).
+         *
+         * At this point, we've just finished a store, and so the trunc
+         * pos field is clean.  If the file's size at the server is at
+         * least as big as we think it should be, then we turn off the
+         * length dirty bit, since all the other dirty buffers must
+         * precede this one in the file.
+         *
+         * The file's desired size shouldn't be smaller than what's
+         * stored at the server now, since we just did the trunc pos
+         * store.
+         *
+         * We have to turn off the length dirty bit as soon as we can,
+         * so that we see updates made by other machines.
+         */
+
+        if (call_was_64bit) {
+            t.LowPart = outStatus.Length;
+            t.HighPart = outStatus.Length_hi;
+        } else {
+            t = ConvertLongToLargeInteger(outStatus.Length);
+        }
+
+        if (LargeIntegerGreaterThanOrEqualTo(t, scp->length))
+            _InterlockedAnd(&scp->mask, ~CM_SCACHEMASK_LENGTH);
+
+        cm_MergeStatus(NULL, scp, &outStatus, &volSync, userp, reqp,
+                       CM_MERGEFLAG_STOREDATA | CM_MERGEFLAG_CACHE_BYPASS);
+    } else {
+        InterlockedDecrement(&scp->activeRPCs);
+        if (code == CM_ERROR_SPACE)
+            _InterlockedOr(&scp->flags, CM_SCACHEFLAG_OUTOFSPACE);
+        else if (code == CM_ERROR_QUOTA)
+            _InterlockedOr(&scp->flags, CM_SCACHEFLAG_OVERQUOTA);
+    }
+    cm_SyncOpDone(scp, NULL, CM_SCACHESYNC_STOREDATA_EXCL);
+
+    if (bytesWritten)
+        *bytesWritten = written;
+
+    if (!scp_locked)
+        lock_ReleaseWrite(&scp->rw);
+
+    return code;
+}
+
+/*
+ * cm_DirectWrite copies the caller's memory region and queues a
+ * cm_BkgDirectWrite request to store it.  scp->rw must be held on entry
+ * if and only if CM_DIRECT_SCP_LOCKED is set; it is left in that state.
+ * Returns 0 once queued, else CM_ERROR_BADFD, ENOMEM or the cm_SyncOp code.
+ */
+afs_int32
+cm_DirectWrite( IN cm_scache_t *scp,
+                IN osi_hyper_t *offsetp,
+                IN afs_uint32   length,
+                IN afs_uint32   flags,
+                IN cm_user_t   *userp,
+                IN cm_req_t    *reqp,
+                IN void        *memoryRegionp,
+                OUT afs_uint32 *bytesWritten)
+{
+    rock_BkgDirectWrite_t *rockp = NULL;
+    int scp_locked = !!(flags & CM_DIRECT_SCP_LOCKED);
+    afs_int32 code;
+
+    if (!scp_locked)
+        lock_ObtainWrite(&scp->rw);
+
+    code = CM_ERROR_BADFD;
+    if (scp->flags & CM_SCACHEFLAG_DELETED)
+        goto fail;
+
+    /* zero the rock: rockp->biod is not otherwise initialized here.
+     * NOTE(review): int_DirectWrite asserts biod.scp == scp - confirm. */
+    code = ENOMEM;
+    rockp = calloc(1, sizeof(*rockp));
+    if (rockp)
+        rockp->memoryRegion = malloc(length);
+    if (rockp == NULL || rockp->memoryRegion == NULL)
+        goto fail;
+
+    /* Serialize StoreData RPC's; for rationale see cm_scache.c */
+    code = cm_SyncOp(scp, NULL, userp, reqp, 0, CM_SCACHESYNC_STOREDATA_EXCL | CM_SCACHESYNC_ASYNCSTORE);
+    if (code)
+        goto fail;      /* report the cm_SyncOp error itself, not ENOMEM */
+
+    /* cannot hold scp->rw when calling cm_QueueBkGRequest. */
+    lock_ReleaseWrite(&scp->rw);
+    memcpy(rockp->memoryRegion, memoryRegionp, length);
+    rockp->offset = *offsetp;
+    rockp->length = length;
+    rockp->bypass_cache = TRUE;
+    cm_QueueBKGRequest(scp, cm_BkgDirectWrite, rockp, userp, reqp);
+
+    if (bytesWritten)
+        *bytesWritten = length;     /* must lie */
+    if (scp_locked)
+        lock_ObtainWrite(&scp->rw);
+    return code;
+
+  fail:
+    if (!scp_locked)
+        lock_ReleaseWrite(&scp->rw);
+    if (rockp)
+        free(rockp->memoryRegion);
+    free(rockp);
+    return code;
+}
+/* Completion for a direct write: release the BIOD and sync flags, free the copy. */
+void
+cm_BkgDirectWriteDone( cm_scache_t *scp, void *vrockp, afs_int32 code)
+{
+    rock_BkgDirectWrite_t *rock = vrockp;
+
+    lock_ObtainWrite(&scp->rw);
+    cm_ReleaseBIOD(&rock->biod, 1, code, 1);
+    cm_SyncOpDone(scp, NULL, CM_SCACHESYNC_STOREDATA_EXCL | CM_SCACHESYNC_ASYNCSTORE);
+    lock_ReleaseWrite(&scp->rw);
+    free(rock->memoryRegion);
+    rock->memoryRegion = NULL;  /* leave no dangling pointer in the rock */
+}
+/* Background procedure: issues the queued direct write and, unless the
+ * result is one that cm_BkgDaemon re-queues, releases its resources. */
+afs_int32
+cm_BkgDirectWrite( cm_scache_t *scp, void *vrockp, struct cm_user *userp, cm_req_t *reqp)
+{
+    rock_BkgDirectWrite_t *rock = vrockp;
+    afs_uint32 nWritten;        /* filled in by int_DirectWrite; unused here */
+    afs_int32  rc;
+
+    /*
+     * Must fixup biod->reqp value since we are no longer running with the
+     * same stack as when the BIOD was created.
+     */
+    rock->biod.reqp = reqp;
+
+    osi_assertx(rock->memoryRegion, "memoryRegion is NULL");
+
+    rc = int_DirectWrite(scp, &rock->biod, &rock->offset, rock->length,
+                         0 /* no CM_DIRECT_SCP_LOCKED: scp->rw not held */, userp, reqp,
+                         rock->memoryRegion, &nWritten);
+
+    switch ( rc ) {
+    case CM_ERROR_TIMEDOUT:    /* or server restarting */
+    case CM_ERROR_RETRY:
+    case CM_ERROR_WOULDBLOCK:
+    case CM_ERROR_ALLBUSY:
+    case CM_ERROR_ALLDOWN:
+    case CM_ERROR_ALLOFFLINE:
+    case CM_ERROR_PARTIALWRITE:
+        /* do nothing; cm_BkgDaemon will retry the request */
+        break;
+    default:
+        lock_ObtainWrite(&scp->rw);
+        cm_SyncOpDone(scp, NULL, CM_SCACHESYNC_ASYNCSTORE);
+        lock_ReleaseWrite(&scp->rw);
+        free(rock->memoryRegion);
+        rock->memoryRegion = NULL;
+        break;
+    }
+    return rc;
+}
+
+
+/*
+ * cm_SetupDirectStoreBIOD differs from cm_SetupStoreBIOD in that it
+ * doesn't worry about whether or not the cm_buf_t is dirty or not.  Nor
+ * does it concern itself with chunk size.  All of the cm_buf_t objects
+ * that overlap the requested range must be held.
+ *
+ * scp->rw must be held by the caller; it is dropped during processing and
+ * held again on every return (scp->bufCreateLock is always released).
+ * If returns 0, returns buffers held in biop, and with CM_BUF_CMSTORING set.
+ *
+ * Caller *must* set CM_BUF_WRITING and reset the over.hEvent field if the
+ * buffer is ever unlocked before CM_BUF_DIRTY is cleared.  And if
+ * CM_BUF_WRITING is ever viewed by anyone, then it must be cleared, sleepers
+ * must be woken, and the event must be set when the I/O is done.  All of this
+ * is required so that buf_WaitIO synchronizes properly with the buffer as it
+ * is being written out.
+ *
+ */
+afs_int32
+cm_SetupDirectStoreBIOD(cm_scache_t *scp, osi_hyper_t *inOffsetp, afs_uint32 inSize,
+                        cm_bulkIO_t *biop, cm_user_t *userp, cm_req_t *reqp)
+{
+    cm_buf_t *bufp;
+    osi_queueData_t *qdp;
+    osi_hyper_t thyper;
+    osi_hyper_t tbase;
+    osi_hyper_t scanStart;     /* where to start scan for dirty pages */
+    osi_hyper_t scanEnd;       /* where to stop scan for dirty pages */
+    long code;
+    long flags;                        /* flags to cm_SyncOp */
+
+    /* clear things out */
+    biop->scp = scp;           /* do not hold; held by caller */
+    biop->userp = userp;        /* do not hold; held by caller */
+    biop->reqp = reqp;
+    biop->offset = *inOffsetp;
+    biop->length = 0;
+    biop->bufListp = NULL;
+    biop->bufListEndp = NULL;
+    biop->reserved = 0;
+
+    /*
+     * reserve enough buffers to cover the full range.
+     * drop the cm_scache.rw lock because buf_ReserveBuffers()
+     * can sleep if there is insufficient room.
+     */
+    lock_ReleaseWrite(&scp->rw);
+    biop->reserved = 1 + inSize / cm_data.buf_blockSize;
+    buf_ReserveBuffers(biop->reserved);
+
+    /*
+     * This pass is intended to ensure that a cm_buf_t object
+     * is allocated for each block of the direct store operation.
+     * No effort is going to be made to ensure that the blocks are
+     * populated with current data.  Blocks that are not current and
+     * are not fully overwritten by the direct store data will not
+     * be cached.
+     */
+
+    lock_ObtainWrite(&scp->bufCreateLock);
+
+    /* Compute the offset of the first buffer. */
+    tbase = *inOffsetp;
+    tbase.LowPart -= tbase.LowPart % cm_data.buf_blockSize;
+
+    /*
+     * If the first buffer cannot be obtained, return an error
+     * immediately.  There is no clean up to be performed.
+     */
+    code = buf_Get(scp, &tbase, reqp, BUF_GET_FLAG_BUFCREATE_LOCKED, &bufp);
+    if (code) {
+        lock_ReleaseWrite(&scp->bufCreateLock);    /* it was obtained for write */
+        buf_UnreserveBuffers(biop->reserved);
+        lock_ObtainWrite(&scp->rw);
+        return code;
+    }
+
+    /* get buffer mutex and scp mutex safely */
+    lock_ObtainMutex(&bufp->mx);
+
+    /*
+     * if the buffer is actively involved in I/O
+     * we wait for the I/O to complete.
+     */
+    if (bufp->flags & (CM_BUF_WRITING|CM_BUF_READING))
+        buf_WaitIO(scp, bufp);
+
+    lock_ObtainWrite(&scp->rw);
+    flags = CM_SCACHESYNC_NEEDCALLBACK | CM_SCACHESYNC_GETSTATUS |
+            CM_SCACHESYNC_STOREDATA | CM_SCACHESYNC_BUFLOCKED;
+    code = cm_SyncOp(scp, bufp, userp, reqp, 0, flags);
+    if (code) {
+        lock_ReleaseMutex(&bufp->mx);
+        buf_Release(bufp);
+        buf_UnreserveBuffers(biop->reserved);
+        lock_ReleaseWrite(&scp->bufCreateLock);    /* do not leak the lock */
+        return code;
+    }
+    cm_SyncOpDone(scp, bufp, flags);
+    lock_ReleaseMutex(&bufp->mx);
+
+    /*
+     * Add the first buffer into the BIOD list.
+     */
+    qdp = osi_QDAlloc();
+    if (qdp == NULL) {
+        buf_Release(bufp);
+        buf_UnreserveBuffers(biop->reserved);
+        lock_ReleaseWrite(&scp->bufCreateLock);    /* do not leak the lock */
+        return ENOMEM;
+    }
+    osi_SetQData(qdp, bufp);
+
+    if ( cm_verifyData )
+        buf_ComputeCheckSum(bufp);
+
+    /* don't have to hold bufp, since held by buf_Get above */
+    osi_QAddH((osi_queue_t **) &biop->bufListp,
+              (osi_queue_t **) &biop->bufListEndp,
+              &qdp->q);
+    biop->length = cm_data.buf_blockSize - (afs_uint32)(inOffsetp->QuadPart % cm_data.buf_blockSize);
+
+    if (biop->length < inSize) {
+        /* scan for the rest of the buffers */
+        thyper = ConvertLongToLargeInteger(biop->length);
+        scanStart = LargeIntegerAdd(bufp->offset, thyper);
+        thyper = ConvertLongToLargeInteger(inSize);
+        scanEnd = LargeIntegerAdd(*inOffsetp, thyper);
+
+        flags = CM_SCACHESYNC_GETSTATUS | CM_SCACHESYNC_STOREDATA | CM_SCACHESYNC_BUFLOCKED;
+        lock_ReleaseWrite(&scp->rw);
+
+        for ( tbase = scanStart, thyper = ConvertLongToLargeInteger(cm_data.buf_blockSize);
+              LargeIntegerLessThan(tbase, scanEnd);
+              tbase = LargeIntegerAdd(tbase, thyper))
+        {
+            code = buf_Get(scp, &tbase, reqp, BUF_GET_FLAG_BUFCREATE_LOCKED, &bufp);
+            if (code) {
+                /* Must tear down biod */
+                goto error;
+            }
+
+            lock_ObtainMutex(&bufp->mx);
+            /*
+             * if the buffer is actively involved in I/O
+             * we wait for the I/O to complete.
+             */
+            if (bufp->flags & (CM_BUF_WRITING|CM_BUF_READING))
+                buf_WaitIO(scp, bufp);
+
+            lock_ObtainWrite(&scp->rw);
+            code = cm_SyncOp(scp, bufp, userp, reqp, 0, flags);
+            lock_ReleaseWrite(&scp->rw);
+            lock_ReleaseMutex(&bufp->mx);
+            if (code) {
+                buf_Release(bufp);
+                goto error;
+            }
+
+            /*
+             * Add the buffer into the BIOD list.
+             */
+            qdp = osi_QDAlloc();
+            if (qdp == NULL) {
+                buf_Release(bufp);
+                code = ENOMEM;
+                goto error;
+            }
+            osi_SetQData(qdp, bufp);
+
+            if ( cm_verifyData )
+                buf_ComputeCheckSum(bufp);
+
+            /* don't have to hold bufp, since held by buf_Get above */
+            osi_QAddH( (osi_queue_t **) &biop->bufListp,
+                       (osi_queue_t **) &biop->bufListEndp,
+                       &qdp->q);
+            biop->length += cm_data.buf_blockSize;
+            bufp = NULL;       /* this buffer and reference added to the queue */
+        }
+
+        /* update biod info describing the transfer */
+        if (biop->length > inSize)
+            biop->length = inSize;
+
+        lock_ObtainWrite(&scp->rw);
+    }
+
+    /* finally, we're done */
+    lock_ReleaseWrite(&scp->bufCreateLock);
+    return 0;
+
+  error:
+    lock_ReleaseWrite(&scp->bufCreateLock);
+    /* tear down biod and clear buffer reservation */
+    cm_ReleaseBIOD(biop, TRUE, code, FALSE);
+    lock_ObtainWrite(&scp->rw);
+    return code;
+}
diff --git a/src/WINNT/afsd/cm_direct.h b/src/WINNT/afsd/cm_direct.h
new file mode 100644 (file)
index 0000000..e6cc5db
--- /dev/null
@@ -0,0 +1,68 @@
+/*
+ * Copyright (c) 2012 Your File System, Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * - Redistributions of source code must retain the above copyright notice,
+ *   this list of conditions and the following disclaimer.
+ * - Redistributions in binary form must reproduce the above copyright
+ *   notice, this list of conditions and the following disclaimer in the
+ *   documentation and/or other materials provided with the distribution.
+ * - Neither the name of Your File System, Inc nor the names of its
+ *   contributors may be used to endorse or promote products derived
+ *   from this software without specific prior written permission from
+ *   Your File System, Inc.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
+ * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
+ * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef OPENAFS_WINNT_AFSD_CM_DIRECT_H
+#define OPENAFS_WINNT_AFSD_CM_DIRECT_H 1
+
+extern afs_int32
+cm_SetupDirectStoreBIOD( cm_scache_t *scp,
+                         osi_hyper_t *offsetp,
+                         afs_uint32   length,
+                         cm_bulkIO_t *biodp,
+                         cm_user_t   *userp,
+                         cm_req_t    *reqp);
+
+extern afs_int32
+cm_DirectWrite( IN cm_scache_t *scp,
+                IN osi_hyper_t *offsetp,
+                IN afs_uint32   length,
+                IN afs_uint32   flags,
+                IN cm_user_t   *userp,
+                IN cm_req_t    *reqp,
+                IN void        *memoryRegionp,
+                OUT afs_uint32 *bytesWritten);
+
+#define CM_DIRECT_SCP_LOCKED            0x1     /* flags bit: caller already holds scp->rw */
+
+typedef struct rock_BkgDirectWrite {
+    osi_hyper_t offset;        /* file offset at which the data is stored */
+    afs_uint32  length;        /* number of bytes in memoryRegion */
+    afs_uint32  bypass_cache;  /* cm_DirectWrite sets this to TRUE */
+    void *      memoryRegion;  /* heap copy of the data; freed after the store */
+    cm_bulkIO_t biod;          /* bulk IO descriptor */
+} rock_BkgDirectWrite_t;
+
+extern afs_int32
+cm_BkgDirectWrite( cm_scache_t *scp, void *rockp, struct cm_user *userp, cm_req_t *reqp);
+
+extern void
+cm_BkgDirectWriteDone( cm_scache_t *scp, void *vrockp, afs_int32 code);
+#endif /* OPENAFS_WINNT_AFSD_CM_DIRECT_H */
index fcc26af..8ee0ed8 100644 (file)
@@ -171,6 +171,7 @@ RDR_SetInitParams( OUT AFSRedirectorInitInfo **ppRedirInitInfo, OUT DWORD * pRed
     *ppRedirInitInfo = (AFSRedirectorInitInfo *)malloc(*pRedirInitInfoLen);
     (*ppRedirInitInfo)->Flags = smb_hideDotFiles ? AFS_REDIR_INIT_FLAG_HIDE_DOT_FILES : 0;
     (*ppRedirInitInfo)->Flags |= cm_shortNames ? 0 : AFS_REDIR_INIT_FLAG_DISABLE_SHORTNAMES;
+    (*ppRedirInitInfo)->Flags |= cm_directIO ? AFS_REDIR_INIT_PERFORM_SERVICE_IO : 0;
     (*ppRedirInitInfo)->MaximumChunkLength = cm_data.chunkSize;
     (*ppRedirInitInfo)->GlobalFileId.Cell   = cm_data.rootFid.cell;
     (*ppRedirInitInfo)->GlobalFileId.Volume = cm_data.rootFid.volume;
@@ -6213,3 +6214,307 @@ RDR_PipeTransceive( IN cm_user_t     *userp,
     (*ResultCB)->ResultStatus = 0;
     osi_Log0(afsd_logp, "RDR_Pipe_Transceive SUCCESS");
 }
+
+/*
+ * RDR_ReadFile
+ *
+ * Service a redirector read request: read up to BytesToRead bytes starting
+ * at *Offset from the file identified by FileID into Buffer.
+ *
+ * Parameters:
+ *   userp              - user on whose behalf the read is performed
+ *   FileID             - AFS file identifier; Cell == 0 is rejected
+ *   Offset             - starting file offset
+ *   BytesToRead        - number of bytes requested
+ *   Buffer             - destination memory; must not be NULL
+ *   bWow64             - forwarded to RDR_InitReq()
+ *   bCacheBypass       - TRUE: fetch with cm_GetData();
+ *                        FALSE: read with raw_ReadData()
+ *   ResultBufferLength - result space offered by the caller; must be at
+ *                        least sizeof(AFSFileIOResultCB)
+ *   ResultCB           - receives a malloc()ed AFSCommResult, or NULL if
+ *                        the allocation fails
+ *
+ * Outcome is reported through (*ResultCB)->ResultStatus.  On success the
+ * AFSFileIOResultCB payload holds the byte count read plus the scache
+ * data version and callback expiration.  On every failure path the
+ * payload is all zeroes.
+ */
+void
+RDR_ReadFile( IN cm_user_t     *userp,
+              IN AFSFileID      FileID,
+              IN LARGE_INTEGER *Offset,
+              IN ULONG          BytesToRead,
+              IN PVOID          Buffer,
+              IN BOOL           bWow64,
+              IN BOOL           bCacheBypass,
+              IN DWORD          ResultBufferLength,
+              IN OUT AFSCommResult **ResultCB)
+{
+    AFSFileIOResultCB * pFileIOResultCB;
+    DWORD         status;
+    ULONG         Length;
+    ULONG         ulBytesRead = 0;
+    afs_uint32    code = 0;
+    cm_fid_t      fid;
+    cm_scache_t * scp = NULL;
+    cm_req_t      req;
+
+    RDR_InitReq(&req, bWow64);
+
+    osi_Log4(afsd_logp, "RDR_ReadFile FID cell=0x%x vol=0x%x vn=0x%x uniq=0x%x",
+             FileID.Cell, FileID.Volume, FileID.Vnode, FileID.Unique);
+
+    Length = sizeof(AFSFileIOResultCB);
+    if (Length > ResultBufferLength) {
+        /* Caller cannot accept the payload; return a bare status block. */
+        *ResultCB = (AFSCommResult *)malloc(sizeof(AFSCommResult) );
+        if (!(*ResultCB))
+            return;
+        memset( *ResultCB, 0, sizeof(AFSCommResult));
+        (*ResultCB)->ResultStatus = STATUS_BUFFER_OVERFLOW;
+        return;
+    }
+    *ResultCB = (AFSCommResult *)malloc( Length + sizeof( AFSCommResult) );
+    if (!(*ResultCB))
+        return;
+    /*
+     * Clear the whole allocation, not just the first Length bytes, so
+     * that no uninitialized heap contents are returned in the payload
+     * when a later step fails.
+     */
+    memset( *ResultCB, 0, Length + sizeof( AFSCommResult) );
+    (*ResultCB)->ResultBufferLength = Length;
+    pFileIOResultCB = (AFSFileIOResultCB *)(*ResultCB)->ResultData;
+
+    if ( Buffer == NULL) {
+        (*ResultCB)->ResultStatus = STATUS_INVALID_PARAMETER;
+        osi_Log0(afsd_logp, "RDR_ReadFile Null IOctl Buffer");
+        return;
+    }
+
+    if (FileID.Cell != 0) {
+        fid.cell   = FileID.Cell;
+        fid.volume = FileID.Volume;
+        fid.vnode  = FileID.Vnode;
+        fid.unique = FileID.Unique;
+        fid.hash   = FileID.Hash;
+
+        code = cm_GetSCache(&fid, NULL, &scp, userp, &req);
+        if (code) {
+            smb_MapNTError(cm_MapRPCError(code, &req), &status, TRUE);
+            (*ResultCB)->ResultStatus = status;
+            osi_Log2(afsd_logp, "RDR_ReadFile cm_GetSCache failure code=0x%x status=0x%x",
+                      code, status);
+            return;
+        }
+    } else {
+        (*ResultCB)->ResultStatus = STATUS_OBJECT_NAME_INVALID;
+        osi_Log0(afsd_logp, "RDR_ReadFile Object Name Invalid - Cell = 0");
+        return;
+    }
+
+    /* Ensure that the caller can access this file */
+    lock_ObtainWrite(&scp->rw);
+    code = cm_SyncOp(scp, NULL, userp, &req, PRSFS_READ,
+                      CM_SCACHESYNC_NEEDCALLBACK | CM_SCACHESYNC_GETSTATUS);
+    if (code) {
+        smb_MapNTError(cm_MapRPCError(code, &req), &status, TRUE);
+        (*ResultCB)->ResultStatus = status;
+        lock_ReleaseWrite(&scp->rw);
+        cm_ReleaseSCache(scp);
+        osi_Log2(afsd_logp, "RDR_ReadFile cm_SyncOp failure code=0x%x status=0x%x",
+                  code, status);
+        return;
+    }
+
+    cm_SyncOpDone(scp, NULL, CM_SCACHESYNC_NEEDCALLBACK | CM_SCACHESYNC_GETSTATUS);
+
+    /* Only regular files can be read through this path. */
+    if (scp->fileType == CM_SCACHETYPE_DIRECTORY) {
+        (*ResultCB)->ResultStatus = STATUS_FILE_IS_A_DIRECTORY;
+        lock_ReleaseWrite(&scp->rw);
+        cm_ReleaseSCache(scp);
+        osi_Log1(afsd_logp, "RDR_ReadFile File is a Directory scp=0x%p",
+                 scp);
+        return;
+    }
+
+    if (scp->fileType != CM_SCACHETYPE_FILE) {
+        (*ResultCB)->ResultStatus = STATUS_REPARSE_POINT_NOT_RESOLVED;
+        lock_ReleaseWrite(&scp->rw);
+        cm_ReleaseSCache(scp);
+        osi_Log1(afsd_logp, "RDR_ReadFile File is a MountPoint or Link scp=0x%p",
+                 scp);
+        return;
+    }
+
+    if ( bCacheBypass) {
+        //
+        // Read the file directly into the buffer bypassing the AFS Cache
+        //
+        code = cm_GetData( scp, Offset, Buffer, BytesToRead, &ulBytesRead, userp, &req);
+    } else {
+        //
+        // Read the file via the AFS Cache
+        //
+        code = raw_ReadData( scp, Offset, BytesToRead, Buffer, &ulBytesRead, userp, &req);
+    }
+
+    if (code) {
+        smb_MapNTError(cm_MapRPCError(code, &req), &status, TRUE);
+        (*ResultCB)->ResultStatus = status;
+        osi_Log2(afsd_logp, "RDR_ReadFile failure code=0x%x status=0x%x",
+                 code, status);
+    } else {
+        (*ResultCB)->ResultStatus = STATUS_SUCCESS;
+        pFileIOResultCB->Length = ulBytesRead;
+        pFileIOResultCB->DataVersion.QuadPart = scp->dataVersion;
+        pFileIOResultCB->Expiration.QuadPart = scp->cbExpires;
+    }
+
+    lock_ReleaseWrite(&scp->rw);
+    cm_ReleaseSCache(scp);
+    return;
+}
+
+/*
+ * RDR_WriteFile
+ *
+ * Service a redirector write request: store up to BytesToWrite bytes from
+ * Buffer into the file identified by FileID, starting at *Offset.
+ *
+ * Parameters:
+ *   userp              - user on whose behalf the write is performed
+ *   FileID             - AFS file identifier; Cell == 0 is rejected
+ *   FileIOCB           - request control block; its EndOfFile is applied
+ *                        to the file when it differs from scp->length
+ *   Offset             - starting file offset
+ *   BytesToWrite       - number of bytes supplied in Buffer
+ *   Buffer             - source memory; must not be NULL
+ *   bWow64             - forwarded to RDR_InitReq()
+ *   bCacheBypass       - TRUE: store with cm_DirectWrite();
+ *                        FALSE: store with raw_WriteData()
+ *   ResultBufferLength - result space offered by the caller; must be at
+ *                        least sizeof(AFSFileIOResultCB)
+ *   ResultCB           - receives a malloc()ed AFSCommResult, or NULL if
+ *                        the allocation fails
+ *
+ * Outcome is reported through (*ResultCB)->ResultStatus.  On a successful
+ * store the AFSFileIOResultCB payload holds the byte count written plus
+ * the scache data version and callback expiration.  On failure, and on
+ * the "nothing to do" success return, the payload is all zeroes.
+ */
+void
+RDR_WriteFile( IN cm_user_t     *userp,
+               IN AFSFileID      FileID,
+               IN AFSFileIOCB   *FileIOCB,
+               IN LARGE_INTEGER *Offset,
+               IN ULONG          BytesToWrite,
+               IN PVOID          Buffer,
+               IN BOOL           bWow64,
+               IN BOOL           bCacheBypass,
+               IN DWORD          ResultBufferLength,
+               IN OUT AFSCommResult **ResultCB)
+{
+    AFSFileIOResultCB * pFileIOResultCB;
+    DWORD         status;
+    ULONG         Length;
+    ULONG         ulBytesWritten = 0;
+    afs_uint32    code = 0;
+    cm_fid_t      fid;
+    cm_scache_t * scp = NULL;
+    cm_req_t      req;
+
+    RDR_InitReq(&req, bWow64);
+
+    osi_Log4(afsd_logp, "RDR_WriteFile FID cell=0x%x vol=0x%x vn=0x%x uniq=0x%x",
+             FileID.Cell, FileID.Volume, FileID.Vnode, FileID.Unique);
+
+    Length = sizeof(AFSFileIOResultCB);
+    if (Length > ResultBufferLength) {
+        /* Caller cannot accept the payload; return a bare status block. */
+        *ResultCB = (AFSCommResult *)malloc(sizeof(AFSCommResult) );
+        if (!(*ResultCB))
+            return;
+        memset( *ResultCB, 0, sizeof(AFSCommResult));
+        (*ResultCB)->ResultStatus = STATUS_BUFFER_OVERFLOW;
+        return;
+    }
+    *ResultCB = (AFSCommResult *)malloc( Length + sizeof( AFSCommResult) );
+    if (!(*ResultCB))
+        return;
+    /*
+     * Clear the whole allocation, not just the first Length bytes, so
+     * that no uninitialized heap contents are returned in the payload
+     * on paths that never fill it in.
+     */
+    memset( *ResultCB, 0, Length + sizeof( AFSCommResult) );
+    (*ResultCB)->ResultBufferLength = Length;
+    pFileIOResultCB = (AFSFileIOResultCB *)(*ResultCB)->ResultData;
+
+    if ( Buffer == NULL) {
+        (*ResultCB)->ResultStatus = STATUS_INVALID_PARAMETER;
+        osi_Log0(afsd_logp, "RDR_WriteFile Null IOctl Buffer");
+        return;
+    }
+
+    if (FileID.Cell != 0) {
+        fid.cell   = FileID.Cell;
+        fid.volume = FileID.Volume;
+        fid.vnode  = FileID.Vnode;
+        fid.unique = FileID.Unique;
+        fid.hash   = FileID.Hash;
+
+        code = cm_GetSCache(&fid, NULL, &scp, userp, &req);
+        if (code) {
+            smb_MapNTError(cm_MapRPCError(code, &req), &status, TRUE);
+            (*ResultCB)->ResultStatus = status;
+            osi_Log2(afsd_logp, "RDR_WriteFile cm_GetSCache failure code=0x%x status=0x%x",
+                      code, status);
+            return;
+        }
+    } else {
+        (*ResultCB)->ResultStatus = STATUS_OBJECT_NAME_INVALID;
+        osi_Log0(afsd_logp, "RDR_WriteFile Object Name Invalid - Cell = 0");
+        return;
+    }
+
+    /* Ensure that the caller can access this file */
+    lock_ObtainWrite(&scp->rw);
+    code = cm_SyncOp(scp, NULL, userp, &req, PRSFS_WRITE,
+                      CM_SCACHESYNC_NEEDCALLBACK | CM_SCACHESYNC_GETSTATUS);
+    if (code == CM_ERROR_NOACCESS && scp->creator == userp) {
+        /* The creator of the file is re-checked against INSERT rights. */
+        code = cm_SyncOp(scp, NULL, userp, &req, PRSFS_INSERT,
+                          CM_SCACHESYNC_NEEDCALLBACK | CM_SCACHESYNC_GETSTATUS);
+    }
+    if (code) {
+        smb_MapNTError(cm_MapRPCError(code, &req), &status, TRUE);
+        (*ResultCB)->ResultStatus = status;
+        lock_ReleaseWrite(&scp->rw);
+        cm_ReleaseSCache(scp);
+        osi_Log2(afsd_logp, "RDR_WriteFile cm_SyncOp failure code=0x%x status=0x%x",
+                  code, status);
+        return;
+    }
+
+    cm_SyncOpDone(scp, NULL, CM_SCACHESYNC_NEEDCALLBACK | CM_SCACHESYNC_GETSTATUS);
+
+    /* Only regular files can be written through this path. */
+    if (scp->fileType == CM_SCACHETYPE_DIRECTORY) {
+        (*ResultCB)->ResultStatus = STATUS_FILE_IS_A_DIRECTORY;
+        lock_ReleaseWrite(&scp->rw);
+        cm_ReleaseSCache(scp);
+        osi_Log1(afsd_logp, "RDR_WriteFile File is a Directory scp=0x%p",
+                 scp);
+        return;
+    }
+
+    if (scp->fileType != CM_SCACHETYPE_FILE) {
+        (*ResultCB)->ResultStatus = STATUS_REPARSE_POINT_NOT_RESOLVED;
+        lock_ReleaseWrite(&scp->rw);
+        cm_ReleaseSCache(scp);
+        osi_Log1(afsd_logp, "RDR_WriteFile File is a MountPoint or Link scp=0x%p",
+                 scp);
+        return;
+    }
+
+    /* Apply the end-of-file supplied with the request if it differs. */
+    if (FileIOCB->EndOfFile.QuadPart != scp->length.QuadPart)
+    {
+        cm_attr_t setAttr;
+
+        osi_Log4(afsd_logp, "RDR_WriteFile new length fid vol 0x%x vno 0x%x length 0x%x:%x",
+                 scp->fid.volume, scp->fid.vnode,
+                 FileIOCB->EndOfFile.HighPart,
+                 FileIOCB->EndOfFile.LowPart);
+
+        memset(&setAttr, 0, sizeof(cm_attr_t));
+        setAttr.mask |= CM_ATTRMASK_LENGTH;
+        setAttr.length.LowPart = FileIOCB->EndOfFile.LowPart;
+        setAttr.length.HighPart = FileIOCB->EndOfFile.HighPart;
+
+        /* cm_SetAttr() is called with scp->rw released. */
+        lock_ReleaseWrite(&scp->rw);
+        code = cm_SetAttr(scp, &setAttr, userp, &req);
+        if (code) {
+            /* Report a failure only when one actually occurred. */
+            osi_Log2(afsd_logp, "RDR_WriteFile cm_SetAttr failure scp=0x%p code 0x%x",
+                     scp, code);
+        }
+        code = 0;       /* ignore failure */
+        lock_ObtainWrite(&scp->rw);
+    }
+
+    /*
+     * The input buffer may contain data beyond the end of the file.
+     * Such data must be discarded.
+     */
+    if ( Offset->QuadPart + BytesToWrite > scp->length.QuadPart)
+    {
+        if ( Offset->QuadPart > scp->length.QuadPart) {
+            /* The whole request lies past EOF; the zeroed payload is returned. */
+            (*ResultCB)->ResultStatus = STATUS_SUCCESS;
+            lock_ReleaseWrite(&scp->rw);
+            cm_ReleaseSCache(scp);
+            osi_Log1(afsd_logp, "RDR_WriteFile Nothing to do scp=0x%p",
+                     scp);
+            return;
+        }
+
+        /* Trim the request so that it ends exactly at EOF. */
+        BytesToWrite -= (afs_uint32)(Offset->QuadPart + BytesToWrite - scp->length.QuadPart);
+    }
+
+    if (bCacheBypass) {
+        /* scp->rw is held here, hence CM_DIRECT_SCP_LOCKED. */
+        code = cm_DirectWrite( scp, Offset, BytesToWrite,
+                               CM_DIRECT_SCP_LOCKED,
+                               userp, &req, Buffer, &ulBytesWritten);
+    } else {
+        code = raw_WriteData( scp, Offset, BytesToWrite, Buffer, userp, &req, &ulBytesWritten);
+    }
+
+    if (code) {
+        smb_MapNTError(cm_MapRPCError(code, &req), &status, TRUE);
+        (*ResultCB)->ResultStatus = status;
+        osi_Log2(afsd_logp, "RDR_WriteFile failure code=0x%x status=0x%x",
+                 code, status);
+    } else {
+        (*ResultCB)->ResultStatus = STATUS_SUCCESS;
+        pFileIOResultCB->Length = ulBytesWritten;
+        pFileIOResultCB->DataVersion.QuadPart = scp->dataVersion;
+        pFileIOResultCB->Expiration.QuadPart = scp->cbExpires;
+    }
+
+    lock_ReleaseWrite(&scp->rw);
+    cm_ReleaseSCache(scp);
+    return;
+}
index d38508f..921bbbd 100644 (file)
@@ -64,6 +64,7 @@ typedef LONG NTSTATUS, *PNTSTATUS;      // not declared in ntstatus.h
 extern "C" {
 #include <osilog.h>
 extern osi_log_t *afsd_logp;
+extern int cm_directIO;
 
 #include <WINNT/afsreg.h>
 #include <afs/cm_config.h>
@@ -367,12 +368,19 @@ RDR_ProcessWorkerThreads(DWORD numThreads)
 
     for (index = 0; index < numThreads; index++)
     {
-        //
-        // 20% of worker threads should be reserved for release extent
-        // event processing
-        //
-        glWorkerThreadInfo[ glThreadHandleIndex].Flags =
-            (glThreadHandleIndex % 5) ? 0 : AFS_REQUEST_RELEASE_THREAD;
+        if ( !cm_directIO)
+        {
+            //
+            // 20% of worker threads should be reserved for release extent
+            // event processing
+            //
+            glWorkerThreadInfo[ glThreadHandleIndex].Flags =
+                (glThreadHandleIndex % 5) ? 0 : AFS_REQUEST_RELEASE_THREAD;
+        }
+        else
+        {
+            glWorkerThreadInfo[ glThreadHandleIndex].Flags = 0;
+        }
         glWorkerThreadInfo[ glThreadHandleIndex].hEvent = hEvent;
         glWorkerThreadInfo[ glThreadHandleIndex].hThread =
             CreateThread( NULL,
@@ -544,6 +552,7 @@ RDR_ProcessRequest( AFSCommRequest *RequestBuffer)
     BOOL                bDeleteFile = (RequestBuffer->RequestFlags & AFS_REQUEST_FLAG_FILE_DELETED) ? TRUE : FALSE;
     BOOL                bUnlockFile = (RequestBuffer->RequestFlags & AFS_REQUEST_FLAG_BYTE_RANGE_UNLOCK_ALL) ? TRUE : FALSE;
     BOOL                bCheckOnly = (RequestBuffer->RequestFlags & AFS_REQUEST_FLAG_CHECK_ONLY) ? TRUE : FALSE;
+    BOOL                bCacheBypass = (RequestBuffer->RequestFlags & AFS_REQUEST_FLAG_CACHE_BYPASS) ? TRUE : FALSE;
     BOOL                bRetry = FALSE;
     BOOL                bUnsupported = FALSE;
     BOOL                bIsLocalSystem = (RequestBuffer->RequestFlags & AFS_REQUEST_LOCAL_SYSTEM_PAG) ? TRUE : FALSE;
@@ -1309,6 +1318,58 @@ RDR_ProcessRequest( AFSCommRequest *RequestBuffer)
             break;
         }
 
+
+        case AFS_REQUEST_TYPE_PROCESS_READ_FILE: {
+            AFSFileIOCB *pFileIOCB = (AFSFileIOCB *)((char *)RequestBuffer->Name + RequestBuffer->DataOffset);
+
+            if (afsd_logp->enabled) {
+                swprintf( wchBuffer, L"ProcessRequest Processing AFS_REQUEST_TYPE_READ_FILE Index %08lX FID %08lX.%08lX.%08lX.%08lX",
+                          RequestBuffer->RequestIndex,
+                          RequestBuffer->FileId.Cell, RequestBuffer->FileId.Volume,
+                          RequestBuffer->FileId.Vnode, RequestBuffer->FileId.Unique);
+
+                osi_Log1(afsd_logp, "%S", osi_LogSaveStringW(afsd_logp, wchBuffer));
+            }
+
+            RDR_ReadFile( userp,
+                          RequestBuffer->FileId,
+                          &pFileIOCB->IOOffset,
+                          pFileIOCB->IOLength,
+                          pFileIOCB->MappedIOBuffer,
+                          bWow64,
+                          bCacheBypass,
+                          RequestBuffer->ResultBufferLength,
+                          &pResultCB);
+
+            break;
+        }
+
+        case AFS_REQUEST_TYPE_PROCESS_WRITE_FILE: {
+            AFSFileIOCB *pFileIOCB = (AFSFileIOCB *)((char *)RequestBuffer->Name + RequestBuffer->DataOffset);
+
+            if (afsd_logp->enabled) {
+                swprintf( wchBuffer, L"ProcessRequest Processing AFS_REQUEST_TYPE_WRITE_FILE Index %08lX FID %08lX.%08lX.%08lX.%08lX",
+                          RequestBuffer->RequestIndex,
+                          RequestBuffer->FileId.Cell, RequestBuffer->FileId.Volume,
+                          RequestBuffer->FileId.Vnode, RequestBuffer->FileId.Unique);
+
+                osi_Log1(afsd_logp, "%S", osi_LogSaveStringW(afsd_logp, wchBuffer));
+            }
+
+            RDR_WriteFile( userp,
+                           RequestBuffer->FileId,
+                           pFileIOCB,
+                           &pFileIOCB->IOOffset,
+                           pFileIOCB->IOLength,
+                           pFileIOCB->MappedIOBuffer,
+                           bWow64,
+                           bCacheBypass,
+                           RequestBuffer->ResultBufferLength,
+                           &pResultCB);
+
+            break;
+        }
+
         default:
             bUnsupported = TRUE;
 
index e70213c..a29d9b5 100644 (file)
@@ -366,6 +366,29 @@ RDR_PipeTransceive( IN cm_user_t     *userp,
                     IN DWORD          ResultBufferLength,
                     IN OUT AFSCommResult **ResultCB);
 
+/*
+ * Read up to Length bytes at *Offset from the file FileID into Buffer.
+ * bCacheBypass selects a direct fetch instead of a read through the AFS
+ * cache.  The outcome is returned in a malloc()ed *ResultCB; a NULL
+ * Buffer yields STATUS_INVALID_PARAMETER.
+ */
+void
+RDR_ReadFile( IN cm_user_t     *userp,
+              IN AFSFileID      FileID,
+              IN LARGE_INTEGER *Offset,
+              IN ULONG          Length,
+              IN PVOID          Buffer,
+              IN BOOL           bWow64,
+              IN BOOL           bCacheBypass,
+              IN DWORD          ResultBufferLength,
+              IN OUT AFSCommResult **ResultCB);
+
+/*
+ * Write up to Length bytes from Buffer to the file FileID at *Offset.
+ * FileIOCB supplies the end-of-file to apply before the write; data that
+ * would land beyond the file length is discarded.  bCacheBypass selects a
+ * direct store instead of a write through the AFS cache.  The outcome is
+ * returned in a malloc()ed *ResultCB; a NULL Buffer yields
+ * STATUS_INVALID_PARAMETER.
+ */
+void
+RDR_WriteFile( IN cm_user_t     *userp,
+               IN AFSFileID      FileID,
+               IN AFSFileIOCB   *FileIOCB,
+               IN LARGE_INTEGER *Offset,
+               IN ULONG          Length,
+               IN PVOID          Buffer,
+               IN BOOL           bWow64,
+               IN BOOL           bCacheBypass,
+               IN DWORD          ResultBufferLength,
+               IN OUT AFSCommResult **ResultCB);
+
 cm_user_t *
 RDR_UserFromCommRequest( IN AFSCommRequest * pRequest);