rx: Move transmit queue clearing
[openafs.git] / src / rx / rx.c
index c4c9814..bb3a852 100644 (file)
@@ -152,17 +152,14 @@ static void rxi_KeepAliveOn(struct rx_call *call);
 static void rxi_GrowMTUOn(struct rx_call *call);
 static void rxi_ChallengeOn(struct rx_connection *conn);
 static int rxi_CheckCall(struct rx_call *call, int haveCTLock);
+static void rxi_AckAllInTransmitQueue(struct rx_call *call);
 
 #ifdef RX_ENABLE_LOCKS
-static void rxi_SetAcksInTransmitQueue(struct rx_call *call);
-#endif
-
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
 struct rx_tq_debug {
     rx_atomic_t rxi_start_aborted; /* rxi_start awoke after rxi_Send in error.*/
     rx_atomic_t rxi_start_in_error;
 } rx_tq_debug;
-#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
+#endif /* RX_ENABLE_LOCKS */
 
 /* Constant delay time before sending an acknowledge of the last packet
  * received.  This is to avoid sending an extra acknowledge when the
@@ -1409,7 +1406,7 @@ rx_GetConnection(struct rx_connection *conn)
     USERPRI;
 }
 
-#ifdef  AFS_GLOBAL_RXLOCK_KERNEL
+#ifdef RX_ENABLE_LOCKS
 /* Wait for the transmit queue to no longer be busy.
  * requires the call->lock to be held */
 void
