Rx: Consolidate wait for tq busy and make its use uniform
author: Jeffrey Altman <jaltman@your-file-system.com>
Tue, 12 Oct 2010 14:53:43 +0000 (10:53 -0400)
committer: Jeffrey Altman <jaltman@openafs.org>
Thu, 14 Oct 2010 03:02:58 +0000 (20:02 -0700)
rxi_WaitforTQBusy() is now used wherever a wait for the transmit
queue is required.  It returns either when the transmit queue is
no longer busy or when the call enters an error state.

Having made this change, it is clear that call->currentPacket is
not always revalidated after the call->lock is reacquired, which
can happen whenever rxi_WaitforTQBusy() is called.

Change-Id: Ibf297f1447755be2abd39a81063cc7efd7f7a08b
Reviewed-on: http://gerrit.openafs.org/2966
Reviewed-by: Jeffrey Altman <jaltman@openafs.org>
Tested-by: Jeffrey Altman <jaltman@openafs.org>

src/rx/rx.c
src/rx/rx_prototypes.h
src/rx/rx_rdwr.c

index 9696e3d..082c2b8 100644 (file)
@@ -1187,8 +1187,8 @@ rx_GetConnection(struct rx_connection *conn)
 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
 /* Wait for the transmit queue to no longer be busy.
  * requires the call->lock to be held */
