RX: Tidy reader data locking
[openafs.git] / src / rx / rx_rdwr.c
index 82dc947..27acf1b 100644 (file)
@@ -1,7 +1,7 @@
  /*
   * Copyright 2000, International Business Machines Corporation and others.
   * All Rights Reserved.
-  * 
+  *
   * This software has been released under the terms of the IBM Public
   * License.  For details, see the LICENSE file in the top-level source
   * directory or online at http://www.openafs.org/dl/license10.html
@@ -14,8 +14,6 @@
 #include <afs/param.h>
 #endif
 
-RCSID
-    ("$Header$");
 
 #ifdef KERNEL
 #ifndef UKERNEL
@@ -28,6 +26,9 @@ RCSID
 #include "h/types.h"
 #include "h/time.h"
 #include "h/stat.h"
+#if defined(AFS_AIX_ENV) || defined(AFS_AUX_ENV) || defined(AFS_SUN5_ENV)
+#include "h/systm.h"
+#endif
 #ifdef AFS_OSF_ENV
 #include <net/net_globals.h>
 #endif /* AFS_OSF_ENV */
@@ -64,25 +65,20 @@ RCSID
 #undef kmem_free
 #undef mem_alloc
 #undef mem_free
-#undef register
 #endif /* AFS_OSF_ENV */
 #else /* KERNEL */
 # include <sys/types.h>
-#ifndef AFS_NT40_ENV
+#ifdef AFS_NT40_ENV
+# include <winsock2.h>
+#else /* !AFS_NT40_ENV */
 # include <sys/socket.h>
 # include <sys/file.h>
 # include <netdb.h>
 # include <netinet/in.h>
 # include <sys/stat.h>
 # include <sys/time.h>
-#endif
-#ifdef HAVE_STRING_H
+#endif /* !AFS_NT40_ENV */
 #include <string.h>
-#else
-#ifdef HAVE_STRINGS_H
-#include <strings.h>
-#endif
-#endif
 #ifdef HAVE_UNISTD_H
 #include <unistd.h>
 #endif
@@ -102,23 +98,23 @@ static int rxdb_fileID = RXDB_FILE_RX_RDWR;
  * LOCKS USED -- called at netpri with rx global lock and call->lock held.
  */
 int
