rx-readproc32-avoid-losing-currentpacket-20080925
[openafs.git] / src / rx / rx_rdwr.c
index 449289f..5baff8a 100644 (file)
@@ -28,6 +28,9 @@ RCSID
 #include "h/types.h"
 #include "h/time.h"
 #include "h/stat.h"
+#if defined(AFS_AIX_ENV) || defined(AFS_AUX_ENV) || defined(AFS_SUN5_ENV) 
+#include "h/systm.h"
+#endif
 #ifdef AFS_OSF_ENV
 #include <net/net_globals.h>
 #endif /* AFS_OSF_ENV */
@@ -59,30 +62,26 @@ RCSID
 #include "rx/rx_globals.h"
 #include "afs/lock.h"
 #include "afsint.h"
-#ifdef  AFS_ALPHA_ENV
+#ifdef  AFS_OSF_ENV
 #undef kmem_alloc
 #undef kmem_free
 #undef mem_alloc
 #undef mem_free
 #undef register
-#endif /* AFS_ALPHA_ENV */
+#endif /* AFS_OSF_ENV */
 #else /* KERNEL */
 # include <sys/types.h>
-#ifndef AFS_NT40_ENV
+#ifdef AFS_NT40_ENV
+# include <winsock2.h>
+#else /* !AFS_NT40_ENV */
 # include <sys/socket.h>
 # include <sys/file.h>
 # include <netdb.h>
 # include <netinet/in.h>
 # include <sys/stat.h>
 # include <sys/time.h>
-#endif
-#ifdef HAVE_STRING_H
+#endif /* !AFS_NT40_ENV */
 #include <string.h>
-#else
-#ifdef HAVE_STRINGS_H
-#include <strings.h>
-#endif
-#endif
 #ifdef HAVE_UNISTD_H
 #include <unistd.h>
 #endif
@@ -107,18 +106,15 @@ rxi_ReadProc(register struct rx_call *call, register char *buf,
 {
     register struct rx_packet *cp = call->currentPacket;
     register struct rx_packet *rp;
-    register struct rx_packet *nxp;    /* Next packet pointer, for queue_Scan */
     register int requestCount;
     register unsigned int t;
+
 /* XXXX took out clock_NewTime from here.  Was it needed? */
     requestCount = nbytes;
 
     /* Free any packets from the last call to ReadvProc/WritevProc */
-    if (!queue_IsEmpty(&call->iovq)) {
-       for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
-           queue_Remove(rp);
-           rxi_FreePacket(rp);
-       }
+    if (queue_IsNotEmpty(&call->iovq)) {
+        rxi_FreePackets(0, &call->iovq);
     }
 
     do {
@@ -196,8 +192,9 @@ rxi_ReadProc(register struct rx_call *call, register char *buf,
                                               RX_CALL_REFCOUNT_DELAY);
                                rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0);
                            } else {
-                               struct clock when;
-                               clock_GetTime(&when);
+                               struct clock when, now;
+                               clock_GetTime(&now);
+                               when = now;
                                /* Delay to consolidate ack packets */
                                clock_Add(&when, &rx_hardAckDelay);
                                if (!call->delayedAckEvent
@@ -208,7 +205,7 @@ rxi_ReadProc(register struct rx_call *call, register char *buf,
                                                   RX_CALL_REFCOUNT_DELAY);
                                    CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
                                    call->delayedAckEvent =
-                                       rxevent_Post(&when,
+                                     rxevent_PostNow(&when, &now,
                                                     rxi_SendDelayedAck, call,
                                                     0);
                                }
@@ -310,12 +307,7 @@ rx_ReadProc(struct rx_call *call, char *buf, int nbytes)
      * ReadvProc/WritevProc.
      */
     if (!queue_IsEmpty(&call->iovq)) {
-       register struct rx_packet *rp;
-       register struct rx_packet *nxp;
-       for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
-           queue_Remove(rp);
-           rxi_FreePacket(rp);
-       }
+        rxi_FreePackets(0, &call->iovq);
     }
 
     /*
@@ -333,15 +325,23 @@ rx_ReadProc(struct rx_call *call, char *buf, int nbytes)
        call->curpos = tcurpos + nbytes;
        call->curlen = tcurlen - nbytes;
        call->nLeft = tnLeft - nbytes;
+
+        if (!call->nLeft) {
+            /* out of packet.  Get another one. */
+            NETPRI;
+            MUTEX_ENTER(&call->lock);
+            rxi_FreePacket(call->currentPacket);
+            call->currentPacket = (struct rx_packet *)0;
+            MUTEX_EXIT(&call->lock);
+            USERPRI;
+        }
        return nbytes;
     }
 
     NETPRI;
