From: Simon Wilkinson Date: Sun, 8 Jan 2012 11:45:57 +0000 (+0000) Subject: rx: Rework code which pulls packet from recv queue X-Git-Tag: openafs-stable-1_8_0pre1~2532 X-Git-Url: http://git.openafs.org/?p=openafs.git;a=commitdiff_plain;h=a187be182ce2915b68af0e318bef352d41f7715a rx: Rework code which pulls packet from recv queue Both rxi_ReadProc and rxi_FillReadVector contained copies of the same code to pull a packet out of the receive queue, and turn it into the call's currentPacket. Abstract this out into a single common function, so we're not maintaining the same code in two different places. Change-Id: I20af6b4ff19f05e21ffde1a80609be12ad6cfeee Reviewed-on: http://gerrit.openafs.org/7195 Reviewed-by: Jeffrey Altman Tested-by: BuildBot Reviewed-by: Derrick Brashear --- diff --git a/src/rx/rx_rdwr.c b/src/rx/rx_rdwr.c index a711572..b95bb6f 100644 --- a/src/rx/rx_rdwr.c +++ b/src/rx/rx_rdwr.c @@ -69,6 +69,91 @@ /* rxdb_fileID is used to identify the lock location, along with line#. */ static int rxdb_fileID = RXDB_FILE_RX_RDWR; #endif /* RX_LOCKS_DB */ + +/* Get the next packet in the receive queue + * + * Dispose of the call's currentPacket, and move the next packet in the + * receive queue into the currentPacket field. If the next packet isn't + * available, then currentPacket is left NULL. + * + * @param call + * The RX call to manipulate + * @returns + * 0 on success, an error code on failure + * + * @notes + * Must be called with the call locked. Unlocks the call if returning + * with an error. + */ + +static int +rxi_GetNextPacket(struct rx_call *call) { + struct rx_packet *rp; + int error; + + if (call->currentPacket != NULL) { +#ifdef RX_TRACK_PACKETS + call->currentPacket->flags |= RX_PKTFLAG_CP; +#endif + rxi_FreePacket(call->currentPacket); + call->currentPacket = NULL; + } + + if (queue_IsEmpty(&call->rq)) + return 0; + + /* Check that next packet available is next in sequence */ + rp = queue_First(&call->rq, rx_packet); + if (rp->header.seq != call->rnext) + return 0; + + queue_Remove(rp); +#ifdef RX_TRACK_PACKETS + rp->flags &= ~RX_PKTFLAG_RQ; +#endif +#ifdef RXDEBUG_PACKET + call->rqc--; +#endif /* RXDEBUG_PACKET */ + + /* RXS_CheckPacket called to undo RXS_PreparePacket's work. It may + * reduce the length of the packet by up to conn->maxTrailerSize, + * to reflect the length of the data + the header. */ + if ((error = RXS_CheckPacket(call->conn->securityObject, call, rp))) { + /* Used to merely shut down the call, but now we shut down the whole + * connection since this may indicate an attempt to hijack it */ + + MUTEX_EXIT(&call->lock); + rxi_ConnectionError(call->conn, error); + MUTEX_ENTER(&call->conn->conn_data_lock); + rp = rxi_SendConnectionAbort(call->conn, rp, 0, 0); + MUTEX_EXIT(&call->conn->conn_data_lock); + rxi_FreePacket(rp); + + return error; + } + + call->rnext++; + call->currentPacket = rp; +#ifdef RX_TRACK_PACKETS + call->currentPacket->flags |= RX_PKTFLAG_CP; +#endif + call->curvec = 1; /* 0th vec is always header */ + + /* begin at the beginning [ more or less ], continue on until the end, + * then stop. */ + call->curpos = (char *)call->currentPacket->wirevec[1].iov_base + + call->conn->securityHeaderSize; + call->curlen = call->currentPacket->wirevec[1].iov_len - + call->conn->securityHeaderSize; + + call->nLeft = call->currentPacket->length; + hadd32(call->bytesRcvd, call->currentPacket->length); + + call->nHardAcks++; + + return 0; +} + /* rxi_ReadProc -- internal version. * * LOCKS USED -- called at netpri @@ -77,8 +162,8 @@ int rxi_ReadProc(struct rx_call *call, char *buf, int nbytes) { - struct rx_packet *rp; int requestCount; + int code; unsigned int t; /* XXXX took out clock_NewTime from here. Was it needed? */ @@ -110,83 +195,23 @@ rxi_ReadProc(struct rx_call *call, char *buf, continue; } } - if (queue_IsNotEmpty(&call->rq)) { - /* Check that next packet available is next in sequence */ - rp = queue_First(&call->rq, rx_packet); - if (rp->header.seq == call->rnext) { - afs_int32 error; - struct rx_connection *conn = call->conn; - queue_Remove(rp); -#ifdef RX_TRACK_PACKETS - rp->flags &= ~RX_PKTFLAG_RQ; -#endif -#ifdef RXDEBUG_PACKET - call->rqc--; -#endif /* RXDEBUG_PACKET */ - /* RXS_CheckPacket called to undo RXS_PreparePacket's - * work. It may reduce the length of the packet by up - * to conn->maxTrailerSize, to reflect the length of the - * data + the header. */ - if ((error = - RXS_CheckPacket(conn->securityObject, call, - rp))) { - /* Used to merely shut down the call, but now we - * shut down the whole connection since this may - * indicate an attempt to hijack it */ - - MUTEX_EXIT(&call->lock); - rxi_ConnectionError(conn, error); - MUTEX_ENTER(&conn->conn_data_lock); - rp = rxi_SendConnectionAbort(conn, rp, 0, 0); - MUTEX_EXIT(&conn->conn_data_lock); - rxi_FreePacket(rp); - - return 0; - } - call->rnext++; - call->currentPacket = rp; -#ifdef RX_TRACK_PACKETS - call->currentPacket->flags |= RX_PKTFLAG_CP; -#endif - call->curvec = 1; /* 0th vec is always header */ - /* begin at the beginning [ more or less ], continue - * on until the end, then stop. */ - call->curpos = - (char *) call->currentPacket->wirevec[1].iov_base + - call->conn->securityHeaderSize; - call->curlen = - call->currentPacket->wirevec[1].iov_len - - call->conn->securityHeaderSize; - - /* Notice that this code works correctly if the data - * size is 0 (which it may be--no reply arguments from - * server, for example). This relies heavily on the - * fact that the code below immediately frees the packet - * (no yields, etc.). If it didn't, this would be a - * problem because a value of zero for call->nLeft - * normally means that there is no read packet */ - call->nLeft = call->currentPacket->length; - hadd32(call->bytesRcvd, call->currentPacket->length); - - /* Send a hard ack for every rxi_HardAckRate+1 packets - * consumed. Otherwise schedule an event to send - * the hard ack later on. - */ - call->nHardAcks++; - if (!(call->flags & RX_CALL_RECEIVE_DONE)) { - if (call->nHardAcks > (u_short) rxi_HardAckRate) { - rxevent_Cancel(&call->delayedAckEvent, call, - RX_CALL_REFCOUNT_DELAY); - rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0); - } else { - /* Delay to consolidate ack packets */ - rxi_PostDelayedAckEvent(call, - &rx_hardAckDelay); - } + code = rxi_GetNextPacket(call); + if (code) + return 0; + + if (call->currentPacket) { + if (!(call->flags & RX_CALL_RECEIVE_DONE)) { + if (call->nHardAcks > (u_short) rxi_HardAckRate) { + rxevent_Cancel(&call->delayedAckEvent, call, + RX_CALL_REFCOUNT_DELAY); + rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0); + } else { + /* Delay to consolidate ack packets */ + rxi_PostDelayedAckEvent(call, &rx_hardAckDelay); } - break; } + break; } /* @@ -371,8 +396,8 @@ rxi_FillReadVec(struct rx_call *call, afs_uint32 serial) { int didConsume = 0; int didHardAck = 0; + int code; unsigned int t; - struct rx_packet *rp; struct iovec *call_iov; struct iovec *cur_iov = NULL; @@ -384,76 +409,19 @@ rxi_FillReadVec(struct rx_call *call, afs_uint32 serial) while (!call->error && call->iovNBytes && call->iovNext < call->iovMax) { if (call->nLeft == 0) { /* Get next packet */ - if (queue_IsNotEmpty(&call->rq)) { - /* Check that next packet available is next in sequence */ - rp = queue_First(&call->rq, rx_packet); - if (rp->header.seq == call->rnext) { - afs_int32 error; - struct rx_connection *conn = call->conn; - queue_Remove(rp); -#ifdef RX_TRACK_PACKETS - rp->flags &= ~RX_PKTFLAG_RQ; -#endif -#ifdef RXDEBUG_PACKET - call->rqc--; -#endif /* RXDEBUG_PACKET */ - - /* RXS_CheckPacket called to undo RXS_PreparePacket's - * work. It may reduce the length of the packet by up - * to conn->maxTrailerSize, to reflect the length of the - * data + the header. */ - if ((error = - RXS_CheckPacket(conn->securityObject, call, rp))) { - /* Used to merely shut down the call, but now we - * shut down the whole connection since this may - * indicate an attempt to hijack it */ + code = rxi_GetNextPacket(call); + if (code) { + MUTEX_ENTER(&call->lock); + return 1; + } - MUTEX_EXIT(&call->lock); - rxi_ConnectionError(conn, error); - MUTEX_ENTER(&conn->conn_data_lock); - rp = rxi_SendConnectionAbort(conn, rp, 0, 0); - MUTEX_EXIT(&conn->conn_data_lock); - rxi_FreePacket(rp); - MUTEX_ENTER(&call->lock); - - return 1; - } - call->rnext++; - call->currentPacket = rp; -#ifdef RX_TRACK_PACKETS - call->currentPacket->flags |= RX_PKTFLAG_CP; -#endif - call->curvec = 1; /* 0th vec is always header */ - cur_iov = &call->currentPacket->wirevec[1]; - /* begin at the beginning [ more or less ], continue - * on until the end, then stop. */ - call->curpos = - (char *)call->currentPacket->wirevec[1].iov_base + - call->conn->securityHeaderSize; - call->curlen = - call->currentPacket->wirevec[1].iov_len - - call->conn->securityHeaderSize; - - /* Notice that this code works correctly if the data - * size is 0 (which it may be--no reply arguments from - * server, for example). This relies heavily on the - * fact that the code below immediately frees the packet - * (no yields, etc.). If it didn't, this would be a - * problem because a value of zero for call->nLeft - * normally means that there is no read packet */ - call->nLeft = call->currentPacket->length; - hadd32(call->bytesRcvd, call->currentPacket->length); - - /* Send a hard ack for every rxi_HardAckRate+1 packets - * consumed. Otherwise schedule an event to send - * the hard ack later on. - */ - call->nHardAcks++; - didConsume = 1; - continue; - } + if (call->currentPacket) { + cur_iov = &call->currentPacket->wirevec[1]; + didConsume = 1; + continue; + } else { + break; } - break; } /* It's possible for call->nLeft to be smaller than any particular