-rxi_ReadProc(register struct rx_call *call, register char *buf,
-            register int nbytes)
+rxi_ReadProc(struct rx_call *call, char *buf,
+            int nbytes)
 {
-    register struct rx_packet *cp = call->currentPacket;
-    register struct rx_packet *rp;
-    register struct rx_packet *nxp;    /* Next packet pointer, for queue_Scan */
-    register int requestCount;
-    register unsigned int t;
+    struct rx_packet *cp = call->currentPacket;
+    struct rx_packet *rp;
+    int requestCount;
+    unsigned int t;
+
 /* XXXX took out clock_NewTime from here.  Was it needed? */
     requestCount = nbytes;
 
     /* Free any packets from the last call to ReadvProc/WritevProc */
-    if (!queue_IsEmpty(&call->iovq)) {
-       for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
-           queue_Remove(rp);
-           rxi_FreePacket(rp);
-       }
+    if (queue_IsNotEmpty(&call->iovq)) {
+#ifdef RXDEBUG_PACKET
+        call->iovqc -=
+#endif /* RXDEBUG_PACKET */
+            rxi_FreePackets(0, &call->iovq);
     }
 
     do {
@@ -139,8 +135,14 @@ rxi_ReadProc(register struct rx_call *call, register char *buf,
                    rp = queue_First(&call->rq, rx_packet);
                    if (rp->header.seq == call->rnext) {
                        afs_int32 error;
-                       register struct rx_connection *conn = call->conn;
+                       struct rx_connection *conn = call->conn;
                        queue_Remove(rp);
+#ifdef RX_TRACK_PACKETS
+                       rp->flags &= ~RX_PKTFLAG_RQ;
+#endif
+#ifdef RXDEBUG_PACKET
+                        call->rqc--;
+#endif /* RXDEBUG_PACKET */
 
                        /* RXS_CheckPacket called to undo RXS_PreparePacket's
                         * work.  It may reduce the length of the packet by up
@@ -149,8 +151,8 @@ rxi_ReadProc(register struct rx_call *call, register char *buf,
                        if ((error =
                             RXS_CheckPacket(conn->securityObject, call,
                                             rp))) {
-                           /* Used to merely shut down the call, but now we 
-                            * shut down the whole connection since this may 
+                           /* Used to merely shut down the call, but now we
+                            * shut down the whole connection since this may
                             * indicate an attempt to hijack it */
 
                            MUTEX_EXIT(&call->lock);
@@ -165,8 +167,11 @@ rxi_ReadProc(register struct rx_call *call, register char *buf,
                        }
                        call->rnext++;
                        cp = call->currentPacket = rp;
+#ifdef RX_TRACK_PACKETS
+                       call->currentPacket->flags |= RX_PKTFLAG_CP;
+#endif
                        call->curvec = 1;       /* 0th vec is always header */
-                       /* begin at the beginning [ more or less ], continue 
+                       /* begin at the beginning [ more or less ], continue
                         * on until the end, then stop. */
                        call->curpos =
                            (char *)cp->wirevec[1].iov_base +
@@ -196,8 +201,9 @@ rxi_ReadProc(register struct rx_call *call, register char *buf,
                                               RX_CALL_REFCOUNT_DELAY);
                                rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0);
                            } else {
-                               struct clock when;
-                               clock_GetTime(&when);
+                               struct clock when, now;
+                               clock_GetTime(&now);
+                               when = now;
                                /* Delay to consolidate ack packets */
                                clock_Add(&when, &rx_hardAckDelay);
                                if (!call->delayedAckEvent
@@ -208,7 +214,7 @@ rxi_ReadProc(register struct rx_call *call, register char *buf,
                                                   RX_CALL_REFCOUNT_DELAY);
                                    CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
                                    call->delayedAckEvent =
-                                       rxevent_Post(&when,
+                                     rxevent_PostNow(&when, &now,
                                                     rxi_SendDelayedAck, call,
                                                     0);
                                }
@@ -218,9 +224,13 @@ rxi_ReadProc(register struct rx_call *call, register char *buf,
                    }
                }
 
-/*
-MTUXXX  doesn't there need to be an "else" here ??? 
-*/
+                /*
+                 * If we reach this point either we have no packets in the
+                 * receive queue or the next packet in the queue is not the
+                 * one we are looking for.  There is nothing else for us to
+                 * do but wait for another packet to arrive.
+                 */
+
                /* Are there ever going to be any more packets? */
                if (call->flags & RX_CALL_RECEIVE_DONE) {
                    return requestCount - nbytes;
@@ -236,6 +246,7 @@ MTUXXX  doesn't there need to be an "else" here ???
                    osi_rxSleep(&call->rq);
 #endif
                }
+                cp = call->currentPacket;
 
                call->startWait = 0;
 #ifdef RX_ENABLE_LOCKS
@@ -266,6 +277,9 @@ MTUXXX  doesn't there need to be an "else" here ???
 
                if (!call->nLeft) {
                    /* out of packet.  Get another one. */
+#ifdef RX_TRACK_PACKETS
+                   call->currentPacket->flags &= ~RX_PKTFLAG_CP;
+#endif
                    rxi_FreePacket(cp);
                    cp = call->currentPacket = (struct rx_packet *)0;
                } else if (!call->curlen) {
@@ -273,6 +287,9 @@ MTUXXX  doesn't there need to be an "else" here ???
                    if (++call->curvec >= cp->niovecs) {
                        /* current packet is exhausted, get ready for another */
                        /* don't worry about curvec and stuff, they get set somewhere else */
+#ifdef RX_TRACK_PACKETS
+                       call->currentPacket->flags &= ~RX_PKTFLAG_CP;
+#endif
                        rxi_FreePacket(cp);
                        cp = call->currentPacket = (struct rx_packet *)0;
                        call->nLeft = 0;
@@ -302,37 +319,33 @@ rx_ReadProc(struct rx_call *call, char *buf, int nbytes)
     char *tcurpos;
     SPLVAR;
 
-    /*
-     * Free any packets from the last call to ReadvProc/WritevProc.
-     * We do not need the lock because the receiver threads only
-     * touch the iovq when the RX_CALL_IOVEC_WAIT flag is set, and the
-     * RX_CALL_IOVEC_WAIT is always cleared before returning from
-     * ReadvProc/WritevProc.
-     */
+    /* Free any packets from the last call to ReadvProc/WritevProc */
     if (!queue_IsEmpty(&call->iovq)) {
-       register struct rx_packet *rp;
-       register struct rx_packet *nxp;
-       for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
-           queue_Remove(rp);
-           rxi_FreePacket(rp);
-       }
+#ifdef RXDEBUG_PACKET
+        call->iovqc -=
+#endif /* RXDEBUG_PACKET */
+            rxi_FreePackets(0, &call->iovq);
     }
 
     /*
      * Most common case, all of the data is in the current iovec.
-     * We do not need the lock because this is the only thread that
-     * updates the curlen, curpos, nLeft fields.
-     *
      * We are relying on nLeft being zero unless the call is in receive mode.
      */
     tcurlen = call->curlen;
     tnLeft = call->nLeft;
     if (!call->error && tcurlen > nbytes && tnLeft > nbytes) {
        tcurpos = call->curpos;
-       memcpy(buf, tcurpos, nbytes);
+        memcpy(buf, tcurpos, nbytes);
+
        call->curpos = tcurpos + nbytes;
        call->curlen = tcurlen - nbytes;
        call->nLeft = tnLeft - nbytes;
+
+        if (!call->nLeft && call->currentPacket != NULL) {
+            /* out of packet.  Get another one. */
+            rxi_FreePacket(call->currentPacket);
+            call->currentPacket = (struct rx_packet *)0;
+        }
        return nbytes;
     }
 
@@ -354,42 +367,34 @@ rx_ReadProc32(struct rx_call *call, afs_int32 * value)
     char *tcurpos;
     SPLVAR;
 
-    /*
-     * Free any packets from the last call to ReadvProc/WritevProc.
-     * We do not need the lock because the receiver threads only
-     * touch the iovq when the RX_CALL_IOVEC_WAIT flag is set, and the
-     * RX_CALL_IOVEC_WAIT is always cleared before returning from
-     * ReadvProc/WritevProc.
-     */
+    /* Free any packets from the last call to ReadvProc/WritevProc */
     if (!queue_IsEmpty(&call->iovq)) {
-       register struct rx_packet *rp;
-       register struct rx_packet *nxp;
-       for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
-           queue_Remove(rp);
-           rxi_FreePacket(rp);
-       }
+#ifdef RXDEBUG_PACKET
+        call->iovqc -=
+#endif /* RXDEBUG_PACKET */
+            rxi_FreePackets(0, &call->iovq);
     }
 
     /*
      * Most common case, all of the data is in the current iovec.
-     * We do not need the lock because this is the only thread that
-     * updates the curlen, curpos, nLeft fields.
-     *
      * We are relying on nLeft being zero unless the call is in receive mode.
      */
     tcurlen = call->curlen;
     tnLeft = call->nLeft;
-    if (!call->error && tcurlen > sizeof(afs_int32)
-       && tnLeft > sizeof(afs_int32)) {
+    if (!call->error && tcurlen >= sizeof(afs_int32)
+       && tnLeft >= sizeof(afs_int32)) {
        tcurpos = call->curpos;
-       if (!((long)tcurpos & (sizeof(afs_int32) - 1))) {
-           *value = *((afs_int32 *) (tcurpos));
-       } else {
-           memcpy((char *)value, tcurpos, sizeof(afs_int32));
-       }
-       call->curpos = tcurpos + sizeof(afs_int32);
-       call->curlen = tcurlen - sizeof(afs_int32);
-       call->nLeft = tnLeft - sizeof(afs_int32);
+
+        memcpy((char *)value, tcurpos, sizeof(afs_int32));
+
+        call->curpos = tcurpos + sizeof(afs_int32);
+       call->curlen = (u_short)(tcurlen - sizeof(afs_int32));
+       call->nLeft = (u_short)(tnLeft - sizeof(afs_int32));
+        if (!call->nLeft && call->currentPacket != NULL) {
+            /* out of packet.  Get another one. */
+            rxi_FreePacket(call->currentPacket);
+            call->currentPacket = (struct rx_packet *)0;
+        }
        return sizeof(afs_int32);
     }
 
@@ -398,6 +403,7 @@ rx_ReadProc32(struct rx_call *call, afs_int32 * value)
     bytes = rxi_ReadProc(call, (char *)value, sizeof(afs_int32));
     MUTEX_EXIT(&call->lock);
     USERPRI;
+
     return bytes;
 }
 
@@ -412,7 +418,7 @@ rxi_FillReadVec(struct rx_call *call, afs_uint32 serial)
 {
     int didConsume = 0;
     int didHardAck = 0;
-    register unsigned int t;
+    unsigned int t;
     struct rx_packet *rp;
     struct rx_packet *curp;
     struct iovec *call_iov;
@@ -432,8 +438,14 @@ rxi_FillReadVec(struct rx_call *call, afs_uint32 serial)
                rp = queue_First(&call->rq, rx_packet);
                if (rp->header.seq == call->rnext) {
                    afs_int32 error;
-                   register struct rx_connection *conn = call->conn;
+                   struct rx_connection *conn = call->conn;
                    queue_Remove(rp);
+#ifdef RX_TRACK_PACKETS
+                   rp->flags &= ~RX_PKTFLAG_RQ;
+#endif
+#ifdef RXDEBUG_PACKET
+                    call->rqc--;
+#endif /* RXDEBUG_PACKET */
 
                    /* RXS_CheckPacket called to undo RXS_PreparePacket's
                     * work.  It may reduce the length of the packet by up
@@ -441,8 +453,8 @@ rxi_FillReadVec(struct rx_call *call, afs_uint32 serial)
                     * data + the header. */
                    if ((error =
                         RXS_CheckPacket(conn->securityObject, call, rp))) {
-                       /* Used to merely shut down the call, but now we 
-                        * shut down the whole connection since this may 
+                       /* Used to merely shut down the call, but now we
+                        * shut down the whole connection since this may
                         * indicate an attempt to hijack it */
 
                        MUTEX_EXIT(&call->lock);
@@ -457,9 +469,12 @@ rxi_FillReadVec(struct rx_call *call, afs_uint32 serial)
                    }
                    call->rnext++;
                    curp = call->currentPacket = rp;
+#ifdef RX_TRACK_PACKETS
+                   call->currentPacket->flags |= RX_PKTFLAG_CP;
+#endif
                    call->curvec = 1;   /* 0th vec is always header */
                    cur_iov = &curp->wirevec[1];
-                   /* begin at the beginning [ more or less ], continue 
+                   /* begin at the beginning [ more or less ], continue
                     * on until the end, then stop. */
                    call->curpos =
                        (char *)curp->wirevec[1].iov_base +
@@ -511,14 +526,28 @@ rxi_FillReadVec(struct rx_call *call, afs_uint32 serial)
 
            if (!call->nLeft) {
                /* out of packet.  Get another one. */
+#ifdef RX_TRACK_PACKETS
+                curp->flags &= ~RX_PKTFLAG_CP;
+                curp->flags |= RX_PKTFLAG_IOVQ;
+#endif
                queue_Append(&call->iovq, curp);
+#ifdef RXDEBUG_PACKET
+                call->iovqc++;
+#endif /* RXDEBUG_PACKET */
                curp = call->currentPacket = (struct rx_packet *)0;
            } else if (!call->curlen) {
                /* need to get another struct iov */
                if (++call->curvec >= curp->niovecs) {
                    /* current packet is exhausted, get ready for another */
                    /* don't worry about curvec and stuff, they get set somewhere else */
+#ifdef RX_TRACK_PACKETS
+                   curp->flags &= ~RX_PKTFLAG_CP;
+                   curp->flags |= RX_PKTFLAG_IOVQ;
+#endif
                    queue_Append(&call->iovq, curp);
+#ifdef RXDEBUG_PACKET
+                    call->iovqc++;
+#endif /* RXDEBUG_PACKET */
                    curp = call->currentPacket = (struct rx_packet *)0;
                    call->nLeft = 0;
                } else {
@@ -539,8 +568,9 @@ rxi_FillReadVec(struct rx_call *call, afs_uint32 serial)
            rxi_SendAck(call, 0, serial, RX_ACK_DELAY, 0);
            didHardAck = 1;
        } else {
-           struct clock when;
-           clock_GetTime(&when);
+           struct clock when, now;
+           clock_GetTime(&now);
+           when = now;
            /* Delay to consolidate ack packets */
            clock_Add(&when, &rx_hardAckDelay);
            if (!call->delayedAckEvent
@@ -549,7 +579,7 @@ rxi_FillReadVec(struct rx_call *call, afs_uint32 serial)
                               RX_CALL_REFCOUNT_DELAY);
                CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
                call->delayedAckEvent =
-                   rxevent_Post(&when, rxi_SendDelayedAck, call, 0);
+                   rxevent_PostNow(&when, &now, rxi_SendDelayedAck, call, 0);
            }
        }
     }
@@ -569,18 +599,12 @@ int
 rxi_ReadvProc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
              int nbytes)
 {
-    struct rx_packet *rp;
-    struct rx_packet *nxp;     /* Next packet pointer, for queue_Scan */
-    int requestCount;
-    int nextio;
-
-    requestCount = nbytes;
-    nextio = 0;
-
     /* Free any packets from the last call to ReadvProc/WritevProc */
-    for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
-       queue_Remove(rp);
-       rxi_FreePacket(rp);
+    if (queue_IsNotEmpty(&call->iovq)) {
+#ifdef RXDEBUG_PACKET
+        call->iovqc -=
+#endif /* RXDEBUG_PACKET */
+            rxi_FreePackets(0, &call->iovq);
     }
 
     if (call->mode == RX_MODE_SENDING) {
@@ -652,22 +676,20 @@ rx_ReadvProc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
  * LOCKS USED -- called at netpri with rx global lock and call->lock held. */
 
 int
-rxi_WriteProc(register struct rx_call *call, register char *buf,
-             register int nbytes)
+rxi_WriteProc(struct rx_call *call, char *buf,
+             int nbytes)
 {
     struct rx_connection *conn = call->conn;
-    register struct rx_packet *cp = call->currentPacket;
-    register struct rx_packet *tp;     /* Temporary packet pointer */
-    register struct rx_packet *nxp;    /* Next packet pointer, for queue_Scan */
-    register unsigned int t;
+    struct rx_packet *cp = call->currentPacket;
+    unsigned int t;
     int requestCount = nbytes;
 
     /* Free any packets from the last call to ReadvProc/WritevProc */
-    if (!queue_IsEmpty(&call->iovq)) {
-       for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
-           queue_Remove(tp);
-           rxi_FreePacket(tp);
-       }
+    if (queue_IsNotEmpty(&call->iovq)) {
+#ifdef RXDEBUG_PACKET
+        call->iovqc -=
+#endif /* RXDEBUG_PACKET */
+            rxi_FreePackets(0, &call->iovq);
     }
 
     if (call->mode != RX_MODE_SENDING) {
@@ -675,6 +697,9 @@ rxi_WriteProc(register struct rx_call *call, register char *buf,
            && (call->mode == RX_MODE_RECEIVING)) {
            call->mode = RX_MODE_SENDING;
            if (cp) {
+#ifdef RX_TRACK_PACKETS
+               cp->flags &= ~RX_PKTFLAG_CP;
+#endif
                rxi_FreePacket(cp);
                cp = call->currentPacket = (struct rx_packet *)0;
                call->nLeft = 0;
@@ -692,38 +717,63 @@ rxi_WriteProc(register struct rx_call *call, register char *buf,
     do {
        if (call->nFree == 0) {
            if (!call->error && cp) {
+                /* Clear the current packet now so that if
+                 * we are forced to wait and drop the lock
+                 * the packet we are planning on using
+                 * cannot be freed.
+                 */
+#ifdef RX_TRACK_PACKETS
+                cp->flags &= ~RX_PKTFLAG_CP;
+#endif
+               call->currentPacket = (struct rx_packet *)0;
 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
                /* Wait until TQ_BUSY is reset before adding any
                 * packets to the transmit queue
                 */
                while (call->flags & RX_CALL_TQ_BUSY) {
                    call->flags |= RX_CALL_TQ_WAIT;
+                    call->tqWaiters++;
 #ifdef RX_ENABLE_LOCKS
                    CV_WAIT(&call->cv_tq, &call->lock);
 #else /* RX_ENABLE_LOCKS */
                    osi_rxSleep(&call->tq);
 #endif /* RX_ENABLE_LOCKS */
+                    call->tqWaiters--;
+                    if (call->tqWaiters == 0)
+                        call->flags &= ~RX_CALL_TQ_WAIT;
                }
 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
                clock_NewTime();        /* Bogus:  need new time package */
-               /* The 0, below, specifies that it is not the last packet: 
+               /* The 0, below, specifies that it is not the last packet:
                 * there will be others. PrepareSendPacket may
                 * alter the packet length by up to
                 * conn->securityMaxTrailerSize */
                hadd32(call->bytesSent, cp->length);
                rxi_PrepareSendPacket(call, cp, 0);
+#ifdef RX_TRACK_PACKETS
+               cp->flags |= RX_PKTFLAG_TQ;
+#endif
                queue_Append(&call->tq, cp);
-               cp = call->currentPacket = NULL;
+#ifdef RXDEBUG_PACKET
+                call->tqc++;
+#endif /* RXDEBUG_PACKET */
+                cp = (struct rx_packet *)0;
                if (!
                    (call->
                     flags & (RX_CALL_FAST_RECOVER |
                              RX_CALL_FAST_RECOVER_WAIT))) {
                    rxi_Start(0, call, 0, 0);
                }
+           } else if (cp) {
+#ifdef RX_TRACK_PACKETS
+               cp->flags &= ~RX_PKTFLAG_CP;
+#endif
+               rxi_FreePacket(cp);
+               cp = call->currentPacket = (struct rx_packet *)0;
            }
            /* Wait for transmit window to open up */
            while (!call->error
-                  && call->tnext + 1 > call->tfirst + call->twind) {
+                  && call->tnext + 1 > call->tfirst + (2 * call->twind)) {
                clock_NewTime();
                call->startWait = clock_Sec();
 
@@ -742,10 +792,13 @@ rxi_WriteProc(register struct rx_call *call, register char *buf,
 #endif /* RX_ENABLE_LOCKS */
            }
            if ((cp = rxi_AllocSendPacket(call, nbytes))) {
+#ifdef RX_TRACK_PACKETS
+               cp->flags |= RX_PKTFLAG_CP;
+#endif
                call->currentPacket = cp;
                call->nFree = cp->length;
                call->curvec = 1;       /* 0th vec is always header */
-               /* begin at the beginning [ more or less ], continue 
+               /* begin at the beginning [ more or less ], continue
                 * on until the end, then stop. */
                call->curpos =
                    (char *)cp->wirevec[1].iov_base +
@@ -755,6 +808,9 @@ rxi_WriteProc(register struct rx_call *call, register char *buf,
            }
            if (call->error) {
                if (cp) {
+#ifdef RX_TRACK_PACKETS
+                   cp->flags &= ~RX_PKTFLAG_CP;
+#endif
                    rxi_FreePacket(cp);
                    call->currentPacket = NULL;
                }
@@ -764,7 +820,7 @@ rxi_WriteProc(register struct rx_call *call, register char *buf,
 
        if (cp && (int)call->nFree < nbytes) {
            /* Try to extend the current buffer */
-           register int len, mud;
+           int len, mud;
            len = cp->length;
            mud = rx_MaxUserDataSize(call);
            if (mud > len) {
@@ -793,8 +849,8 @@ rxi_WriteProc(register struct rx_call *call, register char *buf,
            buf += t;
            nbytes -= t;
            call->curpos += t;
-           call->curlen -= t;
-           call->nFree -= t;
+           call->curlen -= (u_short)t;
+           call->nFree -= (u_short)t;
 
            if (!call->curlen) {
                /* need to get another struct iov */
@@ -826,37 +882,27 @@ rx_WriteProc(struct rx_call *call, char *buf, int nbytes)
     char *tcurpos;
     SPLVAR;
 
-    /*
-     * Free any packets from the last call to ReadvProc/WritevProc.
-     * We do not need the lock because the receiver threads only
-     * touch the iovq when the RX_CALL_IOVEC_WAIT flag is set, and the
-     * RX_CALL_IOVEC_WAIT is always cleared before returning from
-     * ReadvProc/WritevProc.
-     */
-    if (!queue_IsEmpty(&call->iovq)) {
-       register struct rx_packet *rp;
-       register struct rx_packet *nxp;
-       for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
-           queue_Remove(rp);
-           rxi_FreePacket(rp);
-       }
+    /* Free any packets from the last call to ReadvProc/WritevProc */
+    if (queue_IsNotEmpty(&call->iovq)) {
+#ifdef RXDEBUG_PACKET
+        call->iovqc -=
+#endif /* RXDEBUG_PACKET */
+            rxi_FreePackets(0, &call->iovq);
     }
 
     /*
      * Most common case: all of the data fits in the current iovec.
-     * We do not need the lock because this is the only thread that
-     * updates the curlen, curpos, nFree fields.
-     *
      * We are relying on nFree being zero unless the call is in send mode.
      */
     tcurlen = (int)call->curlen;
     tnFree = (int)call->nFree;
     if (!call->error && tcurlen >= nbytes && tnFree >= nbytes) {
        tcurpos = call->curpos;
+
        memcpy(tcurpos, buf, nbytes);
        call->curpos = tcurpos + nbytes;
-       call->curlen = tcurlen - nbytes;
-       call->nFree = tnFree - nbytes;
+       call->curlen = (u_short)(tcurlen - nbytes);
+       call->nFree = (u_short)(tnFree - nbytes);
        return nbytes;
     }
 
@@ -870,7 +916,7 @@ rx_WriteProc(struct rx_call *call, char *buf, int nbytes)
 
 /* Optimization for marshalling 32 bit arguments */
 int
-rx_WriteProc32(register struct rx_call *call, register afs_int32 * value)
+rx_WriteProc32(struct rx_call *call, afs_int32 * value)
 {
     int bytes;
     int tcurlen;
@@ -878,42 +924,31 @@ rx_WriteProc32(register struct rx_call *call, register afs_int32 * value)
     char *tcurpos;
     SPLVAR;
 
-    /*
-     * Free any packets from the last call to ReadvProc/WritevProc.
-     * We do not need the lock because the receiver threads only
-     * touch the iovq when the RX_CALL_IOVEC_WAIT flag is set, and the
-     * RX_CALL_IOVEC_WAIT is always cleared before returning from
-     * ReadvProc/WritevProc.
-     */
-    if (!queue_IsEmpty(&call->iovq)) {
-       register struct rx_packet *rp;
-       register struct rx_packet *nxp;
-       for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
-           queue_Remove(rp);
-           rxi_FreePacket(rp);
-       }
+    if (queue_IsNotEmpty(&call->iovq)) {
+#ifdef RXDEBUG_PACKET
+        call->iovqc -=
+#endif /* RXDEBUG_PACKET */
+            rxi_FreePackets(0, &call->iovq);
     }
 
     /*
      * Most common case: all of the data fits in the current iovec.
-     * We do not need the lock because this is the only thread that
-     * updates the curlen, curpos, nFree fields.
-     *
      * We are relying on nFree being zero unless the call is in send mode.
      */
-    tcurlen = (int)call->curlen;
-    tnFree = (int)call->nFree;
+    tcurlen = call->curlen;
+    tnFree = call->nFree;
     if (!call->error && tcurlen >= sizeof(afs_int32)
        && tnFree >= sizeof(afs_int32)) {
        tcurpos = call->curpos;
-       if (!((long)tcurpos & (sizeof(afs_int32) - 1))) {
+
+       if (!((size_t)tcurpos & (sizeof(afs_int32) - 1))) {
            *((afs_int32 *) (tcurpos)) = *value;
        } else {
            memcpy(tcurpos, (char *)value, sizeof(afs_int32));
        }
        call->curpos = tcurpos + sizeof(afs_int32);
-       call->curlen = tcurlen - sizeof(afs_int32);
-       call->nFree = tnFree - sizeof(afs_int32);
+       call->curlen = (u_short)(tcurlen - sizeof(afs_int32));
+       call->nFree = (u_short)(tnFree - sizeof(afs_int32));
        return sizeof(afs_int32);
     }
 
@@ -938,13 +973,11 @@ rxi_WritevAlloc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
 {
     struct rx_connection *conn = call->conn;
     struct rx_packet *cp = call->currentPacket;
-    struct rx_packet *tp;      /* temporary packet pointer */
-    struct rx_packet *nxp;     /* Next packet pointer, for queue_Scan */
     int requestCount;
     int nextio;
     /* Temporary values, real work is done in rxi_WritevProc */
     int tnFree;
-    int tcurvec;
+    unsigned int tcurvec;
     char *tcurpos;
     int tcurlen;
 
@@ -952,9 +985,11 @@ rxi_WritevAlloc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
     nextio = 0;
 
     /* Free any packets from the last call to ReadvProc/WritevProc */
-    for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
-       queue_Remove(tp);
-       rxi_FreePacket(tp);
+    if (queue_IsNotEmpty(&call->iovq)) {
+#ifdef RXDEBUG_PACKET
+        call->iovqc -=
+#endif /* RXDEBUG_PACKET */
+            rxi_FreePackets(0, &call->iovq);
     }
 
     if (call->mode != RX_MODE_SENDING) {
@@ -962,6 +997,9 @@ rxi_WritevAlloc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
            && (call->mode == RX_MODE_RECEIVING)) {
            call->mode = RX_MODE_SENDING;
            if (cp) {
+#ifdef RX_TRACK_PACKETS
+               cp->flags &= ~RX_PKTFLAG_CP;
+#endif
                rxi_FreePacket(cp);
                cp = call->currentPacket = (struct rx_packet *)0;
                call->nLeft = 0;
@@ -978,7 +1016,7 @@ rxi_WritevAlloc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
     tcurpos = call->curpos;
     tcurlen = call->curlen;
     do {
-       register unsigned int t;
+       int t;
 
        if (tnFree == 0) {
            /* current packet is full, allocate a new one */
@@ -988,7 +1026,13 @@ rxi_WritevAlloc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
                *nio = nextio;
                return requestCount - nbytes;
            }
+#ifdef RX_TRACK_PACKETS
+           cp->flags |= RX_PKTFLAG_IOVQ;
+#endif
            queue_Append(&call->iovq, cp);
+#ifdef RXDEBUG_PACKET
+            call->iovqc++;
+#endif /* RXDEBUG_PACKET */
            tnFree = cp->length;
            tcurvec = 1;
            tcurpos =
@@ -999,7 +1043,7 @@ rxi_WritevAlloc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
 
        if (tnFree < nbytes) {
            /* try to extend the current packet */
-           register int len, mud;
+           int len, mud;
            len = cp->length;
            mud = rx_MaxUserDataSize(call);
            if (mud > len) {
@@ -1065,12 +1109,16 @@ rx_WritevAlloc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
 int
 rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
 {
-    struct rx_packet *cp = call->currentPacket;
-    register struct rx_packet *tp;     /* Temporary packet pointer */
-    register struct rx_packet *nxp;    /* Next packet pointer, for queue_Scan */
+    struct rx_packet *cp = NULL;
+#ifdef RX_TRACK_PACKETS
+    struct rx_packet *p, *np;
+#endif
     int nextio;
     int requestCount;
     struct rx_queue tmpq;
+#ifdef RXDEBUG_PACKET
+    u_short tmpqc;
+#endif
 
     requestCount = nbytes;
     nextio = 0;
@@ -1083,23 +1131,36 @@ rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
      * packets to the transmit queue.  */
     while (!call->error && call->flags & RX_CALL_TQ_BUSY) {
        call->flags |= RX_CALL_TQ_WAIT;
+        call->tqWaiters++;
 #ifdef RX_ENABLE_LOCKS
        CV_WAIT(&call->cv_tq, &call->lock);
 #else /* RX_ENABLE_LOCKS */
        osi_rxSleep(&call->tq);
 #endif /* RX_ENABLE_LOCKS */
+        call->tqWaiters--;
+        if (call->tqWaiters == 0)
+            call->flags &= ~RX_CALL_TQ_WAIT;
     }
 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
+    /* cp is no longer valid since we may have given up the lock */
+    cp = call->currentPacket;
 
     if (call->error) {
-       for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
-           queue_Remove(tp);
-           rxi_FreePacket(tp);
-       }
        if (cp) {
-           rxi_FreePacket(cp);
-           cp = call->currentPacket = NULL;
+#ifdef RX_TRACK_PACKETS
+            cp->flags &= ~RX_PKTFLAG_CP;
+            cp->flags |= RX_PKTFLAG_IOVQ;
+#endif
+           queue_Prepend(&call->iovq, cp);
+#ifdef RXDEBUG_PACKET
+            call->iovqc++;
+#endif /* RXDEBUG_PACKET */
+           cp = call->currentPacket = (struct rx_packet *)0;
        }
+#ifdef RXDEBUG_PACKET
+        call->iovqc -=
+#endif /* RXDEBUG_PACKET */
+            rxi_FreePackets(0, &call->iovq);
        return 0;
     }
 
@@ -1110,30 +1171,45 @@ rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
      * a zero length write will push a short packet. */
     nextio = 0;
     queue_Init(&tmpq);
+#ifdef RXDEBUG_PACKET
+    tmpqc = 0;
+#endif /* RXDEBUG_PACKET */
     do {
        if (call->nFree == 0 && cp) {
            clock_NewTime();    /* Bogus:  need new time package */
-           /* The 0, below, specifies that it is not the last packet: 
+           /* The 0, below, specifies that it is not the last packet:
             * there will be others. PrepareSendPacket may
             * alter the packet length by up to
             * conn->securityMaxTrailerSize */
            hadd32(call->bytesSent, cp->length);
            rxi_PrepareSendPacket(call, cp, 0);
            queue_Append(&tmpq, cp);
+#ifdef RXDEBUG_PACKET
+            tmpqc++;
+#endif /* RXDEBUG_PACKET */
+            cp = call->currentPacket = (struct rx_packet *)0;
 
            /* The head of the iovq is now the current packet */
            if (nbytes) {
                if (queue_IsEmpty(&call->iovq)) {
                    call->error = RX_PROTOCOL_ERROR;
-                   cp = call->currentPacket = NULL;
-                   for (queue_Scan(&tmpq, tp, nxp, rx_packet)) {
-                       queue_Remove(tp);
-                       rxi_FreePacket(tp);
-                   }
+#ifdef RXDEBUG_PACKET
+                    tmpqc -=
+#endif /* RXDEBUG_PACKET */
+                        rxi_FreePackets(0, &tmpq);
                    return 0;
                }
                cp = queue_First(&call->iovq, rx_packet);
                queue_Remove(cp);
+#ifdef RX_TRACK_PACKETS
+                cp->flags &= ~RX_PKTFLAG_IOVQ;
+#endif
+#ifdef RXDEBUG_PACKET
+                call->iovqc--;
+#endif /* RXDEBUG_PACKET */
+#ifdef RX_TRACK_PACKETS
+                cp->flags |= RX_PKTFLAG_CP;
+#endif
                call->currentPacket = cp;
                call->nFree = cp->length;
                call->curvec = 1;
@@ -1150,14 +1226,20 @@ rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
            if (iov[nextio].iov_base != call->curpos
                || iov[nextio].iov_len > (int)call->curlen) {
                call->error = RX_PROTOCOL_ERROR;
-               for (queue_Scan(&tmpq, tp, nxp, rx_packet)) {
-                   queue_Remove(tp);
-                   rxi_FreePacket(tp);
-               }
                if (cp) {
-                   rxi_FreePacket(cp);
-                   call->currentPacket = NULL;
+#ifdef RX_TRACK_PACKETS
+                   cp->flags &= ~RX_PKTFLAG_CP;
+#endif
+                    queue_Prepend(&tmpq, cp);
+#ifdef RXDEBUG_PACKET
+                    tmpqc++;
+#endif /* RXDEBUG_PACKET */
+                    cp = call->currentPacket = (struct rx_packet *)0;
                }
+#ifdef RXDEBUG_PACKET
+                tmpqc -=
+#endif /* RXDEBUG_PACKET */
+                    rxi_FreePackets(0, &tmpq);
                return 0;
            }
            nbytes -= iov[nextio].iov_len;
@@ -1178,17 +1260,21 @@ rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
 
     /* Move the packets from the temporary queue onto the transmit queue.
      * We may end up with more than call->twind packets on the queue. */
-    for (queue_Scan(&tmpq, tp, nxp, rx_packet)) {
-       queue_Remove(tp);
-       queue_Append(&call->tq, tp);
+
+#ifdef RX_TRACK_PACKETS
+    for (queue_Scan(&tmpq, p, np, rx_packet))
+    {
+        p->flags |= RX_PKTFLAG_TQ;
     }
+#endif
+    queue_SpliceAppend(&call->tq, &tmpq);
 
     if (!(call->flags & (RX_CALL_FAST_RECOVER | RX_CALL_FAST_RECOVER_WAIT))) {
        rxi_Start(0, call, 0, 0);
     }
 
     /* Wait for the length of the transmit queue to fall below call->twind */
-    while (!call->error && call->tnext + 1 > call->tfirst + call->twind) {
+    while (!call->error && call->tnext + 1 > call->tfirst + (2 * call->twind)) {
        clock_NewTime();
        call->startWait = clock_Sec();
 #ifdef RX_ENABLE_LOCKS
@@ -1199,11 +1285,16 @@ rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
 #endif
        call->startWait = 0;
     }
+    /* cp is no longer valid since we may have given up the lock */
+    cp = call->currentPacket;
 
     if (call->error) {
        if (cp) {
+#ifdef RX_TRACK_PACKETS
+           cp->flags &= ~RX_PKTFLAG_CP;
+#endif
            rxi_FreePacket(cp);
-           cp = call->currentPacket = NULL;
+            cp = call->currentPacket = (struct rx_packet *)0;
        }
        return 0;
     }
@@ -1228,16 +1319,16 @@ rx_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
 /* Flush any buffered data to the stream, switch to read mode
  * (clients) or to EOF mode (servers) */
 void
-rxi_FlushWrite(register struct rx_call *call)
+rxi_FlushWrite(struct rx_call *call)
 {
-    register struct rx_packet *cp = call->currentPacket;
-    register struct rx_packet *tp;     /* Temporary packet pointer */
-    register struct rx_packet *nxp;    /* Next packet pointer, for queue_Scan */
+    struct rx_packet *cp = NULL;
 
     /* Free any packets from the last call to ReadvProc/WritevProc */
-    for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
-       queue_Remove(tp);
-       rxi_FreePacket(tp);
+    if (queue_IsNotEmpty(&call->iovq)) {
+#ifdef RXDEBUG_PACKET
+        call->iovqc -=
+#endif /* RXDEBUG_PACKET */
+            rxi_FreePackets(0, &call->iovq);
     }
 
     if (call->mode == RX_MODE_SENDING) {
@@ -1265,18 +1356,28 @@ rxi_FlushWrite(register struct rx_call *call)
         */
        while (call->flags & RX_CALL_TQ_BUSY) {
            call->flags |= RX_CALL_TQ_WAIT;
+            call->tqWaiters++;
 #ifdef RX_ENABLE_LOCKS
            CV_WAIT(&call->cv_tq, &call->lock);
 #else /* RX_ENABLE_LOCKS */
            osi_rxSleep(&call->tq);
 #endif /* RX_ENABLE_LOCKS */
+            call->tqWaiters--;
+            if (call->tqWaiters == 0)
+                call->flags &= ~RX_CALL_TQ_WAIT;
        }
 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
 
+        /* cp is no longer valid since we may have given up the lock */
+        cp = call->currentPacket;
+
        if (cp) {
            /* cp->length is only supposed to be the user's data */
-           /* cp->length was already set to (then-current) 
+           /* cp->length was already set to (then-current)
             * MaxUserDataSize or less. */
+#ifdef RX_TRACK_PACKETS
+           cp->flags &= ~RX_PKTFLAG_CP;
+#endif
            cp->length -= call->nFree;
            call->currentPacket = (struct rx_packet *)0;
            call->nFree = 0;
@@ -1294,7 +1395,13 @@ rxi_FlushWrite(register struct rx_call *call)
        /* The 1 specifies that this is the last packet */
        hadd32(call->bytesSent, cp->length);
        rxi_PrepareSendPacket(call, cp, 1);
+#ifdef RX_TRACK_PACKETS
+       cp->flags |= RX_PKTFLAG_TQ;
+#endif
        queue_Append(&call->tq, cp);
+#ifdef RXDEBUG_PACKET
+        call->tqc++;
+#endif /* RXDEBUG_PACKET */
        if (!
            (call->
             flags & (RX_CALL_FAST_RECOVER | RX_CALL_FAST_RECOVER_WAIT))) {