-static void rxi_WaitforTQBusy(struct rx_call *call) {
-    while (call->flags & RX_CALL_TQ_BUSY) {
+void rxi_WaitforTQBusy(struct rx_call *call) {
+    while (!call->error && (call->flags & RX_CALL_TQ_BUSY)) {
        call->flags |= RX_CALL_TQ_WAIT;
        call->tqWaiters++;
 #ifdef RX_ENABLE_LOCKS
@@ -3023,20 +3023,22 @@ rxi_ReceivePacket(struct rx_packet *np, osi_socket socket,
             * flag is cleared.
             */
 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
-           while ((call->state == RX_STATE_ACTIVE)
-                  && (call->flags & RX_CALL_TQ_BUSY)) {
-               call->flags |= RX_CALL_TQ_WAIT;
-               call->tqWaiters++;
-#ifdef RX_ENABLE_LOCKS
-               osirx_AssertMine(&call->lock, "rxi_Start lock3");
-               CV_WAIT(&call->cv_tq, &call->lock);
-#else /* RX_ENABLE_LOCKS */
-               osi_rxSleep(&call->tq);
-#endif /* RX_ENABLE_LOCKS */
-               call->tqWaiters--;
-               if (call->tqWaiters == 0)
-                   call->flags &= ~RX_CALL_TQ_WAIT;
-           }
+            if (call->state == RX_STATE_ACTIVE) {
+                rxi_WaitforTQBusy(call);
+                /*
+                 * If we entered error state while waiting,
+                 * must call rxi_CallError to permit rxi_ResetCall
+                 * to proceed when the tqWaiter count hits zero.
+                 */
+                if (call->error) {
+                    rxi_CallError(call, call->error);
+                    MUTEX_EXIT(&call->lock);
+                    MUTEX_ENTER(&rx_refcnt_mutex);
+                    conn->refCount--;
+                    MUTEX_EXIT(&rx_refcnt_mutex);
+                    return np;
+                }
+            }
 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
            /* If the new call cannot be taken right now send a busy and set
             * the error condition in this call, so that it terminates as
@@ -5636,34 +5638,42 @@ rxi_Start(struct rxevent *event,
        rxi_WaitforTQBusy(call);
 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
        call->flags &= ~RX_CALL_FAST_RECOVER_WAIT;
-       call->flags |= RX_CALL_FAST_RECOVER;
-       if (peer->maxDgramPackets > 1) {
-           call->MTU = RX_JUMBOBUFFERSIZE + RX_HEADER_SIZE;
-       } else {
-           call->MTU = MIN(peer->natMTU, peer->maxMTU);
-       }
-       call->ssthresh = MAX(4, MIN((int)call->cwind, (int)call->twind)) >> 1;
-       call->nDgramPackets = 1;
-       call->cwind = 1;
-       call->nextCwind = 1;
-       call->nAcks = 0;
-       call->nNacks = 0;
-       MUTEX_ENTER(&peer->peer_lock);
-       peer->MTU = call->MTU;
-       peer->cwind = call->cwind;
-       peer->nDgramPackets = 1;
-       peer->congestSeq++;
-       call->congestSeq = peer->congestSeq;
-       MUTEX_EXIT(&peer->peer_lock);
-       /* Clear retry times on packets. Otherwise, it's possible for
-        * some packets in the queue to force resends at rates faster
-        * than recovery rates.
-        */
-       for (queue_Scan(&call->tq, p, nxp, rx_packet)) {
-           if (!(p->flags & RX_PKTFLAG_ACKED)) {
-               clock_Zero(&p->retryTime);
-           }
-       }
+#ifdef AFS_GLOBAL_RXLOCK_KERNEL
+        if (call->error) {
+            if (rx_stats_active)
+                rx_atomic_inc(&rx_tq_debug.rxi_start_in_error);
+            return;
+        }
+#endif
+        call->flags |= RX_CALL_FAST_RECOVER;
+
+        if (peer->maxDgramPackets > 1) {
+            call->MTU = RX_JUMBOBUFFERSIZE + RX_HEADER_SIZE;
+        } else {
+            call->MTU = MIN(peer->natMTU, peer->maxMTU);
+        }
+        call->ssthresh = MAX(4, MIN((int)call->cwind, (int)call->twind)) >> 1;
+        call->nDgramPackets = 1;
+        call->cwind = 1;
+        call->nextCwind = 1;
+        call->nAcks = 0;
+        call->nNacks = 0;
+        MUTEX_ENTER(&peer->peer_lock);
+        peer->MTU = call->MTU;
+        peer->cwind = call->cwind;
+        peer->nDgramPackets = 1;
+        peer->congestSeq++;
+        call->congestSeq = peer->congestSeq;
+        MUTEX_EXIT(&peer->peer_lock);
+        /* Clear retry times on packets. Otherwise, it's possible for
+         * some packets in the queue to force resends at rates faster
+         * than recovery rates.
+         */
+        for (queue_Scan(&call->tq, p, nxp, rx_packet)) {
+            if (!(p->flags & RX_PKTFLAG_ACKED)) {
+                clock_Zero(&p->retryTime);
+            }
+        }
     }
     if (call->error) {
 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
index aee4478..4dba7c6 100644 (file)
@@ -563,7 +563,9 @@ extern void rxi_PrepareSendPacket(struct rx_call *call,
 extern int rxi_AdjustIfMTU(int mtu);
 extern int rxi_AdjustMaxMTU(int mtu, int peerMaxMTU);
 extern int rxi_AdjustDgramPackets(int frags, int mtu);
-
+#ifdef  AFS_GLOBAL_RXLOCK_KERNEL
+extern void rxi_WaitforTQBusy(struct rx_call *call);
+#endif
 
 /* rxperf.c */
 
@@ -608,6 +610,8 @@ extern int rx_WritevProc(struct rx_call *call, struct iovec *iov, int nio,
 extern void rxi_FlushWrite(struct rx_call *call);
 extern void rx_FlushWrite(struct rx_call *call);
 
+
+
 /* rx_stats.c */
 extern struct rx_statistics * rx_GetStatistics(void);
 extern void rx_FreeStatistics(struct rx_statistics **);
index eb0ba9d..1752a8d 100644 (file)
@@ -730,6 +730,10 @@ rxi_WriteProc(struct rx_call *call, char *buf,
     do {
        if (call->nFree == 0) {
            MUTEX_ENTER(&call->lock);
+#ifdef AFS_GLOBAL_RXLOCK_KERNEL
+            rxi_WaitforTQBusy(call);
+#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
+            cp = call->currentPacket;
             if (call->error)
                 call->mode = RX_MODE_ERROR;
            if (!call->error && cp) {
@@ -742,23 +746,6 @@ rxi_WriteProc(struct rx_call *call, char *buf,
                 cp->flags &= ~RX_PKTFLAG_CP;
 #endif
                call->currentPacket = (struct rx_packet *)0;
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
-               /* Wait until TQ_BUSY is reset before adding any
-                * packets to the transmit queue
-                */
-               while (call->flags & RX_CALL_TQ_BUSY) {
-                   call->flags |= RX_CALL_TQ_WAIT;
-                    call->tqWaiters++;
-#ifdef RX_ENABLE_LOCKS
-                   CV_WAIT(&call->cv_tq, &call->lock);
-#else /* RX_ENABLE_LOCKS */
-                   osi_rxSleep(&call->tq);
-#endif /* RX_ENABLE_LOCKS */
-                    call->tqWaiters--;
-                    if (call->tqWaiters == 0)
-                        call->flags &= ~RX_CALL_TQ_WAIT;
-               }
-#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
                clock_NewTime();        /* Bogus:  need new time package */
                /* The 0, below, specifies that it is not the last packet:
                 * there will be others. PrepareSendPacket may
@@ -1148,20 +1135,7 @@ rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
        call->error = RX_PROTOCOL_ERROR;
     }
 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
-    /* Wait until TQ_BUSY is reset before trying to move any
-     * packets to the transmit queue.  */
-    while (!call->error && call->flags & RX_CALL_TQ_BUSY) {
-       call->flags |= RX_CALL_TQ_WAIT;
-        call->tqWaiters++;
-#ifdef RX_ENABLE_LOCKS
-       CV_WAIT(&call->cv_tq, &call->lock);
-#else /* RX_ENABLE_LOCKS */
-       osi_rxSleep(&call->tq);
-#endif /* RX_ENABLE_LOCKS */
-        call->tqWaiters--;
-        if (call->tqWaiters == 0)
-            call->flags &= ~RX_CALL_TQ_WAIT;
-    }
+    rxi_WaitforTQBusy(call);
 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
     cp = call->currentPacket;
 
@@ -1177,6 +1151,7 @@ rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
 #ifdef RXDEBUG_PACKET
             call->iovqc++;
 #endif /* RXDEBUG_PACKET */
+           call->currentPacket = (struct rx_packet *)0;
        }
 #ifdef RXDEBUG_PACKET
         call->iovqc -=
@@ -1384,23 +1359,8 @@ rxi_FlushWrite(struct rx_call *call)
 
         MUTEX_ENTER(&call->lock);
 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
-       /* Wait until TQ_BUSY is reset before adding any
-        * packets to the transmit queue
-        */
-       while (call->flags & RX_CALL_TQ_BUSY) {
-           call->flags |= RX_CALL_TQ_WAIT;
-            call->tqWaiters++;
-#ifdef RX_ENABLE_LOCKS
-           CV_WAIT(&call->cv_tq, &call->lock);
-#else /* RX_ENABLE_LOCKS */
-           osi_rxSleep(&call->tq);
-#endif /* RX_ENABLE_LOCKS */
-            call->tqWaiters--;
-            if (call->tqWaiters == 0)
-                call->flags &= ~RX_CALL_TQ_WAIT;
-       }
+        rxi_WaitforTQBusy(call);
 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
-
         if (call->error)
             call->mode = RX_MODE_ERROR;