From e45abc6cc20236b9e91c23cb6f8e90f51b6a4a99 Mon Sep 17 00:00:00 2001 From: Jeffrey Altman Date: Tue, 12 Oct 2010 10:53:43 -0400 Subject: [PATCH] Rx: Consolidate wait for tq busy and make its use uniform rxi_WaitforTQBusy() is now used wherever a wait for the transmit queue is required. It returns either when the transmit queue is no longer busy or when the call enters an error state. Having made this change it is clear that call->currentPacket is not always validated when the call->lock is reacquired which may be true when rxi_WaitforTQBusy() is called. Change-Id: Ibf297f1447755be2abd39a81063cc7efd7f7a08b Reviewed-on: http://gerrit.openafs.org/2966 Reviewed-by: Jeffrey Altman Tested-by: Jeffrey Altman --- src/rx/rx.c | 98 +++++++++++++++++++++++++++----------------------- src/rx/rx_prototypes.h | 6 +++- src/rx/rx_rdwr.c | 54 ++++------------------------ 3 files changed, 66 insertions(+), 92 deletions(-) diff --git a/src/rx/rx.c b/src/rx/rx.c index 9696e3d..082c2b8 100644 --- a/src/rx/rx.c +++ b/src/rx/rx.c @@ -1187,8 +1187,8 @@ rx_GetConnection(struct rx_connection *conn) #ifdef AFS_GLOBAL_RXLOCK_KERNEL /* Wait for the transmit queue to no longer be busy. * requires the call->lock to be held */ -static void rxi_WaitforTQBusy(struct rx_call *call) { - while (call->flags & RX_CALL_TQ_BUSY) { +void rxi_WaitforTQBusy(struct rx_call *call) { + while (!call->error && (call->flags & RX_CALL_TQ_BUSY)) { call->flags |= RX_CALL_TQ_WAIT; call->tqWaiters++; #ifdef RX_ENABLE_LOCKS @@ -3023,20 +3023,22 @@ rxi_ReceivePacket(struct rx_packet *np, osi_socket socket, * flag is cleared. */ #ifdef AFS_GLOBAL_RXLOCK_KERNEL - while ((call->state == RX_STATE_ACTIVE) - && (call->flags & RX_CALL_TQ_BUSY)) { - call->flags |= RX_CALL_TQ_WAIT; - call->tqWaiters++; -#ifdef RX_ENABLE_LOCKS - osirx_AssertMine(&call->lock, "rxi_Start lock3"); - CV_WAIT(&call->cv_tq, &call->lock); -#else /* RX_ENABLE_LOCKS */ - osi_rxSleep(&call->tq); -#endif /* RX_ENABLE_LOCKS */ - call->tqWaiters--; - if (call->tqWaiters == 0) - call->flags &= ~RX_CALL_TQ_WAIT; - } + if (call->state == RX_STATE_ACTIVE) { + rxi_WaitforTQBusy(call); + /* + * If we entered error state while waiting, + * must call rxi_CallError to permit rxi_ResetCall + * to processed when the tqWaiter count hits zero. + */ + if (call->error) { + rxi_CallError(call, call->error); + MUTEX_EXIT(&call->lock); + MUTEX_ENTER(&rx_refcnt_mutex); + conn->refCount--; + MUTEX_EXIT(&rx_refcnt_mutex); + return np; + } + } #endif /* AFS_GLOBAL_RXLOCK_KERNEL */ /* If the new call cannot be taken right now send a busy and set * the error condition in this call, so that it terminates as @@ -5636,34 +5638,42 @@ rxi_Start(struct rxevent *event, rxi_WaitforTQBusy(call); #endif /* AFS_GLOBAL_RXLOCK_KERNEL */ call->flags &= ~RX_CALL_FAST_RECOVER_WAIT; - call->flags |= RX_CALL_FAST_RECOVER; - if (peer->maxDgramPackets > 1) { - call->MTU = RX_JUMBOBUFFERSIZE + RX_HEADER_SIZE; - } else { - call->MTU = MIN(peer->natMTU, peer->maxMTU); - } - call->ssthresh = MAX(4, MIN((int)call->cwind, (int)call->twind)) >> 1; - call->nDgramPackets = 1; - call->cwind = 1; - call->nextCwind = 1; - call->nAcks = 0; - call->nNacks = 0; - MUTEX_ENTER(&peer->peer_lock); - peer->MTU = call->MTU; - peer->cwind = call->cwind; - peer->nDgramPackets = 1; - peer->congestSeq++; - call->congestSeq = peer->congestSeq; - MUTEX_EXIT(&peer->peer_lock); - /* Clear retry times on packets. Otherwise, it's possible for - * some packets in the queue to force resends at rates faster - * than recovery rates. - */ - for (queue_Scan(&call->tq, p, nxp, rx_packet)) { - if (!(p->flags & RX_PKTFLAG_ACKED)) { - clock_Zero(&p->retryTime); - } - } +#ifdef AFS_GLOBAL_RXLOCK_KERNEL + if (call->error) { + if (rx_stats_active) + rx_atomic_inc(&rx_tq_debug.rxi_start_in_error); + return; + } +#endif + call->flags |= RX_CALL_FAST_RECOVER; + + if (peer->maxDgramPackets > 1) { + call->MTU = RX_JUMBOBUFFERSIZE + RX_HEADER_SIZE; + } else { + call->MTU = MIN(peer->natMTU, peer->maxMTU); + } + call->ssthresh = MAX(4, MIN((int)call->cwind, (int)call->twind)) >> 1; + call->nDgramPackets = 1; + call->cwind = 1; + call->nextCwind = 1; + call->nAcks = 0; + call->nNacks = 0; + MUTEX_ENTER(&peer->peer_lock); + peer->MTU = call->MTU; + peer->cwind = call->cwind; + peer->nDgramPackets = 1; + peer->congestSeq++; + call->congestSeq = peer->congestSeq; + MUTEX_EXIT(&peer->peer_lock); + /* Clear retry times on packets. Otherwise, it's possible for + * some packets in the queue to force resends at rates faster + * than recovery rates. + */ + for (queue_Scan(&call->tq, p, nxp, rx_packet)) { + if (!(p->flags & RX_PKTFLAG_ACKED)) { + clock_Zero(&p->retryTime); + } + } } if (call->error) { #ifdef AFS_GLOBAL_RXLOCK_KERNEL diff --git a/src/rx/rx_prototypes.h b/src/rx/rx_prototypes.h index aee4478..4dba7c6 100644 --- a/src/rx/rx_prototypes.h +++ b/src/rx/rx_prototypes.h @@ -563,7 +563,9 @@ extern void rxi_PrepareSendPacket(struct rx_call *call, extern int rxi_AdjustIfMTU(int mtu); extern int rxi_AdjustMaxMTU(int mtu, int peerMaxMTU); extern int rxi_AdjustDgramPackets(int frags, int mtu); - +#ifdef AFS_GLOBAL_RXLOCK_KERNEL +extern void rxi_WaitforTQBusy(struct rx_call *call); +#endif /* rxperf.c */ @@ -608,6 +610,8 @@ extern int rx_WritevProc(struct rx_call *call, struct iovec *iov, int nio, extern void rxi_FlushWrite(struct rx_call *call); extern void rx_FlushWrite(struct rx_call *call); + + /* rx_stats.c */ extern struct rx_statistics * rx_GetStatistics(void); extern void rx_FreeStatistics(struct rx_statistics **); diff --git a/src/rx/rx_rdwr.c b/src/rx/rx_rdwr.c index eb0ba9d..1752a8d 100644 --- a/src/rx/rx_rdwr.c +++ b/src/rx/rx_rdwr.c @@ -730,6 +730,10 @@ rxi_WriteProc(struct rx_call *call, char *buf, do { if (call->nFree == 0) { MUTEX_ENTER(&call->lock); +#ifdef AFS_GLOBAL_RXLOCK_KERNEL + rxi_WaitforTQBusy(call); +#endif /* AFS_GLOBAL_RXLOCK_KERNEL */ + cp = call->currentPacket; if (call->error) call->mode = RX_MODE_ERROR; if (!call->error && cp) { @@ -742,23 +746,6 @@ rxi_WriteProc(struct rx_call *call, char *buf, cp->flags &= ~RX_PKTFLAG_CP; #endif call->currentPacket = (struct rx_packet *)0; -#ifdef AFS_GLOBAL_RXLOCK_KERNEL - /* Wait until TQ_BUSY is reset before adding any - * packets to the transmit queue - */ - while (call->flags & RX_CALL_TQ_BUSY) { - call->flags |= RX_CALL_TQ_WAIT; - call->tqWaiters++; -#ifdef RX_ENABLE_LOCKS - CV_WAIT(&call->cv_tq, &call->lock); -#else /* RX_ENABLE_LOCKS */ - osi_rxSleep(&call->tq); -#endif /* RX_ENABLE_LOCKS */ - call->tqWaiters--; - if (call->tqWaiters == 0) - call->flags &= ~RX_CALL_TQ_WAIT; - } -#endif /* AFS_GLOBAL_RXLOCK_KERNEL */ clock_NewTime(); /* Bogus: need new time package */ /* The 0, below, specifies that it is not the last packet: * there will be others. PrepareSendPacket may @@ -1148,20 +1135,7 @@ rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes) call->error = RX_PROTOCOL_ERROR; } #ifdef AFS_GLOBAL_RXLOCK_KERNEL - /* Wait until TQ_BUSY is reset before trying to move any - * packets to the transmit queue. */ - while (!call->error && call->flags & RX_CALL_TQ_BUSY) { - call->flags |= RX_CALL_TQ_WAIT; - call->tqWaiters++; -#ifdef RX_ENABLE_LOCKS - CV_WAIT(&call->cv_tq, &call->lock); -#else /* RX_ENABLE_LOCKS */ - osi_rxSleep(&call->tq); -#endif /* RX_ENABLE_LOCKS */ - call->tqWaiters--; - if (call->tqWaiters == 0) - call->flags &= ~RX_CALL_TQ_WAIT; - } + rxi_WaitforTQBusy(call); #endif /* AFS_GLOBAL_RXLOCK_KERNEL */ cp = call->currentPacket; @@ -1177,6 +1151,7 @@ rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes) #ifdef RXDEBUG_PACKET call->iovqc++; #endif /* RXDEBUG_PACKET */ + call->currentPacket = (struct rx_packet *)0; } #ifdef RXDEBUG_PACKET call->iovqc -= @@ -1384,23 +1359,8 @@ rxi_FlushWrite(struct rx_call *call) MUTEX_ENTER(&call->lock); #ifdef AFS_GLOBAL_RXLOCK_KERNEL - /* Wait until TQ_BUSY is reset before adding any - * packets to the transmit queue - */ - while (call->flags & RX_CALL_TQ_BUSY) { - call->flags |= RX_CALL_TQ_WAIT; - call->tqWaiters++; -#ifdef RX_ENABLE_LOCKS - CV_WAIT(&call->cv_tq, &call->lock); -#else /* RX_ENABLE_LOCKS */ - osi_rxSleep(&call->tq); -#endif /* RX_ENABLE_LOCKS */ - call->tqWaiters--; - if (call->tqWaiters == 0) - call->flags &= ~RX_CALL_TQ_WAIT; - } + rxi_WaitforTQBusy(call); #endif /* AFS_GLOBAL_RXLOCK_KERNEL */ - if (call->error) call->mode = RX_MODE_ERROR; -- 1.9.4