rx-retain-windowing-per-peer-20080508
authorDerrick Brashear <shadow@dementia.org>
Thu, 8 May 2008 22:24:52 +0000 (22:24 +0000)
committerDerrick Brashear <shadow@dementia.org>
Thu, 8 May 2008 22:24:52 +0000 (22:24 +0000)
LICENSE IPL10

we learned about the peer in a previous connection... retain the information
and keep using it. widen the available window.

makes rx perform better over high latency wans. needs to be present in both
sides for maximal effect.

src/rx/rx.c
src/rx/rx.h
src/rx/rx_globals.h
src/rx/rx_rdwr.c
src/rx/test/generator.c

index 401ff77..e1024ad 100644 (file)
@@ -737,7 +737,7 @@ rx_NewConnection(register afs_uint32 shost, u_short sport, u_short sservice,
                 register struct rx_securityClass *securityObject,
                 int serviceSecurityIndex)
 {
-    int hashindex;
+    int hashindex, i;
     afs_int32 cid;
     register struct rx_connection *conn;
 
@@ -773,6 +773,10 @@ rx_NewConnection(register afs_uint32 shost, u_short sport, u_short sservice,
     conn->delayedAbortEvent = NULL;
     conn->abortCount = 0;
     conn->error = 0;
+    for (i = 0; i < RX_MAXCALLS; i++) {
+       conn->twind[i] = rx_initSendWindow;
+       conn->rwind[i] = rx_initReceiveWindow;
+    }
 
     RXS_NewConnection(securityObject, conn);
     hashindex =
@@ -2149,6 +2153,8 @@ rxi_NewCall(register struct rx_connection *conn, register int channel)
     }
     call->channel = channel;
     call->callNumber = &conn->callNumber[channel];
+    call->rwind = conn->rwind[channel];
+    call->twind = conn->twind[channel];
     /* Note that the next expected call number is retained (in
      * conn->callNumber[i]), even if we reallocate the call structure
      */
@@ -2309,7 +2315,7 @@ rxi_FindConnection(osi_socket socket, register afs_int32 host,
                   register u_short port, u_short serviceId, afs_uint32 cid,
                   afs_uint32 epoch, int type, u_int securityIndex)
 {
-    int hashindex, flag;
+    int hashindex, flag, i;
     register struct rx_connection *conn;
     hashindex = CONN_HASH(host, port, cid, epoch, type);
     MUTEX_ENTER(&rx_connHashTable_lock);
@@ -2379,6 +2385,10 @@ rxi_FindConnection(osi_socket socket, register afs_int32 host,
        conn->specific = NULL;
        rx_SetConnDeadTime(conn, service->connDeadTime);
        rx_SetConnIdleDeadTime(conn, service->idleDeadTime);
+       for (i = 0; i < RX_MAXCALLS; i++) {
+           conn->twind[i] = rx_initSendWindow;
+           conn->rwind[i] = rx_initReceiveWindow;
+       }
        /* Notify security object of the new connection */
        RXS_NewConnection(conn->securityObject, conn);
        /* XXXX Connection timeout? */
@@ -3755,6 +3765,7 @@ rxi_ReceiveAckPacket(register struct rx_call *call, struct rx_packet *np,
            if (tSize < call->twind) {  /* smaller than our send */
                call->twind = tSize;    /* window, we must send less... */
                call->ssthresh = MIN(call->twind, call->ssthresh);
+               call->conn->twind[call->channel] = call->twind;
            }
 
            /* Only send jumbograms to 3.4a fileservers. 3.3a RX gets the
@@ -3778,9 +3789,11 @@ rxi_ReceiveAckPacket(register struct rx_call *call, struct rx_packet *np,
             */
            if (tSize < call->twind) {
                call->twind = tSize;
+               call->conn->twind[call->channel] = call->twind;
                call->ssthresh = MIN(call->twind, call->ssthresh);
            } else if (tSize > call->twind) {
                call->twind = tSize;
+               call->conn->twind[call->channel] = call->twind;
            }
 
            /*
@@ -4495,8 +4508,8 @@ rxi_ResetCall(register struct rx_call *call, register int newcall)
     }
     queue_Init(&call->rq);
     call->error = 0;
-    call->rwind = rx_initReceiveWindow;
-    call->twind = rx_initSendWindow;
+    call->twind = call->conn->twind[call->channel];
+    call->rwind = call->conn->rwind[call->channel];
     call->nSoftAcked = 0;
     call->nextCwind = 0;
     call->nAcks = 0;
@@ -4607,7 +4620,7 @@ rxi_SendAck(register struct rx_call *call,
      * Open the receive window once a thread starts reading packets
      */
     if (call->rnext > 1) {
-       call->rwind = rx_maxReceiveWindow;
+       call->conn->rwind[call->channel] = call->rwind = rx_maxReceiveWindow;
     }
 
     call->nHardAcks = 0;
index 22b61df..d40669b 100644 (file)
@@ -235,6 +235,8 @@ struct rx_connection {
     struct rx_call *call[RX_MAXCALLS];
 #endif
     afs_uint32 callNumber[RX_MAXCALLS];        /* Current call numbers */
+    afs_uint32 rwind[RX_MAXCALLS];
+    u_short twind[RX_MAXCALLS];
     afs_uint32 serial;         /* Next outgoing packet serial number */
     afs_uint32 lastSerial;     /* # of last packet received, for computing skew */
     afs_int32 maxSerial;       /* largest serial number seen on incoming packets */
index 5632012..749a380 100644 (file)
@@ -119,9 +119,9 @@ EXT int rx_BusyError GLOBALSINIT(-1);
 
 EXT int rx_minWindow GLOBALSINIT(1);
 EXT int rx_initReceiveWindow GLOBALSINIT(16);  /* how much to accept */
-EXT int rx_maxReceiveWindow GLOBALSINIT(32);   /* how much to accept */
-EXT int rx_initSendWindow GLOBALSINIT(8);
-EXT int rx_maxSendWindow GLOBALSINIT(32);
+EXT int rx_maxReceiveWindow GLOBALSINIT(64);   /* how much to accept */
+EXT int rx_initSendWindow GLOBALSINIT(16);
+EXT int rx_maxSendWindow GLOBALSINIT(64);
 EXT int rx_nackThreshold GLOBALSINIT(3);       /* Number NACKS to trigger congestion recovery */
 EXT int rx_nDgramThreshold GLOBALSINIT(4);     /* Number of packets before increasing
                                         * packets per datagram */
index 0c019c2..c2df01c 100644 (file)
@@ -700,7 +700,7 @@ rxi_WriteProc(register struct rx_call *call, register char *buf,
            }
            /* Wait for transmit window to open up */
            while (!call->error
-                  && call->tnext + 1 > call->tfirst + call->twind) {
+                  && call->tnext + 1 > call->tfirst + (2 * call->twind)) {
                clock_NewTime();
                call->startWait = clock_Sec();
 
@@ -1138,7 +1138,7 @@ rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
     }
 
     /* Wait for the length of the transmit queue to fall below call->twind */
-    while (!call->error && call->tnext + 1 > call->tfirst + call->twind) {
+    while (!call->error && call->tnext + 1 > call->tfirst + (2 * call->twind)) {
        clock_NewTime();
        call->startWait = clock_Sec();
 #ifdef RX_ENABLE_LOCKS
index 214e2d7..a7d4536 100644 (file)
@@ -60,7 +60,6 @@ RCSID
 #include <stdlib.h>
 #include <limits.h>
 #include <float.h>
-#include <malloc.h>
 #include <assert.h>
 #include "generator.h"