-    AFS_RXGLOCK();
     MUTEX_ENTER(&call->lock);
     bytes = rxi_ReadProc(call, buf, nbytes);
     MUTEX_EXIT(&call->lock);
-    AFS_RXGUNLOCK();
     USERPRI;
     return bytes;
 }
@@ -364,12 +364,7 @@ rx_ReadProc32(struct rx_call *call, afs_int32 * value)
      * ReadvProc/WritevProc.
      */
     if (!queue_IsEmpty(&call->iovq)) {
-       register struct rx_packet *rp;
-       register struct rx_packet *nxp;
-       for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
-           queue_Remove(rp);
-           rxi_FreePacket(rp);
-       }
+       rxi_FreePackets(0, &call->iovq);
     }
 
     /*
@@ -381,26 +376,29 @@ rx_ReadProc32(struct rx_call *call, afs_int32 * value)
      */
     tcurlen = call->curlen;
     tnLeft = call->nLeft;
-    if (!call->error && tcurlen > sizeof(afs_int32)
-       && tnLeft > sizeof(afs_int32)) {
+    if (!call->error && tcurlen >= sizeof(afs_int32)
+       && tnLeft >= sizeof(afs_int32)) {
        tcurpos = call->curpos;
-       if (!((long)tcurpos & (sizeof(afs_int32) - 1))) {
-           *value = *((afs_int32 *) (tcurpos));
-       } else {
-           memcpy((char *)value, tcurpos, sizeof(afs_int32));
-       }
+       memcpy((char *)value, tcurpos, sizeof(afs_int32));
        call->curpos = tcurpos + sizeof(afs_int32);
-       call->curlen = tcurlen - sizeof(afs_int32);
-       call->nLeft = tnLeft - sizeof(afs_int32);
+       call->curlen = (u_short)(tcurlen - sizeof(afs_int32));
+       call->nLeft = (u_short)(tnLeft - sizeof(afs_int32));
+        if (!call->nLeft) {
+            /* out of packet.  Get another one. */
+            NETPRI;
+            MUTEX_ENTER(&call->lock);
+            rxi_FreePacket(call->currentPacket);
+            call->currentPacket = (struct rx_packet *)0;
+            MUTEX_EXIT(&call->lock);
+            USERPRI;
+        }
        return sizeof(afs_int32);
     }
 
     NETPRI;
-    AFS_RXGLOCK();
     MUTEX_ENTER(&call->lock);
     bytes = rxi_ReadProc(call, (char *)value, sizeof(afs_int32));
     MUTEX_EXIT(&call->lock);
-    AFS_RXGUNLOCK();
     USERPRI;
     return bytes;
 }
@@ -543,8 +541,9 @@ rxi_FillReadVec(struct rx_call *call, afs_uint32 serial)
            rxi_SendAck(call, 0, serial, RX_ACK_DELAY, 0);
            didHardAck = 1;
        } else {
-           struct clock when;
-           clock_GetTime(&when);
+           struct clock when, now;
+           clock_GetTime(&now);
+           when = now;
            /* Delay to consolidate ack packets */
            clock_Add(&when, &rx_hardAckDelay);
            if (!call->delayedAckEvent
@@ -553,7 +552,7 @@ rxi_FillReadVec(struct rx_call *call, afs_uint32 serial)
                               RX_CALL_REFCOUNT_DELAY);
                CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
                call->delayedAckEvent =
-                   rxevent_Post(&when, rxi_SendDelayedAck, call, 0);
+                   rxevent_PostNow(&when, &now, rxi_SendDelayedAck, call, 0);
            }
        }
     }
