RX: Tidy reader data locking
[openafs.git] / src / rx / rx_rdwr.c
index 7da4ca6..27acf1b 100644 (file)
@@ -137,7 +137,9 @@ rxi_ReadProc(struct rx_call *call, char *buf,
                        afs_int32 error;
                        struct rx_connection *conn = call->conn;
                        queue_Remove(rp);
+#ifdef RX_TRACK_PACKETS
                        rp->flags &= ~RX_PKTFLAG_RQ;
+#endif
 #ifdef RXDEBUG_PACKET
                         call->rqc--;
 #endif /* RXDEBUG_PACKET */
@@ -165,7 +167,9 @@ rxi_ReadProc(struct rx_call *call, char *buf,
                        }
                        call->rnext++;
                        cp = call->currentPacket = rp;
+#ifdef RX_TRACK_PACKETS
                        call->currentPacket->flags |= RX_PKTFLAG_CP;
+#endif
                        call->curvec = 1;       /* 0th vec is always header */
                        /* begin at the beginning [ more or less ], continue
                         * on until the end, then stop. */
@@ -242,7 +246,6 @@ rxi_ReadProc(struct rx_call *call, char *buf,
                    osi_rxSleep(&call->rq);
 #endif
                }
-                /* cp is no longer valid since we may have given up the lock */
                 cp = call->currentPacket;
 
                call->startWait = 0;
@@ -274,7 +277,9 @@ rxi_ReadProc(struct rx_call *call, char *buf,
 
                if (!call->nLeft) {
                    /* out of packet.  Get another one. */
+#ifdef RX_TRACK_PACKETS
                    call->currentPacket->flags &= ~RX_PKTFLAG_CP;
+#endif
                    rxi_FreePacket(cp);
                    cp = call->currentPacket = (struct rx_packet *)0;
                } else if (!call->curlen) {
@@ -282,7 +287,9 @@ rxi_ReadProc(struct rx_call *call, char *buf,
                    if (++call->curvec >= cp->niovecs) {
                        /* current packet is exhausted, get ready for another */
                        /* don't worry about curvec and stuff, they get set somewhere else */
+#ifdef RX_TRACK_PACKETS
                        call->currentPacket->flags &= ~RX_PKTFLAG_CP;
+#endif
                        rxi_FreePacket(cp);
                        cp = call->currentPacket = (struct rx_packet *)0;
                        call->nLeft = 0;
@@ -313,8 +320,6 @@ rx_ReadProc(struct rx_call *call, char *buf, int nbytes)
     SPLVAR;
 
     /* Free any packets from the last call to ReadvProc/WritevProc */
-    NETPRI;
-    MUTEX_ENTER(&call->lock);
     if (!queue_IsEmpty(&call->iovq)) {
 #ifdef RXDEBUG_PACKET
         call->iovqc -=
@@ -330,7 +335,8 @@ rx_ReadProc(struct rx_call *call, char *buf, int nbytes)
     tnLeft = call->nLeft;
     if (!call->error && tcurlen > nbytes && tnLeft > nbytes) {
        tcurpos = call->curpos;
-       memcpy(buf, tcurpos, nbytes);
+        memcpy(buf, tcurpos, nbytes);
+
        call->curpos = tcurpos + nbytes;
        call->curlen = tcurlen - nbytes;
        call->nLeft = tnLeft - nbytes;
@@ -340,10 +346,12 @@ rx_ReadProc(struct rx_call *call, char *buf, int nbytes)
             rxi_FreePacket(call->currentPacket);
             call->currentPacket = (struct rx_packet *)0;
         }
-       bytes = nbytes;
-    } else
-        bytes = rxi_ReadProc(call, buf, nbytes);
+       return nbytes;
+    }
 
+    NETPRI;
+    MUTEX_ENTER(&call->lock);
+    bytes = rxi_ReadProc(call, buf, nbytes);
     MUTEX_EXIT(&call->lock);
     USERPRI;
     return bytes;
@@ -360,8 +368,6 @@ rx_ReadProc32(struct rx_call *call, afs_int32 * value)
     SPLVAR;
 
     /* Free any packets from the last call to ReadvProc/WritevProc */
-    NETPRI;
-    MUTEX_ENTER(&call->lock);
     if (!queue_IsEmpty(&call->iovq)) {
 #ifdef RXDEBUG_PACKET
         call->iovqc -=
@@ -378,8 +384,10 @@ rx_ReadProc32(struct rx_call *call, afs_int32 * value)
     if (!call->error && tcurlen >= sizeof(afs_int32)
        && tnLeft >= sizeof(afs_int32)) {
        tcurpos = call->curpos;
-       memcpy((char *)value, tcurpos, sizeof(afs_int32));
-       call->curpos = tcurpos + sizeof(afs_int32);
+
+        memcpy((char *)value, tcurpos, sizeof(afs_int32));
+
+        call->curpos = tcurpos + sizeof(afs_int32);
        call->curlen = (u_short)(tcurlen - sizeof(afs_int32));
        call->nLeft = (u_short)(tnLeft - sizeof(afs_int32));
         if (!call->nLeft && call->currentPacket != NULL) {
@@ -387,12 +395,15 @@ rx_ReadProc32(struct rx_call *call, afs_int32 * value)
             rxi_FreePacket(call->currentPacket);
             call->currentPacket = (struct rx_packet *)0;
         }
-       bytes = sizeof(afs_int32);
-    } else
-        bytes = rxi_ReadProc(call, (char *)value, sizeof(afs_int32));
+       return sizeof(afs_int32);
+    }
 
+    NETPRI;
+    MUTEX_ENTER(&call->lock);
+    bytes = rxi_ReadProc(call, (char *)value, sizeof(afs_int32));
     MUTEX_EXIT(&call->lock);
     USERPRI;
+
     return bytes;
 }
 
@@ -429,7 +440,9 @@ rxi_FillReadVec(struct rx_call *call, afs_uint32 serial)
                    afs_int32 error;
                    struct rx_connection *conn = call->conn;
                    queue_Remove(rp);
+#ifdef RX_TRACK_PACKETS
                    rp->flags &= ~RX_PKTFLAG_RQ;
+#endif
 #ifdef RXDEBUG_PACKET
                     call->rqc--;
 #endif /* RXDEBUG_PACKET */
@@ -456,7 +469,9 @@ rxi_FillReadVec(struct rx_call *call, afs_uint32 serial)
                    }
                    call->rnext++;
                    curp = call->currentPacket = rp;
+#ifdef RX_TRACK_PACKETS
                    call->currentPacket->flags |= RX_PKTFLAG_CP;
+#endif
                    call->curvec = 1;   /* 0th vec is always header */
                    cur_iov = &curp->wirevec[1];
                    /* begin at the beginning [ more or less ], continue
@@ -511,8 +526,10 @@ rxi_FillReadVec(struct rx_call *call, afs_uint32 serial)
 
            if (!call->nLeft) {
                /* out of packet.  Get another one. */
+#ifdef RX_TRACK_PACKETS
                 curp->flags &= ~RX_PKTFLAG_CP;
                 curp->flags |= RX_PKTFLAG_IOVQ;
+#endif
                queue_Append(&call->iovq, curp);
 #ifdef RXDEBUG_PACKET
                 call->iovqc++;
@@ -523,8 +540,10 @@ rxi_FillReadVec(struct rx_call *call, afs_uint32 serial)
                if (++call->curvec >= curp->niovecs) {
                    /* current packet is exhausted, get ready for another */
                    /* don't worry about curvec and stuff, they get set somewhere else */
+#ifdef RX_TRACK_PACKETS
                    curp->flags &= ~RX_PKTFLAG_CP;
                    curp->flags |= RX_PKTFLAG_IOVQ;
+#endif
                    queue_Append(&call->iovq, curp);
 #ifdef RXDEBUG_PACKET
                     call->iovqc++;
@@ -580,12 +599,6 @@ int
 rxi_ReadvProc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
              int nbytes)
 {
-    int requestCount;
-    int nextio;
-
-    requestCount = nbytes;
-    nextio = 0;
-
     /* Free any packets from the last call to ReadvProc/WritevProc */
     if (queue_IsNotEmpty(&call->iovq)) {
 #ifdef RXDEBUG_PACKET
@@ -684,7 +697,9 @@ rxi_WriteProc(struct rx_call *call, char *buf,
            && (call->mode == RX_MODE_RECEIVING)) {
            call->mode = RX_MODE_SENDING;
            if (cp) {
+#ifdef RX_TRACK_PACKETS
                cp->flags &= ~RX_PKTFLAG_CP;
+#endif
                rxi_FreePacket(cp);
                cp = call->currentPacket = (struct rx_packet *)0;
                call->nLeft = 0;
@@ -707,7 +722,9 @@ rxi_WriteProc(struct rx_call *call, char *buf,
                  * the packet we are planning on using
                  * cannot be freed.
                  */
+#ifdef RX_TRACK_PACKETS
                 cp->flags &= ~RX_PKTFLAG_CP;
+#endif
                call->currentPacket = (struct rx_packet *)0;
 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
                /* Wait until TQ_BUSY is reset before adding any
@@ -733,7 +750,9 @@ rxi_WriteProc(struct rx_call *call, char *buf,
                 * conn->securityMaxTrailerSize */
                hadd32(call->bytesSent, cp->length);
                rxi_PrepareSendPacket(call, cp, 0);
+#ifdef RX_TRACK_PACKETS
                cp->flags |= RX_PKTFLAG_TQ;
+#endif
                queue_Append(&call->tq, cp);
 #ifdef RXDEBUG_PACKET
                 call->tqc++;
@@ -746,7 +765,9 @@ rxi_WriteProc(struct rx_call *call, char *buf,
                    rxi_Start(0, call, 0, 0);
                }
            } else if (cp) {
+#ifdef RX_TRACK_PACKETS
                cp->flags &= ~RX_PKTFLAG_CP;
+#endif
                rxi_FreePacket(cp);
                cp = call->currentPacket = (struct rx_packet *)0;
            }
@@ -771,7 +792,9 @@ rxi_WriteProc(struct rx_call *call, char *buf,
 #endif /* RX_ENABLE_LOCKS */
            }
            if ((cp = rxi_AllocSendPacket(call, nbytes))) {
+#ifdef RX_TRACK_PACKETS
                cp->flags |= RX_PKTFLAG_CP;
+#endif
                call->currentPacket = cp;
                call->nFree = cp->length;
                call->curvec = 1;       /* 0th vec is always header */
@@ -785,7 +808,9 @@ rxi_WriteProc(struct rx_call *call, char *buf,
            }
            if (call->error) {
                if (cp) {
+#ifdef RX_TRACK_PACKETS
                    cp->flags &= ~RX_PKTFLAG_CP;
+#endif
                    rxi_FreePacket(cp);
                    call->currentPacket = NULL;
                }
@@ -858,8 +883,6 @@ rx_WriteProc(struct rx_call *call, char *buf, int nbytes)
     SPLVAR;
 
     /* Free any packets from the last call to ReadvProc/WritevProc */
-    NETPRI;
-    MUTEX_ENTER(&call->lock);
     if (queue_IsNotEmpty(&call->iovq)) {
 #ifdef RXDEBUG_PACKET
         call->iovqc -=
@@ -875,14 +898,17 @@ rx_WriteProc(struct rx_call *call, char *buf, int nbytes)
     tnFree = (int)call->nFree;
     if (!call->error && tcurlen >= nbytes && tnFree >= nbytes) {
        tcurpos = call->curpos;
+
        memcpy(tcurpos, buf, nbytes);
        call->curpos = tcurpos + nbytes;
        call->curlen = (u_short)(tcurlen - nbytes);
        call->nFree = (u_short)(tnFree - nbytes);
-       bytes = nbytes;
-    } else
-        bytes = rxi_WriteProc(call, buf, nbytes);
+       return nbytes;
+    }
 
+    NETPRI;
+    MUTEX_ENTER(&call->lock);
+    bytes = rxi_WriteProc(call, buf, nbytes);
     MUTEX_EXIT(&call->lock);
     USERPRI;
     return bytes;
@@ -898,9 +924,6 @@ rx_WriteProc32(struct rx_call *call, afs_int32 * value)
     char *tcurpos;
     SPLVAR;
 
-    /* Free any packets from the last call to ReadvProc/WritevProc */
-    NETPRI;
-    MUTEX_ENTER(&call->lock);
     if (queue_IsNotEmpty(&call->iovq)) {
 #ifdef RXDEBUG_PACKET
         call->iovqc -=
@@ -917,6 +940,7 @@ rx_WriteProc32(struct rx_call *call, afs_int32 * value)
     if (!call->error && tcurlen >= sizeof(afs_int32)
        && tnFree >= sizeof(afs_int32)) {
        tcurpos = call->curpos;
+
        if (!((size_t)tcurpos & (sizeof(afs_int32) - 1))) {
            *((afs_int32 *) (tcurpos)) = *value;
        } else {
@@ -925,10 +949,12 @@ rx_WriteProc32(struct rx_call *call, afs_int32 * value)
        call->curpos = tcurpos + sizeof(afs_int32);
        call->curlen = (u_short)(tcurlen - sizeof(afs_int32));
        call->nFree = (u_short)(tnFree - sizeof(afs_int32));
-       bytes = sizeof(afs_int32);
-    } else
-        bytes = rxi_WriteProc(call, (char *)value, sizeof(afs_int32));
+       return sizeof(afs_int32);
+    }
 
+    NETPRI;
+    MUTEX_ENTER(&call->lock);
+    bytes = rxi_WriteProc(call, (char *)value, sizeof(afs_int32));
     MUTEX_EXIT(&call->lock);
     USERPRI;
     return bytes;
@@ -971,7 +997,9 @@ rxi_WritevAlloc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
            && (call->mode == RX_MODE_RECEIVING)) {
            call->mode = RX_MODE_SENDING;
            if (cp) {
+#ifdef RX_TRACK_PACKETS
                cp->flags &= ~RX_PKTFLAG_CP;
+#endif
                rxi_FreePacket(cp);
                cp = call->currentPacket = (struct rx_packet *)0;
                call->nLeft = 0;
@@ -998,7 +1026,9 @@ rxi_WritevAlloc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
                *nio = nextio;
                return requestCount - nbytes;
            }
+#ifdef RX_TRACK_PACKETS
            cp->flags |= RX_PKTFLAG_IOVQ;
+#endif
            queue_Append(&call->iovq, cp);
 #ifdef RXDEBUG_PACKET
             call->iovqc++;
@@ -1080,7 +1110,9 @@ int
 rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
 {
     struct rx_packet *cp = NULL;
+#ifdef RX_TRACK_PACKETS
     struct rx_packet *p, *np;
+#endif
     int nextio;
     int requestCount;
     struct rx_queue tmpq;
@@ -1115,8 +1147,10 @@ rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
 
     if (call->error) {
        if (cp) {
+#ifdef RX_TRACK_PACKETS
             cp->flags &= ~RX_PKTFLAG_CP;
             cp->flags |= RX_PKTFLAG_IOVQ;
+#endif
            queue_Prepend(&call->iovq, cp);
 #ifdef RXDEBUG_PACKET
             call->iovqc++;
@@ -1167,11 +1201,15 @@ rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
                }
                cp = queue_First(&call->iovq, rx_packet);
                queue_Remove(cp);
+#ifdef RX_TRACK_PACKETS
                 cp->flags &= ~RX_PKTFLAG_IOVQ;
+#endif
 #ifdef RXDEBUG_PACKET
                 call->iovqc--;
 #endif /* RXDEBUG_PACKET */
+#ifdef RX_TRACK_PACKETS
                 cp->flags |= RX_PKTFLAG_CP;
+#endif
                call->currentPacket = cp;
                call->nFree = cp->length;
                call->curvec = 1;
@@ -1189,7 +1227,9 @@ rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
                || iov[nextio].iov_len > (int)call->curlen) {
                call->error = RX_PROTOCOL_ERROR;
                if (cp) {
+#ifdef RX_TRACK_PACKETS
                    cp->flags &= ~RX_PKTFLAG_CP;
+#endif
                     queue_Prepend(&tmpq, cp);
 #ifdef RXDEBUG_PACKET
                     tmpqc++;
@@ -1221,10 +1261,12 @@ rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
     /* Move the packets from the temporary queue onto the transmit queue.
      * We may end up with more than call->twind packets on the queue. */
 
+#ifdef RX_TRACK_PACKETS
     for (queue_Scan(&tmpq, p, np, rx_packet))
     {
         p->flags |= RX_PKTFLAG_TQ;
     }
+#endif
     queue_SpliceAppend(&call->tq, &tmpq);
 
     if (!(call->flags & (RX_CALL_FAST_RECOVER | RX_CALL_FAST_RECOVER_WAIT))) {
@@ -1248,7 +1290,9 @@ rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
 
     if (call->error) {
        if (cp) {
+#ifdef RX_TRACK_PACKETS
            cp->flags &= ~RX_PKTFLAG_CP;
+#endif
            rxi_FreePacket(cp);
             cp = call->currentPacket = (struct rx_packet *)0;
        }
@@ -1331,7 +1375,9 @@ rxi_FlushWrite(struct rx_call *call)
            /* cp->length is only supposed to be the user's data */
            /* cp->length was already set to (then-current)
             * MaxUserDataSize or less. */
+#ifdef RX_TRACK_PACKETS
            cp->flags &= ~RX_PKTFLAG_CP;
+#endif
            cp->length -= call->nFree;
            call->currentPacket = (struct rx_packet *)0;
            call->nFree = 0;
@@ -1349,7 +1395,9 @@ rxi_FlushWrite(struct rx_call *call)
        /* The 1 specifies that this is the last packet */
        hadd32(call->bytesSent, cp->length);
        rxi_PrepareSendPacket(call, cp, 1);
+#ifdef RX_TRACK_PACKETS
        cp->flags |= RX_PKTFLAG_TQ;
+#endif
        queue_Append(&call->tq, cp);
 #ifdef RXDEBUG_PACKET
         call->tqc++;