From 7b45d62ec18726a49dcffe8ddbca8f446ca0bb3e Mon Sep 17 00:00:00 2001 From: Simon Wilkinson Date: Fri, 6 Jan 2012 10:12:26 +0000 Subject: [PATCH 1/1] rx: Tidy up currentPacket handling Instead of making a copy of the call->currentPacket variable in our read/write routines, reference it directly. Make it clear that currentPacket is used solely by the application thread, and remove a number of mistaken comments that suggest otherwise. Change-Id: I7ad799acbf110422df7c1e18ab552caf26b7766a Reviewed-on: http://gerrit.openafs.org/7194 Tested-by: BuildBot Reviewed-by: Jeffrey Altman Reviewed-by: Derrick Brashear --- src/rx/rx_rdwr.c | 229 ++++++++++++++++++++++++++----------------------------- 1 file changed, 110 insertions(+), 119 deletions(-) diff --git a/src/rx/rx_rdwr.c b/src/rx/rx_rdwr.c index a7fce0c..a711572 100644 --- a/src/rx/rx_rdwr.c +++ b/src/rx/rx_rdwr.c @@ -77,7 +77,6 @@ int rxi_ReadProc(struct rx_call *call, char *buf, int nbytes) { - struct rx_packet *cp = call->currentPacket; struct rx_packet *rp; int requestCount; unsigned int t; @@ -146,7 +145,7 @@ rxi_ReadProc(struct rx_call *call, char *buf, return 0; } call->rnext++; - cp = call->currentPacket = rp; + call->currentPacket = rp; #ifdef RX_TRACK_PACKETS call->currentPacket->flags |= RX_PKTFLAG_CP; #endif @@ -154,10 +153,10 @@ rxi_ReadProc(struct rx_call *call, char *buf, /* begin at the beginning [ more or less ], continue * on until the end, then stop. */ call->curpos = - (char *)cp->wirevec[1].iov_base + + (char *) call->currentPacket->wirevec[1].iov_base + call->conn->securityHeaderSize; call->curlen = - cp->wirevec[1].iov_len - + call->currentPacket->wirevec[1].iov_len - call->conn->securityHeaderSize; /* Notice that this code works correctly if the data @@ -167,8 +166,8 @@ rxi_ReadProc(struct rx_call *call, char *buf, * (no yields, etc.). If it didn't, this would be a * problem because a value of zero for call->nLeft * normally means that there is no read packet */ - call->nLeft = cp->length; - hadd32(call->bytesRcvd, cp->length); + call->nLeft = call->currentPacket->length; + hadd32(call->bytesRcvd, call->currentPacket->length); /* Send a hard ack for every rxi_HardAckRate+1 packets * consumed. Otherwise schedule an event to send @@ -213,7 +212,6 @@ rxi_ReadProc(struct rx_call *call, char *buf, osi_rxSleep(&call->rq); #endif } - cp = call->currentPacket; call->startWait = 0; #ifdef RX_ENABLE_LOCKS @@ -234,7 +232,7 @@ rxi_ReadProc(struct rx_call *call, char *buf, * number of bytes read in the length field of the packet struct. On * the final portion of a received packet, it's almost certain that * call->nLeft will be smaller than the final buffer. */ - while (nbytes && cp) { + while (nbytes && call->currentPacket) { t = MIN((int)call->curlen, nbytes); t = MIN(t, (int)call->nLeft); memcpy(buf, call->curpos, t); @@ -249,23 +247,24 @@ rxi_ReadProc(struct rx_call *call, char *buf, #ifdef RX_TRACK_PACKETS call->currentPacket->flags &= ~RX_PKTFLAG_CP; #endif - rxi_FreePacket(cp); - cp = call->currentPacket = (struct rx_packet *)0; + rxi_FreePacket(call->currentPacket); + call->currentPacket = NULL; } else if (!call->curlen) { /* need to get another struct iov */ - if (++call->curvec >= cp->niovecs) { + if (++call->curvec >= call->currentPacket->niovecs) { /* current packet is exhausted, get ready for another */ /* don't worry about curvec and stuff, they get set somewhere else */ #ifdef RX_TRACK_PACKETS call->currentPacket->flags &= ~RX_PKTFLAG_CP; #endif - rxi_FreePacket(cp); - cp = call->currentPacket = (struct rx_packet *)0; + rxi_FreePacket(call->currentPacket); + call->currentPacket = NULL; call->nLeft = 0; } else { call->curpos = - (char *)cp->wirevec[call->curvec].iov_base; - call->curlen = cp->wirevec[call->curvec].iov_len; + call->currentPacket->wirevec[call->curvec].iov_base; + call->curlen = + call->currentPacket->wirevec[call->curvec].iov_len; } } } @@ -374,13 +373,11 @@ rxi_FillReadVec(struct rx_call *call, afs_uint32 serial) int didHardAck = 0; unsigned int t; struct rx_packet *rp; - struct rx_packet *curp; struct iovec *call_iov; struct iovec *cur_iov = NULL; - curp = call->currentPacket; - if (curp) { - cur_iov = &curp->wirevec[call->curvec]; + if (call->currentPacket) { + cur_iov = &call->currentPacket->wirevec[call->curvec]; } call_iov = &call->iov[call->iovNext]; @@ -422,19 +419,19 @@ rxi_FillReadVec(struct rx_call *call, afs_uint32 serial) return 1; } call->rnext++; - curp = call->currentPacket = rp; + call->currentPacket = rp; #ifdef RX_TRACK_PACKETS call->currentPacket->flags |= RX_PKTFLAG_CP; #endif call->curvec = 1; /* 0th vec is always header */ - cur_iov = &curp->wirevec[1]; + cur_iov = &call->currentPacket->wirevec[1]; /* begin at the beginning [ more or less ], continue * on until the end, then stop. */ call->curpos = - (char *)curp->wirevec[1].iov_base + + (char *)call->currentPacket->wirevec[1].iov_base + call->conn->securityHeaderSize; call->curlen = - curp->wirevec[1].iov_len - + call->currentPacket->wirevec[1].iov_len - call->conn->securityHeaderSize; /* Notice that this code works correctly if the data @@ -444,8 +441,8 @@ rxi_FillReadVec(struct rx_call *call, afs_uint32 serial) * (no yields, etc.). If it didn't, this would be a * problem because a value of zero for call->nLeft * normally means that there is no read packet */ - call->nLeft = curp->length; - hadd32(call->bytesRcvd, curp->length); + call->nLeft = call->currentPacket->length; + hadd32(call->bytesRcvd, call->currentPacket->length); /* Send a hard ack for every rxi_HardAckRate+1 packets * consumed. Otherwise schedule an event to send @@ -465,7 +462,9 @@ rxi_FillReadVec(struct rx_call *call, afs_uint32 serial) * number of bytes read in the length field of the packet struct. On * the final portion of a received packet, it's almost certain that * call->nLeft will be smaller than the final buffer. */ - while (call->iovNBytes && call->iovNext < call->iovMax && curp) { + while (call->iovNBytes + && call->iovNext < call->iovMax + && call->currentPacket) { t = MIN((int)call->curlen, call->iovNBytes); t = MIN(t, (int)call->nLeft); @@ -481,28 +480,28 @@ rxi_FillReadVec(struct rx_call *call, afs_uint32 serial) if (!call->nLeft) { /* out of packet. Get another one. */ #ifdef RX_TRACK_PACKETS - curp->flags &= ~RX_PKTFLAG_CP; - curp->flags |= RX_PKTFLAG_IOVQ; + call->currentPacket->flags &= ~RX_PKTFLAG_CP; + call->currentPacket->flags |= RX_PKTFLAG_IOVQ; #endif - queue_Append(&call->iovq, curp); + queue_Append(&call->iovq, call->currentPacket); #ifdef RXDEBUG_PACKET call->iovqc++; #endif /* RXDEBUG_PACKET */ - curp = call->currentPacket = (struct rx_packet *)0; + call->currentPacket = NULL; } else if (!call->curlen) { /* need to get another struct iov */ - if (++call->curvec >= curp->niovecs) { + if (++call->curvec >= call->currentPacket->niovecs) { /* current packet is exhausted, get ready for another */ /* don't worry about curvec and stuff, they get set somewhere else */ #ifdef RX_TRACK_PACKETS - curp->flags &= ~RX_PKTFLAG_CP; - curp->flags |= RX_PKTFLAG_IOVQ; + call->currentPacket->flags &= ~RX_PKTFLAG_CP; + call->currentPacket->flags |= RX_PKTFLAG_IOVQ; #endif - queue_Append(&call->iovq, curp); + queue_Append(&call->iovq, call->currentPacket); #ifdef RXDEBUG_PACKET call->iovqc++; #endif /* RXDEBUG_PACKET */ - curp = call->currentPacket = (struct rx_packet *)0; + call->currentPacket = NULL; call->nLeft = 0; } else { cur_iov++; @@ -629,7 +628,6 @@ rxi_WriteProc(struct rx_call *call, char *buf, int nbytes) { struct rx_connection *conn = call->conn; - struct rx_packet *cp = call->currentPacket; unsigned int t; int requestCount = nbytes; @@ -645,12 +643,12 @@ rxi_WriteProc(struct rx_call *call, char *buf, if ((conn->type == RX_SERVER_CONNECTION) && (call->mode == RX_MODE_RECEIVING)) { call->mode = RX_MODE_SENDING; - if (cp) { + if (call->currentPacket) { #ifdef RX_TRACK_PACKETS - cp->flags &= ~RX_PKTFLAG_CP; + call->currentPacket->flags &= ~RX_PKTFLAG_CP; #endif - rxi_FreePacket(cp); - cp = call->currentPacket = (struct rx_packet *)0; + rxi_FreePacket(call->currentPacket); + call->currentPacket = NULL; call->nLeft = 0; call->nFree = 0; } @@ -666,50 +664,44 @@ rxi_WriteProc(struct rx_call *call, char *buf, do { if (call->nFree == 0) { MUTEX_ENTER(&call->lock); - cp = call->currentPacket; if (call->error) call->mode = RX_MODE_ERROR; - if (!call->error && cp) { - /* Clear the current packet now so that if - * we are forced to wait and drop the lock - * the packet we are planning on using - * cannot be freed. - */ -#ifdef RX_TRACK_PACKETS - cp->flags &= ~RX_PKTFLAG_CP; -#endif - call->currentPacket = (struct rx_packet *)0; + if (!call->error && call->currentPacket) { clock_NewTime(); /* Bogus: need new time package */ /* The 0, below, specifies that it is not the last packet: * there will be others. PrepareSendPacket may * alter the packet length by up to * conn->securityMaxTrailerSize */ - hadd32(call->bytesSent, cp->length); - rxi_PrepareSendPacket(call, cp, 0); + hadd32(call->bytesSent, call->currentPacket->length); + rxi_PrepareSendPacket(call, call->currentPacket, 0); #ifdef AFS_GLOBAL_RXLOCK_KERNEL /* PrepareSendPacket drops the call lock */ rxi_WaitforTQBusy(call); #endif /* AFS_GLOBAL_RXLOCK_KERNEL */ #ifdef RX_TRACK_PACKETS - cp->flags |= RX_PKTFLAG_TQ; + call->currentPacket->flags |= RX_PKTFLAG_TQ; #endif - queue_Append(&call->tq, cp); + queue_Append(&call->tq, call->currentPacket); #ifdef RXDEBUG_PACKET call->tqc++; #endif /* RXDEBUG_PACKET */ - cp = (struct rx_packet *)0; +#ifdef RX_TRACK_PACKETS + call->currentPacket->flags &= ~RX_PKTFLAG_CP; +#endif + call->currentPacket = NULL; + /* If the call is in recovery, let it exhaust its current * retransmit queue before forcing it to send new packets */ if (!(call->flags & (RX_CALL_FAST_RECOVER))) { rxi_Start(call, 0); } - } else if (cp) { + } else if (call->currentPacket) { #ifdef RX_TRACK_PACKETS - cp->flags &= ~RX_PKTFLAG_CP; + call->currentPacket->flags &= ~RX_PKTFLAG_CP; #endif - rxi_FreePacket(cp); - cp = call->currentPacket = (struct rx_packet *)0; + rxi_FreePacket(call->currentPacket); + call->currentPacket = NULL; } /* Wait for transmit window to open up */ while (!call->error @@ -733,28 +725,28 @@ rxi_WriteProc(struct rx_call *call, char *buf, } #endif /* RX_ENABLE_LOCKS */ } - if ((cp = rxi_AllocSendPacket(call, nbytes))) { + if ((call->currentPacket = rxi_AllocSendPacket(call, nbytes))) { #ifdef RX_TRACK_PACKETS - cp->flags |= RX_PKTFLAG_CP; + call->currentPacket->flags |= RX_PKTFLAG_CP; #endif - call->currentPacket = cp; - call->nFree = cp->length; + call->nFree = call->currentPacket->length; call->curvec = 1; /* 0th vec is always header */ /* begin at the beginning [ more or less ], continue * on until the end, then stop. */ call->curpos = - (char *)cp->wirevec[1].iov_base + + (char *) call->currentPacket->wirevec[1].iov_base + call->conn->securityHeaderSize; call->curlen = - cp->wirevec[1].iov_len - call->conn->securityHeaderSize; + call->currentPacket->wirevec[1].iov_len - + call->conn->securityHeaderSize; } if (call->error) { call->mode = RX_MODE_ERROR; - if (cp) { + if (call->currentPacket) { #ifdef RX_TRACK_PACKETS - cp->flags &= ~RX_PKTFLAG_CP; + call->currentPacket->flags &= ~RX_PKTFLAG_CP; #endif - rxi_FreePacket(cp); + rxi_FreePacket(call->currentPacket); call->currentPacket = NULL; } MUTEX_EXIT(&call->lock); @@ -763,18 +755,19 @@ rxi_WriteProc(struct rx_call *call, char *buf, MUTEX_EXIT(&call->lock); } - if (cp && (int)call->nFree < nbytes) { + if (call->currentPacket && (int)call->nFree < nbytes) { /* Try to extend the current buffer */ int len, mud; - len = cp->length; + len = call->currentPacket->length; mud = rx_MaxUserDataSize(call); if (mud > len) { int want; want = MIN(nbytes - (int)call->nFree, mud - len); - rxi_AllocDataBuf(cp, want, RX_PACKET_CLASS_SEND_CBUF); - if (cp->length > (unsigned)mud) - cp->length = mud; - call->nFree += (cp->length - len); + rxi_AllocDataBuf(call->currentPacket, want, + RX_PACKET_CLASS_SEND_CBUF); + if (call->currentPacket->length > (unsigned)mud) + call->currentPacket->length = mud; + call->nFree += (call->currentPacket->length - len); } } @@ -782,7 +775,7 @@ rxi_WriteProc(struct rx_call *call, char *buf, * and return. Don't ship a buffer that's full immediately to * the peer--we don't know if it's the last buffer yet */ - if (!cp) { + if (!call->currentPacket) { call->nFree = 0; } @@ -799,12 +792,14 @@ rxi_WriteProc(struct rx_call *call, char *buf, if (!call->curlen) { /* need to get another struct iov */ - if (++call->curvec >= cp->niovecs) { + if (++call->curvec >= call->currentPacket->niovecs) { /* current packet is full, extend or send it */ call->nFree = 0; } else { - call->curpos = (char *)cp->wirevec[call->curvec].iov_base; - call->curlen = cp->wirevec[call->curvec].iov_len; + call->curpos = + call->currentPacket->wirevec[call->curvec].iov_base; + call->curlen = + call->currentPacket->wirevec[call->curvec].iov_len; } } } /* while bytes to send and room to send them */ @@ -914,7 +909,7 @@ rxi_WritevAlloc(struct rx_call *call, struct iovec *iov, int *nio, int maxio, int nbytes) { struct rx_connection *conn = call->conn; - struct rx_packet *cp = call->currentPacket; + struct rx_packet *cp; int requestCount; int nextio; /* Temporary values, real work is done in rxi_WritevProc */ @@ -938,12 +933,12 @@ rxi_WritevAlloc(struct rx_call *call, struct iovec *iov, int *nio, int maxio, if ((conn->type == RX_SERVER_CONNECTION) && (call->mode == RX_MODE_RECEIVING)) { call->mode = RX_MODE_SENDING; - if (cp) { + if (call->currentPacket) { #ifdef RX_TRACK_PACKETS - cp->flags &= ~RX_PKTFLAG_CP; + call->currentPacket->flags &= ~RX_PKTFLAG_CP; #endif - rxi_FreePacket(cp); - cp = call->currentPacket = (struct rx_packet *)0; + rxi_FreePacket(call->currentPacket); + call->currentPacket = NULL; call->nLeft = 0; call->nFree = 0; } @@ -957,6 +952,7 @@ rxi_WritevAlloc(struct rx_call *call, struct iovec *iov, int *nio, int maxio, tcurvec = call->curvec; tcurpos = call->curpos; tcurlen = call->curlen; + cp = call->currentPacket; do { int t; @@ -1051,7 +1047,6 @@ rx_WritevAlloc(struct rx_call *call, struct iovec *iov, int *nio, int maxio, int rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes) { - struct rx_packet *cp = NULL; #ifdef RX_TRACK_PACKETS struct rx_packet *p, *np; #endif @@ -1074,21 +1069,20 @@ rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes) #ifdef AFS_GLOBAL_RXLOCK_KERNEL rxi_WaitforTQBusy(call); #endif /* AFS_GLOBAL_RXLOCK_KERNEL */ - cp = call->currentPacket; if (call->error) { call->mode = RX_MODE_ERROR; MUTEX_EXIT(&call->lock); - if (cp) { + if (call->currentPacket) { #ifdef RX_TRACK_PACKETS - cp->flags &= ~RX_PKTFLAG_CP; - cp->flags |= RX_PKTFLAG_IOVQ; + call->currentPacket->flags &= ~RX_PKTFLAG_CP; + call->currentPacket->flags |= RX_PKTFLAG_IOVQ; #endif - queue_Prepend(&call->iovq, cp); + queue_Prepend(&call->iovq, call->currentPacket); #ifdef RXDEBUG_PACKET call->iovqc++; #endif /* RXDEBUG_PACKET */ - call->currentPacket = (struct rx_packet *)0; + call->currentPacket = NULL; } #ifdef RXDEBUG_PACKET call->iovqc -= @@ -1108,23 +1102,23 @@ rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes) tmpqc = 0; #endif /* RXDEBUG_PACKET */ do { - if (call->nFree == 0 && cp) { + if (call->nFree == 0 && call->currentPacket) { clock_NewTime(); /* Bogus: need new time package */ /* The 0, below, specifies that it is not the last packet: * there will be others. PrepareSendPacket may * alter the packet length by up to * conn->securityMaxTrailerSize */ - hadd32(call->bytesSent, cp->length); - rxi_PrepareSendPacket(call, cp, 0); + hadd32(call->bytesSent, call->currentPacket->length); + rxi_PrepareSendPacket(call, call->currentPacket, 0); #ifdef AFS_GLOBAL_RXLOCK_KERNEL /* PrepareSendPacket drops the call lock */ rxi_WaitforTQBusy(call); #endif /* AFS_GLOBAL_RXLOCK_KERNEL */ - queue_Append(&tmpq, cp); + queue_Append(&tmpq, call->currentPacket); #ifdef RXDEBUG_PACKET tmpqc++; #endif /* RXDEBUG_PACKET */ - cp = call->currentPacket = (struct rx_packet *)0; + call->currentPacket = NULL; /* The head of the iovq is now the current packet */ if (nbytes) { @@ -1137,25 +1131,23 @@ rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes) rxi_FreePackets(0, &tmpq); return 0; } - cp = queue_First(&call->iovq, rx_packet); - queue_Remove(cp); + call->currentPacket = queue_First(&call->iovq, rx_packet); + queue_Remove(call->currentPacket); #ifdef RX_TRACK_PACKETS - cp->flags &= ~RX_PKTFLAG_IOVQ; + call->currentPacket->flags &= ~RX_PKTFLAG_IOVQ; + call->currentPacket->flags |= RX_PKTFLAG_CP; #endif #ifdef RXDEBUG_PACKET call->iovqc--; #endif /* RXDEBUG_PACKET */ -#ifdef RX_TRACK_PACKETS - cp->flags |= RX_PKTFLAG_CP; -#endif - call->currentPacket = cp; - call->nFree = cp->length; + call->nFree = call->currentPacket->length; call->curvec = 1; call->curpos = - (char *)cp->wirevec[1].iov_base + + (char *) call->currentPacket->wirevec[1].iov_base + call->conn->securityHeaderSize; call->curlen = - cp->wirevec[1].iov_len - call->conn->securityHeaderSize; + call->currentPacket->wirevec[1].iov_len - + call->conn->securityHeaderSize; } } @@ -1165,15 +1157,15 @@ rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes) || iov[nextio].iov_len > (int)call->curlen) { call->error = RX_PROTOCOL_ERROR; MUTEX_EXIT(&call->lock); - if (cp) { + if (call->currentPacket) { #ifdef RX_TRACK_PACKETS - cp->flags &= ~RX_PKTFLAG_CP; + call->currentPacket->flags &= ~RX_PKTFLAG_CP; #endif - queue_Prepend(&tmpq, cp); + queue_Prepend(&tmpq, call->currentPacket); #ifdef RXDEBUG_PACKET tmpqc++; #endif /* RXDEBUG_PACKET */ - cp = call->currentPacket = (struct rx_packet *)0; + call->currentPacket = NULL; } #ifdef RXDEBUG_PACKET tmpqc -= @@ -1187,11 +1179,13 @@ rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes) call->nFree -= iov[nextio].iov_len; nextio++; if (call->curlen == 0) { - if (++call->curvec > cp->niovecs) { + if (++call->curvec > call->currentPacket->niovecs) { call->nFree = 0; } else { - call->curpos = (char *)cp->wirevec[call->curvec].iov_base; - call->curlen = cp->wirevec[call->curvec].iov_len; + call->curpos = + call->currentPacket->wirevec[call->curvec].iov_base; + call->curlen = + call->currentPacket->wirevec[call->curvec].iov_len; } } } @@ -1232,18 +1226,15 @@ rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes) call->startWait = 0; } - /* cp is no longer valid since we may have given up the lock */ - cp = call->currentPacket; - if (call->error) { call->mode = RX_MODE_ERROR; call->currentPacket = NULL; MUTEX_EXIT(&call->lock); - if (cp) { + if (call->currentPacket) { #ifdef RX_TRACK_PACKETS - cp->flags &= ~RX_PKTFLAG_CP; + call->currentPacket->flags &= ~RX_PKTFLAG_CP; #endif - rxi_FreePacket(cp); + rxi_FreePacket(call->currentPacket); } return 0; } -- 1.9.4