Windows: Use rx_Readv / rx_Writev
authorJeffrey Altman <jaltman@your-file-system.com>
Sun, 17 Oct 2010 04:35:36 +0000 (00:35 -0400)
committerJeffrey Altman <jaltman@openafs.org>
Fri, 22 Oct 2010 05:31:48 +0000 (22:31 -0700)
When USE_RX_IOVEC is defined, cm_BufWrite() will utilize rx_Writev()
instead of rx_Write() and cm_GetBuffer() will use rx_Readv() instead
of rx_Read() to improve throughput.

LICENSE MIT

Change-Id: Ib70dfd4fd7a79c9ce36ef4fd8f4bb46a946621fd
Reviewed-on: http://gerrit.openafs.org/2999
Tested-by: BuildBot <buildbot@rampaginggeek.com>
Tested-by: Jeffrey Altman <jaltman@openafs.org>
Reviewed-by: Jeffrey Altman <jaltman@openafs.org>

src/WINNT/afsd/cm_dcache.c

index 72081fb..75fb05c 100644 (file)
@@ -13,9 +13,6 @@
 #include <windows.h>
 #include <winsock2.h>
 #include <nb30.h>
-#ifdef COMMENT
-#include <malloc.h>
-#endif
 #include <string.h>
 #include <stdlib.h>
 #include <osi.h>
@@ -30,6 +27,8 @@ extern void afsi_log(char *pattern, ...);
 extern osi_mutex_t cm_Freelance_Lock;
 #endif
 
+#define USE_RX_IOVEC 1
+
 /* we can access connp->serverp without holding a lock because that
    never changes since the connection is made. */
 #define SERVERHAS64BIT(connp) (!((connp)->serverp->flags & CM_SERVERFLAG_NO64BIT))