@@ -1417,12 +1414,8 @@ rxi_WaitforTQBusy(struct rx_call *call) {
     while (!call->error && (call->flags & RX_CALL_TQ_BUSY)) {
        call->flags |= RX_CALL_TQ_WAIT;
        call->tqWaiters++;
-#ifdef RX_ENABLE_LOCKS
        osirx_AssertMine(&call->lock, "rxi_WaitforTQ lock");
        CV_WAIT(&call->cv_tq, &call->lock);
-#else /* RX_ENABLE_LOCKS */
-       osi_rxSleep(&call->tq);
-#endif /* RX_ENABLE_LOCKS */
        call->tqWaiters--;
        if (call->tqWaiters == 0) {
            call->flags &= ~RX_CALL_TQ_WAIT;
@@ -1668,18 +1661,15 @@ rx_NewCall(struct rx_connection *conn)
      * run (see code above that avoids resource starvation).
      */
 #ifdef RX_ENABLE_LOCKS
+    if (call->flags & (RX_CALL_TQ_BUSY | RX_CALL_TQ_CLEARME)) {
+        osi_Panic("rx_NewCall call about to be used without an empty tq");
+    }
+
     CV_BROADCAST(&conn->conn_call_cv);
 #else
     osi_rxWakeup(conn);
 #endif
     MUTEX_EXIT(&conn->conn_call_lock);
-
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
-    if (call->flags & (RX_CALL_TQ_BUSY | RX_CALL_TQ_CLEARME)) {
-        osi_Panic("rx_NewCall call about to be used without an empty tq");
-    }
-#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
-
     MUTEX_EXIT(&call->lock);
     USERPRI;
 
@@ -1790,9 +1780,7 @@ rx_NewServiceHost(afs_uint32 host, u_short port, u_short serviceId,
     tservice = rxi_AllocService();
     NETPRI;
 
-#ifdef RX_ENABLE_LOCKS
     MUTEX_INIT(&tservice->svc_data_lock, "svc data lock", MUTEX_DEFAULT, 0);
-#endif
 
     for (i = 0; i < RX_MAX_SERVICES; i++) {
        struct rx_service *service = rx_services[i];
@@ -2650,10 +2638,10 @@ static struct rx_call *
 rxi_NewCall(struct rx_connection *conn, int channel)
 {
     struct rx_call *call;
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
+#ifdef RX_ENABLE_LOCKS
     struct rx_call *cp;        /* Call pointer temp */
     struct opr_queue *cursor;
-#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
+#endif
 
     dpf(("rxi_NewCall(conn %"AFS_PTR_FMT", channel %d)\n", conn, channel));
 
@@ -2662,7 +2650,7 @@ rxi_NewCall(struct rx_connection *conn, int channel)
      * rxi_FreeCall */
     MUTEX_ENTER(&rx_freeCallQueue_lock);
 
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
+#ifdef RX_ENABLE_LOCKS
     /*
      * EXCEPT that the TQ might not yet be cleared out.
      * Skip over those with in-use TQs.
@@ -2676,24 +2664,24 @@ rxi_NewCall(struct rx_connection *conn, int channel)
        }
     }
     if (call) {
-#else /* AFS_GLOBAL_RXLOCK_KERNEL */
+#else /* RX_ENABLE_LOCKS */
     if (!opr_queue_IsEmpty(&rx_freeCallQueue)) {
        call = opr_queue_First(&rx_freeCallQueue, struct rx_call, entry);
-#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
+#endif /* RX_ENABLE_LOCKS */
        opr_queue_Remove(&call->entry);
         if (rx_stats_active)
            rx_atomic_dec(&rx_stats.nFreeCallStructs);
        MUTEX_EXIT(&rx_freeCallQueue_lock);
        MUTEX_ENTER(&call->lock);
        CLEAR_CALL_QUEUE_LOCK(call);
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
+#ifdef RX_ENABLE_LOCKS
        /* Now, if TQ wasn't cleared earlier, do it now. */
        rxi_WaitforTQBusy(call);
        if (call->flags & RX_CALL_TQ_CLEARME) {
            rxi_ClearTransmitQueue(call, 1);
            /*queue_Init(&call->tq);*/
        }
-#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
+#endif /* RX_ENABLE_LOCKS */
        /* Bind the call to its connection structure */
        call->conn = conn;
        rxi_ResetCall(call, 1);
@@ -2792,7 +2780,7 @@ rxi_FreeCall(struct rx_call *call, int haveCTLock)
 
     MUTEX_ENTER(&rx_freeCallQueue_lock);
     SET_CALL_QUEUE_LOCK(call, &rx_freeCallQueue_lock);
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
+#ifdef RX_ENABLE_LOCKS
     /* A call may be free even though its transmit queue is still in use.
      * Since we search the call list from head to tail, put busy calls at
      * the head of the list, and idle calls at the tail.
@@ -2801,9 +2789,9 @@ rxi_FreeCall(struct rx_call *call, int haveCTLock)
        opr_queue_Prepend(&rx_freeCallQueue, &call->entry);
     else
        opr_queue_Append(&rx_freeCallQueue, &call->entry);
-#else /* AFS_GLOBAL_RXLOCK_KERNEL */
+#else /* RX_ENABLE_LOCKS */
     opr_queue_Append(&rx_freeCallQueue, &call->entry);
-#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
+#endif /* RX_ENABLE_LOCKS */
     if (rx_stats_active)
        rx_atomic_inc(&rx_stats.nFreeCallStructs);
     MUTEX_EXIT(&rx_freeCallQueue_lock);
@@ -3219,6 +3207,154 @@ rxi_CheckBusy(struct rx_call *call)
     MUTEX_EXIT(&conn->conn_call_lock);
 }
 
+/*!
+ * Abort the call if the server is over the busy threshold. This
+ * can be used without requiring a call structure be initialised,
+ * or connected to a particular channel
+ */
+static_inline int
+rxi_AbortIfServerBusy(osi_socket socket, struct rx_connection *conn,
+                     struct rx_packet *np)
+{
+    if ((rx_BusyThreshold > 0) &&
+       (rx_atomic_read(&rx_nWaiting) > rx_BusyThreshold)) {
+       rxi_SendRawAbort(socket, conn->peer->host, conn->peer->port,
+                        rx_BusyError, np, 0);
+       if (rx_stats_active)
+           rx_atomic_inc(&rx_stats.nBusies);
+       return 1;
+    }
+
+    return 0;
+}
+
+static_inline struct rx_call *
+rxi_ReceiveClientCall(struct rx_packet *np, struct rx_connection *conn)
+{
+    int channel;
+    struct rx_call *call;
+
+    channel = np->header.cid & RX_CHANNELMASK;
+    MUTEX_ENTER(&conn->conn_call_lock);
+    call = conn->call[channel];
+    if (!call || conn->callNumber[channel] != np->header.callNumber) {
+       MUTEX_EXIT(&conn->conn_call_lock);
+       if (rx_stats_active)
+           rx_atomic_inc(&rx_stats.spuriousPacketsRead);
+       return NULL;
+    }
+
+    MUTEX_ENTER(&call->lock);
+    MUTEX_EXIT(&conn->conn_call_lock);
+
+    if ((call->state == RX_STATE_DALLY)
+       && np->header.type == RX_PACKET_TYPE_ACK) {
+       if (rx_stats_active)
+           rx_atomic_inc(&rx_stats.ignorePacketDally);
+        MUTEX_EXIT(&call->lock);
+       return NULL;
+    }
+
+    return call;
+}
+
+static_inline struct rx_call *
+rxi_ReceiveServerCall(osi_socket socket, struct rx_packet *np,
+                     struct rx_connection *conn)
+{
+    int channel;
+    struct rx_call *call;
+
+    channel = np->header.cid & RX_CHANNELMASK;
+    MUTEX_ENTER(&conn->conn_call_lock);
+    call = conn->call[channel];
+
+    if (!call) {
+       if (rxi_AbortIfServerBusy(socket, conn, np)) {
+           MUTEX_EXIT(&conn->conn_call_lock);
+           return NULL;
+       }
+
+       call = rxi_NewCall(conn, channel);  /* returns locked call */
+       *call->callNumber = np->header.callNumber;
+       MUTEX_EXIT(&conn->conn_call_lock);
+
+       call->state = RX_STATE_PRECALL;
+       clock_GetTime(&call->queueTime);
+       call->app.bytesSent = 0;
+       call->app.bytesRcvd = 0;
+       rxi_KeepAliveOn(call);
+
+       return call;
+    }
+
+    if (np->header.callNumber == conn->callNumber[channel]) {
+       MUTEX_ENTER(&call->lock);
+       MUTEX_EXIT(&conn->conn_call_lock);
+       return call;
+    }
+
+    if (np->header.callNumber < conn->callNumber[channel]) {
+       MUTEX_EXIT(&conn->conn_call_lock);
+       if (rx_stats_active)
+           rx_atomic_inc(&rx_stats.spuriousPacketsRead);
+       return NULL;
+    }
+
+    MUTEX_ENTER(&call->lock);
+    MUTEX_EXIT(&conn->conn_call_lock);
+
+    /* Wait until the transmit queue is idle before deciding
+     * whether to reset the current call. Chances are that the
+     * call will be in ether DALLY or HOLD state once the TQ_BUSY
+     * flag is cleared.
+     */
+#ifdef RX_ENABLE_LOCKS
+    if (call->state == RX_STATE_ACTIVE) {
+       rxi_WaitforTQBusy(call);
+        /* If we entered error state while waiting,
+         * must call rxi_CallError to permit rxi_ResetCall
+         * to processed when the tqWaiter count hits zero.
+         */
+        if (call->error) {
+           rxi_CallError(call, call->error);
+           MUTEX_EXIT(&call->lock);
+            return NULL;
+        }
+    }
+#endif /* RX_ENABLE_LOCKS */
+    /* If the new call cannot be taken right now send a busy and set
+     * the error condition in this call, so that it terminates as
+     * quickly as possible */
+    if (call->state == RX_STATE_ACTIVE) {
+       rxi_CallError(call, RX_CALL_DEAD);
+       rxi_SendSpecial(call, conn, NULL, RX_PACKET_TYPE_BUSY,
+                       NULL, 0, 1);
+       MUTEX_EXIT(&call->lock);
+       return NULL;
+    }
+
+    if (rxi_AbortIfServerBusy(socket, conn, np)) {
+       MUTEX_EXIT(&call->lock);
+       return NULL;
+    }
+
+    rxi_ResetCall(call, 0);
+    /* The conn_call_lock is not held but no one else should be
+     * using this call channel while we are processing this incoming
+     * packet.  This assignment should be safe.
+     */
+    *call->callNumber = np->header.callNumber;
+    call->state = RX_STATE_PRECALL;
+    clock_GetTime(&call->queueTime);
+    call->app.bytesSent = 0;
+    call->app.bytesRcvd = 0;
+    rxi_KeepAliveOn(call);
+
+    return call;
+}
+
+
 /* There are two packet tracing routines available for testing and monitoring
  * Rx.  One is called just after every packet is received and the other is
  * called just before every packet is sent.  Received packets, have had their
@@ -3243,8 +3379,6 @@ rxi_ReceivePacket(struct rx_packet *np, osi_socket socket,
 {
     struct rx_call *call;
     struct rx_connection *conn;
-    int channel;
-    afs_uint32 currentCallNumber;
     int type;
 #ifdef RXDEBUG
     char *packetType;
@@ -3400,221 +3534,14 @@ rxi_ReceivePacket(struct rx_packet *np, osi_socket socket,
        }
     }
 
-    channel = np->header.cid & RX_CHANNELMASK;
-    MUTEX_ENTER(&conn->conn_call_lock);
-    call = conn->call[channel];
-
-    if (call) {
-       MUTEX_ENTER(&call->lock);
-        currentCallNumber = conn->callNumber[channel];
-        MUTEX_EXIT(&conn->conn_call_lock);
-    } else if (type == RX_SERVER_CONNECTION) {  /* No call allocated */
-       call = rxi_NewCall(conn, channel);  /* returns locked call */
-       *call->callNumber = currentCallNumber = np->header.callNumber;
-       MUTEX_EXIT(&conn->conn_call_lock);
-#ifdef RXDEBUG
-       if (np->header.callNumber == 0)
-           dpf(("RecPacket call 0 %d %s: %x.%u.%u.%u.%u.%u.%u flags %d, "
-                "packet %"AFS_PTR_FMT" len %d\n",
-                np->header.serial, rx_packetTypes[np->header.type - 1], 
-                ntohl(conn->peer->host), ntohs(conn->peer->port),
-                np->header.serial, np->header.epoch, np->header.cid, 
-                np->header.callNumber, np->header.seq,
-                np->header.flags, np, np->length));
-#endif
-       call->state = RX_STATE_PRECALL;
-       clock_GetTime(&call->queueTime);
-       call->app.bytesSent = 0;
-       call->app.bytesRcvd = 0;
-       /*
-        * If the number of queued calls exceeds the overload
-        * threshold then abort this call.
-        */
-       if ((rx_BusyThreshold > 0) &&
-           (rx_atomic_read(&rx_nWaiting) > rx_BusyThreshold)) {
-           struct rx_packet *tp;
+    if (type == RX_SERVER_CONNECTION)
+       call = rxi_ReceiveServerCall(socket, np, conn);
+    else
+       call = rxi_ReceiveClientCall(np, conn);
 
-           rxi_CallError(call, rx_BusyError);
-           tp = rxi_SendCallAbort(call, np, 1, 0);
-           MUTEX_EXIT(&call->lock);
-           putConnection(conn);
-           if (rx_stats_active)
-               rx_atomic_inc(&rx_stats.nBusies);
-           return tp;
-        }
-       rxi_KeepAliveOn(call);
-    } else {    /* RX_CLIENT_CONNECTION and No call allocated */
-        /* This packet can't be for this call. If the new call address is
-         * 0 then no call is running on this channel. If there is a call
-         * then, since this is a client connection we're getting data for
-         * it must be for the previous call.
-         */
-        MUTEX_EXIT(&conn->conn_call_lock);
-        if (rx_stats_active)
-            rx_atomic_inc(&rx_stats.spuriousPacketsRead);
+    if (call == NULL) {
        putConnection(conn);
-        return np;
-    }
-
-    /* There is a non-NULL locked call at this point */
-    if (type == RX_SERVER_CONNECTION) {        /* We're the server */
-        if (np->header.callNumber < currentCallNumber) {
-            MUTEX_EXIT(&call->lock);
-            if (rx_stats_active)
-                rx_atomic_inc(&rx_stats.spuriousPacketsRead);
-           putConnection(conn);
-            return np;
-        } else if (np->header.callNumber != currentCallNumber) {
-           /* Wait until the transmit queue is idle before deciding
-            * whether to reset the current call. Chances are that the
-            * call will be in ether DALLY or HOLD state once the TQ_BUSY
-            * flag is cleared.
-            */
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
-            if (call->state == RX_STATE_ACTIVE) {
-                rxi_WaitforTQBusy(call);
-                /*
-                 * If we entered error state while waiting,
-                 * must call rxi_CallError to permit rxi_ResetCall
-                 * to processed when the tqWaiter count hits zero.
-                 */
-                if (call->error) {
-                    rxi_CallError(call, call->error);
-                    MUTEX_EXIT(&call->lock);
-                   putConnection(conn);
-                    return np;
-                }
-            }
-#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
-           /* If the new call cannot be taken right now send a busy and set
-            * the error condition in this call, so that it terminates as
-            * quickly as possible */
-           if (call->state == RX_STATE_ACTIVE) {
-               struct rx_packet *tp;
-
-               rxi_CallError(call, RX_CALL_DEAD);
-               tp = rxi_SendSpecial(call, conn, np, RX_PACKET_TYPE_BUSY,
-                                    NULL, 0, 1);
-               MUTEX_EXIT(&call->lock);
-               putConnection(conn);
-               return tp;
-           }
-           rxi_ResetCall(call, 0);
-            /*
-             * The conn_call_lock is not held but no one else should be
-             * using this call channel while we are processing this incoming
-             * packet.  This assignment should be safe.
-             */
-           *call->callNumber = np->header.callNumber;
-#ifdef RXDEBUG
-           if (np->header.callNumber == 0)
-               dpf(("RecPacket call 0 %d %s: %x.%u.%u.%u.%u.%u.%u flags %d, packet %"AFS_PTR_FMT" len %d\n",
-                      np->header.serial, rx_packetTypes[np->header.type - 1], ntohl(conn->peer->host), ntohs(conn->peer->port),
-                      np->header.serial, np->header.epoch, np->header.cid, np->header.callNumber, np->header.seq,
-                      np->header.flags, np, np->length));
-#endif
-           call->state = RX_STATE_PRECALL;
-           clock_GetTime(&call->queueTime);
-           call->app.bytesSent = 0;
-           call->app.bytesRcvd = 0;
-           /*
-            * If the number of queued calls exceeds the overload
-            * threshold then abort this call.
-            */
-           if ((rx_BusyThreshold > 0) &&
-               (rx_atomic_read(&rx_nWaiting) > rx_BusyThreshold)) {
-               struct rx_packet *tp;
-
-               rxi_CallError(call, rx_BusyError);
-               tp = rxi_SendCallAbort(call, np, 1, 0);
-               MUTEX_EXIT(&call->lock);
-               putConnection(conn);
-                if (rx_stats_active)
-                    rx_atomic_inc(&rx_stats.nBusies);
-               return tp;
-           }
-           rxi_KeepAliveOn(call);
-       } else {
-           /* Continuing call; do nothing here. */
-       }
-    } else {                   /* we're the client */
-       /* Ignore all incoming acknowledgements for calls in DALLY state */
-       if ((call->state == RX_STATE_DALLY)
-           && (np->header.type == RX_PACKET_TYPE_ACK)) {
-            if (rx_stats_active)
-                rx_atomic_inc(&rx_stats.ignorePacketDally);
-            MUTEX_EXIT(&call->lock);
-           putConnection(conn);
-           return np;
-       }
-
-       /* Ignore anything that's not relevant to the current call.  If there
-        * isn't a current call, then no packet is relevant. */
-       if (np->header.callNumber != currentCallNumber) {
-            if (rx_stats_active)
-                rx_atomic_inc(&rx_stats.spuriousPacketsRead);
-            MUTEX_EXIT(&call->lock);
-           putConnection(conn);
-           return np;
-       }
-       /* If the service security object index stamped in the packet does not
-        * match the connection's security index, ignore the packet */
-       if (np->header.securityIndex != conn->securityIndex) {
-           MUTEX_EXIT(&call->lock);
-           putConnection(conn);
-           return np;
-       }
-
-       /* If we're receiving the response, then all transmit packets are
-        * implicitly acknowledged.  Get rid of them. */
-       if (np->header.type == RX_PACKET_TYPE_DATA) {
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
-           /* XXX Hack. Because we must release the global rx lock when
-            * sending packets (osi_NetSend) we drop all acks while we're
-            * traversing the tq in rxi_Start sending packets out because
-            * packets may move to the freePacketQueue as result of being here!
-            * So we drop these packets until we're safely out of the
-            * traversing. Really ugly!
-            * For fine grain RX locking, we set the acked field in the
-            * packets and let rxi_Start remove them from the transmit queue.
-            */
-           if (call->flags & RX_CALL_TQ_BUSY) {
-#ifdef RX_ENABLE_LOCKS
-               rxi_SetAcksInTransmitQueue(call);
-#else
-               putConnection(conn);
-               return np;      /* xmitting; drop packet */
-#endif
-           } else {
-               rxi_ClearTransmitQueue(call, 0);
-           }
-#else /* AFS_GLOBAL_RXLOCK_KERNEL */
-           rxi_ClearTransmitQueue(call, 0);
-#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
-       } else {
-           if (np->header.type == RX_PACKET_TYPE_ACK) {
-               /* now check to see if this is an ack packet acknowledging that the
-                * server actually *lost* some hard-acked data.  If this happens we
-                * ignore this packet, as it may indicate that the server restarted in
-                * the middle of a call.  It is also possible that this is an old ack
-                * packet.  We don't abort the connection in this case, because this
-                * *might* just be an old ack packet.  The right way to detect a server
-                * restart in the midst of a call is to notice that the server epoch
-                * changed, btw.  */
-               /* XXX I'm not sure this is exactly right, since tfirst **IS**
-                * XXX unacknowledged.  I think that this is off-by-one, but
-                * XXX I don't dare change it just yet, since it will
-                * XXX interact badly with the server-restart detection
-                * XXX code in receiveackpacket.  */
-               if (ntohl(rx_GetInt32(np, FIRSTACKOFFSET)) < call->tfirst) {
-                    if (rx_stats_active)
-                        rx_atomic_inc(&rx_stats.spuriousPacketsRead);
-                   MUTEX_EXIT(&call->lock);
-                   putConnection(conn);
-                   return np;
-               }
-           }
-       }                       /* else not a data packet */
+       return np;
     }
 
     osirx_AssertMine(&call->lock, "rxi_ReceivePacket middle");
@@ -3624,6 +3551,11 @@ rxi_ReceivePacket(struct rx_packet *np, osi_socket socket,
     /* Now do packet type-specific processing */
     switch (np->header.type) {
     case RX_PACKET_TYPE_DATA:
+       /* If we're a client, and receiving a response, then all the packets
+        * we transmitted packets are implicitly acknowledged. */
+       if (type == RX_CLIENT_CONNECTION && !opr_queue_IsEmpty(&call->tq))
+           rxi_AckAllInTransmitQueue(call);
+
        np = rxi_ReceiveDataPacket(call, np, 1, socket, host, port, tnop,
                                   newcallp);
        break;
@@ -3671,28 +3603,7 @@ rxi_ReceivePacket(struct rx_packet *np, osi_socket socket,
     case RX_PACKET_TYPE_ACKALL:
        /* All packets acknowledged, so we can drop all packets previously
         * readied for sending */
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
-       /* XXX Hack. We because we can't release the global rx lock when
-        * sending packets (osi_NetSend) we drop all ack pkts while we're
-        * traversing the tq in rxi_Start sending packets out because
-        * packets may move to the freePacketQueue as result of being
-        * here! So we drop these packets until we're safely out of the
-        * traversing. Really ugly!
-        * For fine grain RX locking, we set the acked field in the packets
-        * and let rxi_Start remove the packets from the transmit queue.
-        */
-       if (call->flags & RX_CALL_TQ_BUSY) {
-#ifdef RX_ENABLE_LOCKS
-           rxi_SetAcksInTransmitQueue(call);
-           break;
-#else /* RX_ENABLE_LOCKS */
-           MUTEX_EXIT(&call->lock);
-           putConnection(conn);
-           return np;          /* xmitting; drop packet */
-#endif /* RX_ENABLE_LOCKS */
-       }
-#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
-       rxi_ClearTransmitQueue(call, 0);
+       rxi_AckAllInTransmitQueue(call);
        break;
     default:
        /* Should not reach here, unless the peer is broken: send an abort
@@ -4485,8 +4396,8 @@ rxi_ReceiveAckPacket(struct rx_call *call, struct rx_packet *np,
            rxi_ComputeRoundTripTime(tp, ap, call, peer, &now);
        }
 
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
-       /* XXX Hack. Because we have to release the global rx lock when sending
+#ifdef RX_ENABLE_LOCKS
+       /* XXX Hack. Because we have to release the global call lock when sending
         * packets (osi_NetSend) we drop all acks while we're traversing the tq
         * in rxi_Start sending packets out because packets may move to the
         * freePacketQueue as result of being here! So we drop these packets until
@@ -4496,14 +4407,10 @@ rxi_ReceiveAckPacket(struct rx_call *call, struct rx_packet *np,
         * when it's done transmitting.
         */
        if (call->flags & RX_CALL_TQ_BUSY) {
-#ifdef RX_ENABLE_LOCKS
            tp->flags |= RX_PKTFLAG_ACKED;
            call->flags |= RX_CALL_TQ_SOME_ACKED;
-#else /* RX_ENABLE_LOCKS */
-           break;
-#endif /* RX_ENABLE_LOCKS */
        } else
-#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
+#endif /* RX_ENABLE_LOCKS */
        {
            opr_queue_Remove(&tp->entry);
 #ifdef RX_TRACK_PACKETS
@@ -5039,7 +4946,6 @@ rxi_SendDelayedAck(struct rxevent *event, void *arg1, void *unused1,
 #endif /* RX_ENABLE_LOCKS */
 }
 
-
 #ifdef RX_ENABLE_LOCKS
 /* Set ack in all packets in transmit queue. rxi_Start will deal with
  * clearing them out.
@@ -5078,12 +4984,30 @@ rxi_SetAcksInTransmitQueue(struct rx_call *call)
 }
 #endif /* RX_ENABLE_LOCKS */
 
+/*!
+ * Acknowledge the whole transmit queue.
+ *
+ * If we're running without locks, or the transmit queue isn't busy, then
+ * we can just clear the queue now. Otherwise, we have to mark all of the
+ * packets as acknowledged, and let rxi_Start clear it later on
+ */
+static void
+rxi_AckAllInTransmitQueue(struct rx_call *call)
+{
+#ifdef RX_ENABLE_LOCKS
+    if (call->flags & RX_CALL_TQ_BUSY) {
+       rxi_SetAcksInTransmitQueue(call);
+       return;
+    }
+#endif
+    rxi_ClearTransmitQueue(call, 0);
+}
 /* Clear out the transmit queue for the current call (all packets have
  * been received by peer) */
 static void
 rxi_ClearTransmitQueue(struct rx_call *call, int force)
 {
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
+#ifdef RX_ENABLE_LOCKS
     struct opr_queue *cursor;
     if (!force && (call->flags & RX_CALL_TQ_BUSY)) {
        int someAcked = 0;
@@ -5099,16 +5023,16 @@ rxi_ClearTransmitQueue(struct rx_call *call, int force)
            call->flags |= RX_CALL_TQ_SOME_ACKED;
        }
     } else {
-#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
+#endif /* RX_ENABLE_LOCKS */
 #ifdef RXDEBUG_PACKET
         call->tqc -=
 #endif /* RXDEBUG_PACKET */
             rxi_FreePackets(0, &call->tq);
        rxi_WakeUpTransmitQueue(call);
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
+#ifdef RX_ENABLE_LOCKS
        call->flags &= ~RX_CALL_TQ_CLEARME;
     }
-#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
+#endif
 
     rxi_rto_cancel(call);
     call->tfirst = call->tnext;        /* implicitly acknowledge all data already sent */
@@ -5303,7 +5227,7 @@ rxi_CallError(struct rx_call *call, afs_int32 error)
     if (call->error)
        error = call->error;
 
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
+#ifdef RX_ENABLE_LOCKS
     if (!((call->flags & RX_CALL_TQ_BUSY) || (call->tqWaiters > 0))) {
        rxi_ResetCall(call, 0);
     }
@@ -5386,9 +5310,7 @@ rxi_ResetCall(struct rx_call *call, int newcall)
     MUTEX_EXIT(&peer->peer_lock);
 
     flags = call->flags;
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
     rxi_WaitforTQBusy(call);
-#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
 
     rxi_ClearTransmitQueue(call, 1);
     if (call->tqWaiters || (flags & RX_CALL_TQ_WAIT)) {
@@ -6123,7 +6045,7 @@ rxi_Start(struct rx_call *call, int istack)
     int maxXmitPackets;
 
     if (call->error) {
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
+#ifdef RX_ENABLE_LOCKS
         if (rx_stats_active)
             rx_atomic_inc(&rx_tq_debug.rxi_start_in_error);
 #endif
@@ -6146,15 +6068,15 @@ rxi_Start(struct rx_call *call, int istack)
         * But check whether we're here recursively, and let the other guy
         * do the work.
         */
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
+#ifdef RX_ENABLE_LOCKS
        if (!(call->flags & RX_CALL_TQ_BUSY)) {
            call->flags |= RX_CALL_TQ_BUSY;
            do {
-#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
+#endif /* RX_ENABLE_LOCKS */
            restart:
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
+#ifdef RX_ENABLE_LOCKS
                call->flags &= ~RX_CALL_NEED_START;
-#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
+#endif /* RX_ENABLE_LOCKS */
                nXmitPackets = 0;
                maxXmitPackets = MIN(call->twind, call->cwind);
                for (opr_queue_Scan(&call->tq, cursor)) {
@@ -6206,7 +6128,7 @@ rxi_Start(struct rx_call *call, int istack)
                                     istack);
                }
 
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
+#ifdef RX_ENABLE_LOCKS
                if (call->error) {
                    /* We went into the error state while sending packets. Now is
                     * the time to reset the call. This will also inform the using
@@ -6219,7 +6141,7 @@ rxi_Start(struct rx_call *call, int istack)
                    rxi_CallError(call, call->error);
                    return;
                }
-#ifdef RX_ENABLE_LOCKS
+
                if (call->flags & RX_CALL_TQ_SOME_ACKED) {
                    int missing;
                    call->flags &= ~RX_CALL_TQ_SOME_ACKED;
@@ -6247,20 +6169,19 @@ rxi_Start(struct rx_call *call, int istack)
                    if (!missing)
                        call->flags |= RX_CALL_TQ_CLEARME;
                }
-#endif /* RX_ENABLE_LOCKS */
                if (call->flags & RX_CALL_TQ_CLEARME)
                    rxi_ClearTransmitQueue(call, 1);
            } while (call->flags & RX_CALL_NEED_START);
            /*
             * TQ references no longer protected by this flag; they must remain
-            * protected by the global lock.
+            * protected by the call lock.
             */
            call->flags &= ~RX_CALL_TQ_BUSY;
            rxi_WakeUpTransmitQueue(call);
        } else {
            call->flags |= RX_CALL_NEED_START;
        }
-#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
+#endif /* RX_ENABLE_LOCKS */
     } else {
        rxi_rto_cancel(call);
     }
@@ -6357,7 +6278,7 @@ rxi_CheckCall(struct rx_call *call, int haveCTLock)
        return -1;
     }
 
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
+#ifdef RX_ENABLE_LOCKS
     if (call->flags & RX_CALL_TQ_BUSY) {
        /* Call is active and will be reset by rxi_Start if it's
         * in an error state.