@@ -573,8 +572,6 @@ int
 rxi_ReadvProc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
              int nbytes)
 {
-    struct rx_packet *rp;
-    struct rx_packet *nxp;     /* Next packet pointer, for queue_Scan */
     int requestCount;
     int nextio;
 
@@ -582,9 +579,8 @@ rxi_ReadvProc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
     nextio = 0;
 
     /* Free any packets from the last call to ReadvProc/WritevProc */
-    for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
-       queue_Remove(rp);
-       rxi_FreePacket(rp);
+    if (queue_IsNotEmpty(&call->iovq)) {
+        rxi_FreePackets(0, &call->iovq);
     }
 
     if (call->mode == RX_MODE_SENDING) {
@@ -644,11 +640,9 @@ rx_ReadvProc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
     SPLVAR;
 
     NETPRI;
-    AFS_RXGLOCK();
     MUTEX_ENTER(&call->lock);
     bytes = rxi_ReadvProc(call, iov, nio, maxio, nbytes);
     MUTEX_EXIT(&call->lock);
-    AFS_RXGUNLOCK();
     USERPRI;
     return bytes;
 }
@@ -663,17 +657,12 @@ rxi_WriteProc(register struct rx_call *call, register char *buf,
 {
     struct rx_connection *conn = call->conn;
     register struct rx_packet *cp = call->currentPacket;
-    register struct rx_packet *tp;     /* Temporary packet pointer */
-    register struct rx_packet *nxp;    /* Next packet pointer, for queue_Scan */
     register unsigned int t;
     int requestCount = nbytes;
 
     /* Free any packets from the last call to ReadvProc/WritevProc */
-    if (!queue_IsEmpty(&call->iovq)) {
-       for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
-           queue_Remove(tp);
-           rxi_FreePacket(tp);
-       }
+    if (queue_IsNotEmpty(&call->iovq)) {
+       rxi_FreePackets(0, &call->iovq);
     }
 
     if (call->mode != RX_MODE_SENDING) {
@@ -729,7 +718,7 @@ rxi_WriteProc(register struct rx_call *call, register char *buf,
            }
            /* Wait for transmit window to open up */
            while (!call->error
-                  && call->tnext + 1 > call->tfirst + call->twind) {
+                  && call->tnext + 1 > call->tfirst + (2 * call->twind)) {
                clock_NewTime();
                call->startWait = clock_Sec();
 
@@ -799,8 +788,8 @@ rxi_WriteProc(register struct rx_call *call, register char *buf,
            buf += t;
            nbytes -= t;
            call->curpos += t;
-           call->curlen -= t;
-           call->nFree -= t;
+           call->curlen -= (u_short)t;
+           call->nFree -= (u_short)t;
 
            if (!call->curlen) {
                /* need to get another struct iov */
@@ -839,13 +828,8 @@ rx_WriteProc(struct rx_call *call, char *buf, int nbytes)
      * RX_CALL_IOVEC_WAIT is always cleared before returning from
      * ReadvProc/WritevProc.
      */
-    if (!queue_IsEmpty(&call->iovq)) {
-       register struct rx_packet *rp;
-       register struct rx_packet *nxp;
-       for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
-           queue_Remove(rp);
-           rxi_FreePacket(rp);
-       }
+    if (queue_IsNotEmpty(&call->iovq)) {
+       rxi_FreePackets(0, &call->iovq);
     }
 
     /*
@@ -861,17 +845,15 @@ rx_WriteProc(struct rx_call *call, char *buf, int nbytes)
        tcurpos = call->curpos;
        memcpy(tcurpos, buf, nbytes);
        call->curpos = tcurpos + nbytes;
-       call->curlen = tcurlen - nbytes;
-       call->nFree = tnFree - nbytes;
+       call->curlen = (u_short)(tcurlen - nbytes);
+       call->nFree = (u_short)(tnFree - nbytes);
        return nbytes;
     }
 
     NETPRI;
-    AFS_RXGLOCK();
     MUTEX_ENTER(&call->lock);
     bytes = rxi_WriteProc(call, buf, nbytes);
     MUTEX_EXIT(&call->lock);
-    AFS_RXGUNLOCK();
     USERPRI;
     return bytes;
 }
@@ -893,13 +875,8 @@ rx_WriteProc32(register struct rx_call *call, register afs_int32 * value)
      * RX_CALL_IOVEC_WAIT is always cleared before returning from
      * ReadvProc/WritevProc.
      */
-    if (!queue_IsEmpty(&call->iovq)) {
-       register struct rx_packet *rp;
-       register struct rx_packet *nxp;
-       for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
-           queue_Remove(rp);
-           rxi_FreePacket(rp);
-       }
+    if (queue_IsNotEmpty(&call->iovq)) {
+       rxi_FreePackets(0, &call->iovq);
     }
 
     /*
@@ -909,28 +886,26 @@ rx_WriteProc32(register struct rx_call *call, register afs_int32 * value)
      *
      * We are relying on nFree being zero unless the call is in send mode.
      */
-    tcurlen = (int)call->curlen;
-    tnFree = (int)call->nFree;
+    tcurlen = call->curlen;
+    tnFree = call->nFree;
     if (!call->error && tcurlen >= sizeof(afs_int32)
        && tnFree >= sizeof(afs_int32)) {
        tcurpos = call->curpos;
-       if (!((long)tcurpos & (sizeof(afs_int32) - 1))) {
+       if (!((size_t)tcurpos & (sizeof(afs_int32) - 1))) {
            *((afs_int32 *) (tcurpos)) = *value;
        } else {
            memcpy(tcurpos, (char *)value, sizeof(afs_int32));
        }
        call->curpos = tcurpos + sizeof(afs_int32);
-       call->curlen = tcurlen - sizeof(afs_int32);
-       call->nFree = tnFree - sizeof(afs_int32);
+       call->curlen = (u_short)(tcurlen - sizeof(afs_int32));
+       call->nFree = (u_short)(tnFree - sizeof(afs_int32));
        return sizeof(afs_int32);
     }
 
     NETPRI;
-    AFS_RXGLOCK();
     MUTEX_ENTER(&call->lock);
     bytes = rxi_WriteProc(call, (char *)value, sizeof(afs_int32));
     MUTEX_EXIT(&call->lock);
-    AFS_RXGUNLOCK();
     USERPRI;
     return bytes;
 }
@@ -948,8 +923,6 @@ rxi_WritevAlloc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
 {
     struct rx_connection *conn = call->conn;
     struct rx_packet *cp = call->currentPacket;
-    struct rx_packet *tp;      /* temporary packet pointer */
-    struct rx_packet *nxp;     /* Next packet pointer, for queue_Scan */
     int requestCount;
     int nextio;
     /* Temporary values, real work is done in rxi_WritevProc */
@@ -962,9 +935,8 @@ rxi_WritevAlloc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
     nextio = 0;
 
     /* Free any packets from the last call to ReadvProc/WritevProc */
-    for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
-       queue_Remove(tp);
-       rxi_FreePacket(tp);
+    if (queue_IsNotEmpty(&call->iovq)) {
+        rxi_FreePackets(0, &call->iovq);
     }
 
     if (call->mode != RX_MODE_SENDING) {
@@ -1059,11 +1031,9 @@ rx_WritevAlloc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
     SPLVAR;
 
     NETPRI;
-    AFS_RXGLOCK();
     MUTEX_ENTER(&call->lock);
     bytes = rxi_WritevAlloc(call, iov, nio, maxio, nbytes);
     MUTEX_EXIT(&call->lock);
-    AFS_RXGUNLOCK();
     USERPRI;
     return bytes;
 }
@@ -1078,8 +1048,6 @@ int
 rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
 {
     struct rx_packet *cp = call->currentPacket;
-    register struct rx_packet *tp;     /* Temporary packet pointer */
-    register struct rx_packet *nxp;    /* Next packet pointer, for queue_Scan */
     int nextio;
     int requestCount;
     struct rx_queue tmpq;
@@ -1104,14 +1072,11 @@ rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
 
     if (call->error) {
-       for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
-           queue_Remove(tp);
-           rxi_FreePacket(tp);
-       }
        if (cp) {
-           rxi_FreePacket(cp);
+           queue_Prepend(&call->iovq, cp);
            cp = call->currentPacket = NULL;
        }
+       rxi_FreePackets(0, &call->iovq);
        return 0;
     }
 
@@ -1138,10 +1103,7 @@ rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
                if (queue_IsEmpty(&call->iovq)) {
                    call->error = RX_PROTOCOL_ERROR;
                    cp = call->currentPacket = NULL;
-                   for (queue_Scan(&tmpq, tp, nxp, rx_packet)) {
-                       queue_Remove(tp);
-                       rxi_FreePacket(tp);
-                   }
+                   rxi_FreePackets(0, &tmpq);
                    return 0;
                }
                cp = queue_First(&call->iovq, rx_packet);
@@ -1162,14 +1124,11 @@ rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
            if (iov[nextio].iov_base != call->curpos
                || iov[nextio].iov_len > (int)call->curlen) {
                call->error = RX_PROTOCOL_ERROR;
-               for (queue_Scan(&tmpq, tp, nxp, rx_packet)) {
-                   queue_Remove(tp);
-                   rxi_FreePacket(tp);
-               }
                if (cp) {
-                   rxi_FreePacket(cp);
+                   queue_Prepend(&tmpq, cp);
                    call->currentPacket = NULL;
                }
+               rxi_FreePackets(0, &tmpq);
                return 0;
            }
            nbytes -= iov[nextio].iov_len;
@@ -1190,17 +1149,14 @@ rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
 
     /* Move the packets from the temporary queue onto the transmit queue.
      * We may end up with more than call->twind packets on the queue. */
-    for (queue_Scan(&tmpq, tp, nxp, rx_packet)) {
-       queue_Remove(tp);
-       queue_Append(&call->tq, tp);
-    }
+    queue_SpliceAppend(&call->tq, &tmpq);
 
     if (!(call->flags & (RX_CALL_FAST_RECOVER | RX_CALL_FAST_RECOVER_WAIT))) {
        rxi_Start(0, call, 0, 0);
     }
 
     /* Wait for the length of the transmit queue to fall below call->twind */
-    while (!call->error && call->tnext + 1 > call->tfirst + call->twind) {
+    while (!call->error && call->tnext + 1 > call->tfirst + (2 * call->twind)) {
        clock_NewTime();
        call->startWait = clock_Sec();
 #ifdef RX_ENABLE_LOCKS
@@ -1230,11 +1186,9 @@ rx_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
     SPLVAR;
 
     NETPRI;
-    AFS_RXGLOCK();
     MUTEX_ENTER(&call->lock);
     bytes = rxi_WritevProc(call, iov, nio, nbytes);
     MUTEX_EXIT(&call->lock);
-    AFS_RXGUNLOCK();
     USERPRI;
     return bytes;
 }
@@ -1245,13 +1199,10 @@ void
 rxi_FlushWrite(register struct rx_call *call)
 {
     register struct rx_packet *cp = call->currentPacket;
-    register struct rx_packet *tp;     /* Temporary packet pointer */
-    register struct rx_packet *nxp;    /* Next packet pointer, for queue_Scan */
 
     /* Free any packets from the last call to ReadvProc/WritevProc */
-    for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
-       queue_Remove(tp);
-       rxi_FreePacket(tp);
+    if (queue_IsNotEmpty(&call->iovq)) {
+       rxi_FreePackets(0, &call->iovq);
     }
 
     if (call->mode == RX_MODE_SENDING) {
@@ -1324,10 +1275,8 @@ rx_FlushWrite(struct rx_call *call)
 {
     SPLVAR;
     NETPRI;
-    AFS_RXGLOCK();
     MUTEX_ENTER(&call->lock);
     rxi_FlushWrite(call);
     MUTEX_EXIT(&call->lock);
-    AFS_RXGUNLOCK();
     USERPRI;
 }