@@ -195,10 +194,73 @@ long cm_BufWrite(void *vscp, osi_hyper_t *offsetp, long length, long flags,
         }
 
         if (code == 0) {
+            afs_uint32 buf_offset;
+
             /* write the data from the the list of buffers */
             qdp = NULL;
             while(nbytes > 0) {
-                afs_uint32 buf_offset;
+#ifdef USE_RX_IOVEC
+                struct iovec tiov[RX_MAXIOVECS];
+                afs_int32 tnio, vlen, vbytes, iov, voffset;
+                afs_uint32 vleft;
+
+                vbytes = rx_WritevAlloc(rxcallp, tiov, &tnio, RX_MAXIOVECS, nbytes);
+                if (vbytes <= 0) {
+                    code = RX_PROTOCOL_ERROR;
+                    break;
+                }
+
+                for ( iov = voffset = vlen = 0; vlen < vbytes; vlen += wbytes) {
+                    if (qdp == NULL) {
+                        qdp = biod.bufListEndp;
+                        buf_offset = offsetp->LowPart % cm_data.buf_blockSize;
+                    } else if (buf_offset == cm_data.buf_blockSize) {
+                        qdp = (osi_queueData_t *) osi_QPrev(&qdp->q);
+                        buf_offset = 0;
+                    }
+
+                    osi_assertx(qdp != NULL, "null osi_queueData_t");
+                    bufp = osi_GetQData(qdp);
+                    bufferp = bufp->datap + buf_offset;
+                    wbytes = vbytes - vlen;
+                    if (wbytes > cm_data.buf_blockSize - buf_offset)
+                        wbytes = cm_data.buf_blockSize - buf_offset;
+
+                    vleft = tiov[iov].iov_len - voffset;
+                    while (wbytes > vleft) {
+                        memcpy(tiov[iov].iov_base + voffset, bufferp, vleft);
+                        vlen += vleft;
+                        wbytes -= vleft;
+                        bufferp += vleft;
+
+                        iov++;
+                        voffset = 0;
+                        vleft = tiov[iov].iov_len;
+                    }
+
+                    memcpy(tiov[iov].iov_base + voffset, bufferp, wbytes);
+                    if (tiov[iov].iov_len == voffset + wbytes) {
+                        iov++;
+                        voffset = 0;
+                        vleft = tiov[iov].iov_len;
+                    } else {
+                        voffset += wbytes;
+                        vleft -= wbytes;
+                    }
+                    buf_offset += wbytes;
+                }
+
+                temp = rx_Writev(rxcallp, tiov, tnio, vbytes);
+                if (temp != vbytes) {
+                    osi_Log3(afsd_logp, "rx_Writev failed bp 0x%p, %d != %d", bufp, temp, vbytes);
+                    code = (rxcallp->error < 0) ? rxcallp->error : RX_PROTOCOL_ERROR;
+                    break;
+                }
+
+                osi_Log3(afsd_logp, "rx_Writev succeeded bp 0x%p offset 0x%x, wrote %u",
+                         bufp, buf_offset, vbytes);
+                nbytes -= vbytes;
+#else /* USE_RX_IOVEC */
                 if (qdp == NULL) {
                     qdp = biod.bufListEndp;
                     buf_offset = offsetp->LowPart % cm_data.buf_blockSize;
@@ -206,6 +268,7 @@ long cm_BufWrite(void *vscp, osi_hyper_t *offsetp, long length, long flags,
                     qdp = (osi_queueData_t *) osi_QPrev(&qdp->q);
                     buf_offset = 0;
                 }
+
                 osi_assertx(qdp != NULL, "null osi_queueData_t");
                 bufp = osi_GetQData(qdp);
                 bufferp = bufp->datap + buf_offset;
@@ -223,6 +286,7 @@ long cm_BufWrite(void *vscp, osi_hyper_t *offsetp, long length, long flags,
                     osi_Log2(afsd_logp, "rx_Write succeeded bp 0x%p, %d",bufp,temp);
                 }       
                 nbytes -= wbytes;
+#endif /* USE_RX_IOVEC */
             }  /* while more bytes to write */
         }      /* if RPC started successfully */
 
@@ -1511,6 +1575,7 @@ long cm_GetBuffer(cm_scache_t *scp, cm_buf_t *bufp, int *cpffp, cm_user_t *userp
     AFSCallBack callback;
     AFSVolSync volSync;
     char *bufferp;
+    afs_uint32 buffer_offset;
     cm_buf_t *tbufp;                   /* buf we're filling */
     osi_queueData_t *qdp;              /* q element we're scanning */
     AFSFid tfid;
@@ -1786,9 +1851,11 @@ long cm_GetBuffer(cm_scache_t *scp, cm_buf_t *bufp, int *cpffp, cm_user_t *userp
             if (qdp) {
                 tbufp = osi_GetQData(qdp);
                 bufferp = tbufp->datap;
+                buffer_offset = 0;
             }
             else 
                 bufferp = NULL;
+
             /* fill length_found of data from the pipe into the pages.
              * When we stop, qdp will point at the last page we're
              * dealing with, and bufferp will tell us where we
@@ -1797,6 +1864,80 @@ long cm_GetBuffer(cm_scache_t *scp, cm_buf_t *bufp, int *cpffp, cm_user_t *userp
              * clear later pages out, if we fetch past EOF).
              */
             while (length_found > 0) {
+#ifdef USE_RX_IOVEC
+                struct iovec tiov[RX_MAXIOVECS];
+                afs_int32 tnio, iov, iov_offset;
+
+                temp = rx_Readv(rxcallp, tiov, &tnio, RX_MAXIOVECS, length_found);
+                osi_Log1(afsd_logp, "cm_GetBuffer rx_Readv returns %d", temp);
+                if (temp != length_found && temp < cm_data.buf_blockSize) {
+                    /*
+                     * If the file server returned (filesize - offset),
+                     * then the first rx_Read will return zero octets of data.
+                     * If it does, do not treat it as an error.  Correct the
+                     * length_found and continue as if the file server said
+                     * it was sending us zero octets of data.
+                     */
+                    if (fs_fetchdata_offset_bug && first_read)
+                        length_found = 0;
+                    else
+                        code = (rxcallp->error < 0) ? rxcallp->error : RX_PROTOCOL_ERROR;
+                    break;
+                }
+
+                iov = 0;
+                iov_offset = 0;
+                rbytes = temp;
+
+                while (rbytes > 0) {
+                    afs_int32 len;
+
+                    osi_assertx(bufferp != NULL, "null cm_buf_t");
+
+                    len = MIN(tiov[iov].iov_len - iov_offset, cm_data.buf_blockSize - buffer_offset);
+                    memcpy(bufferp + buffer_offset, tiov[iov].iov_base + iov_offset, len);
+                    iov_offset += len;
+                    buffer_offset += len;
+                    rbytes -= len;
+
+                    if (iov_offset == tiov[iov].iov_len) {
+                        iov++;
+                        iov_offset = 0;
+                    }
+
+                    if (buffer_offset == cm_data.buf_blockSize) {
+                        /* allow read-while-fetching.
+                        * if this is the last buffer, clear the
+                        * PREFETCHING flag, so the reader waiting for
+                        * this buffer will start a prefetch.
+                        */
+                        tbufp->cmFlags |= CM_BUF_CMFULLYFETCHED;
+                        lock_ObtainWrite(&scp->rw);
+                        if (scp->flags & CM_SCACHEFLAG_WAITING) {
+                            osi_Log1(afsd_logp, "CM GetBuffer Waking scp 0x%p", scp);
+                            osi_Wakeup((LONG_PTR) &scp->flags);
+                        }
+                        if (cpffp && !*cpffp && !osi_QPrev(&qdp->q)) {
+                            osi_hyper_t tlength = ConvertLongToLargeInteger(biod.length);
+                            *cpffp = 1;
+                            cm_ClearPrefetchFlag(0, scp, &biod.offset, &tlength);
+                        }
+                        lock_ReleaseWrite(&scp->rw);
+
+                        /* Advance the buffer */
+                        qdp = (osi_queueData_t *) osi_QPrev(&qdp->q);
+                        if (qdp) {
+                            tbufp = osi_GetQData(qdp);
+                            bufferp = tbufp->datap;
+                            buffer_offset = 0;
+                        }
+                        else
+                            bufferp = NULL;
+                    }
+                }
+
+                length_found -= temp;
+#else /* USE_RX_IOVEC */
                 /* assert that there are still more buffers;
                  * our check above for length_found being less than
                  * biod.length should ensure this.
@@ -1854,6 +1995,7 @@ long cm_GetBuffer(cm_scache_t *scp, cm_buf_t *bufp, int *cpffp, cm_user_t *userp
                         bufferp = NULL;
                 } else 
                     bufferp += temp;
+#endif /* USE_RX_IOVEC */
             }
 
             /* zero out remainder of last pages, in case we are
@@ -1862,12 +2004,17 @@ long cm_GetBuffer(cm_scache_t *scp, cm_buf_t *bufp, int *cpffp, cm_user_t *userp
              * a page.  Zero the remainder of that page, and then
              * all of the rest of the pages.
              */
+#ifdef USE_RX_IOVEC
+            rbytes = cm_data.buf_blockSize - buffer_offset;
+            bufferp = tbufp->datap + buffer_offset;
+#else /* USE_RX_IOVEC */
             /* bytes fetched */
            osi_assertx((bufferp - tbufp->datap) < LONG_MAX, "data >= LONG_MAX");
             rbytes = (long) (bufferp - tbufp->datap);
 
             /* bytes left to zero */
             rbytes = cm_data.buf_blockSize - rbytes;
+#endif /* USE_RX_IOVEC */
             while(qdp) {
                 if (rbytes != 0)
                     memset(bufferp, 0, rbytes);