rx: Lock call for KeepAliveOn/KeepAliveOff
[openafs.git] / src / rx / rx.c
index a754018..5b560d1 100644 (file)
@@ -152,10 +152,9 @@ static void rxi_KeepAliveOn(struct rx_call *call);
 static void rxi_GrowMTUOn(struct rx_call *call);
 static void rxi_ChallengeOn(struct rx_connection *conn);
 static int rxi_CheckCall(struct rx_call *call, int haveCTLock);
+static void rxi_AckAllInTransmitQueue(struct rx_call *call);
 
 #ifdef RX_ENABLE_LOCKS
-static void rxi_SetAcksInTransmitQueue(struct rx_call *call);
-
 struct rx_tq_debug {
     rx_atomic_t rxi_start_aborted; /* rxi_start awoke after rxi_Send in error.*/
     rx_atomic_t rxi_start_in_error;
@@ -1415,7 +1414,7 @@ rxi_WaitforTQBusy(struct rx_call *call) {
     while (!call->error && (call->flags & RX_CALL_TQ_BUSY)) {
        call->flags |= RX_CALL_TQ_WAIT;
        call->tqWaiters++;
-       osirx_AssertMine(&call->lock, "rxi_WaitforTQ lock");
+       MUTEX_ASSERT(&call->lock);
        CV_WAIT(&call->cv_tq, &call->lock);
        call->tqWaiters--;
        if (call->tqWaiters == 0) {
@@ -1432,7 +1431,7 @@ rxi_WakeUpTransmitQueue(struct rx_call *call)
        dpf(("call %"AFS_PTR_FMT" has %d waiters and flags %d\n",
             call, call->tqWaiters, call->flags));
 #ifdef RX_ENABLE_LOCKS
-       osirx_AssertMine(&call->lock, "rxi_Start start");
+       MUTEX_ASSERT(&call->lock);
        CV_BROADCAST(&call->cv_tq);
 #else /* RX_ENABLE_LOCKS */
        osi_rxWakeup(&call->tq);
@@ -3312,12 +3311,13 @@ rxi_ReceiveServerCall(osi_socket socket, struct rx_packet *np,
      */
 #ifdef RX_ENABLE_LOCKS
     if (call->state == RX_STATE_ACTIVE) {
+       int old_error = call->error;
        rxi_WaitforTQBusy(call);
         /* If we entered error state while waiting,
          * must call rxi_CallError to permit rxi_ResetCall
          * to processed when the tqWaiter count hits zero.
          */
-        if (call->error) {
+        if (call->error && call->error != old_error) {
            rxi_CallError(call, call->error);
            MUTEX_EXIT(&call->lock);
             return NULL;
@@ -3535,81 +3535,28 @@ rxi_ReceivePacket(struct rx_packet *np, osi_socket socket,
        }
     }
 
-    if (type == RX_SERVER_CONNECTION) {
+    if (type == RX_SERVER_CONNECTION)
        call = rxi_ReceiveServerCall(socket, np, conn);
-       if (call == NULL) {
-           putConnection(conn);
-           return np;
-        }
-    } else {
+    else
        call = rxi_ReceiveClientCall(np, conn);
-       if (call == NULL) {
-           putConnection(conn);
-           return np;
-       }
-
-       /* If the service security object index stamped in the packet does not
-        * match the connection's security index, ignore the packet */
-       if (np->header.securityIndex != conn->securityIndex) {
-           MUTEX_EXIT(&call->lock);
-           putConnection(conn);
-           return np;
-       }
 
-       /* If we're receiving the response, then all transmit packets are
-        * implicitly acknowledged.  Get rid of them. */
-       if (np->header.type == RX_PACKET_TYPE_DATA) {
-#ifdef RX_ENABLE_LOCKS
-           /* XXX Hack. Because we must release the call lock when
-            * sending packets (osi_NetSend) we drop all acks while we're
-            * traversing the tq in rxi_Start sending packets out because
-            * packets may move to the freePacketQueue as result of being here!
-            * So we drop these packets until we're safely out of the
-            * traversing. Really ugly!
-            * For fine grain RX locking, we set the acked field in the
-            * packets and let rxi_Start remove them from the transmit queue.
-            */
-           if (call->flags & RX_CALL_TQ_BUSY) {
-               rxi_SetAcksInTransmitQueue(call);
-           } else {
-               rxi_ClearTransmitQueue(call, 0);
-           }
-#else /* RX_ENABLE_LOCKS */
-           rxi_ClearTransmitQueue(call, 0);
-#endif /* RX_ENABLE_LOCKS */
-       } else {
-           if (np->header.type == RX_PACKET_TYPE_ACK) {
-               /* now check to see if this is an ack packet acknowledging that the
-                * server actually *lost* some hard-acked data.  If this happens we
-                * ignore this packet, as it may indicate that the server restarted in
-                * the middle of a call.  It is also possible that this is an old ack
-                * packet.  We don't abort the connection in this case, because this
-                * *might* just be an old ack packet.  The right way to detect a server
-                * restart in the midst of a call is to notice that the server epoch
-                * changed, btw.  */
-               /* XXX I'm not sure this is exactly right, since tfirst **IS**
-                * XXX unacknowledged.  I think that this is off-by-one, but
-                * XXX I don't dare change it just yet, since it will
-                * XXX interact badly with the server-restart detection
-                * XXX code in receiveackpacket.  */
-               if (ntohl(rx_GetInt32(np, FIRSTACKOFFSET)) < call->tfirst) {
-                    if (rx_stats_active)
-                        rx_atomic_inc(&rx_stats.spuriousPacketsRead);
-                   MUTEX_EXIT(&call->lock);
-                   putConnection(conn);
-                   return np;
-               }
-           }
-       }                       /* else not a data packet */
+    if (call == NULL) {
+       putConnection(conn);
+       return np;
     }
 
-    osirx_AssertMine(&call->lock, "rxi_ReceivePacket middle");
+    MUTEX_ASSERT(&call->lock);
     /* Set remote user defined status from packet */
     call->remoteStatus = np->header.userStatus;
 
     /* Now do packet type-specific processing */
     switch (np->header.type) {
     case RX_PACKET_TYPE_DATA:
+       /* If we're a client, and receiving a response, then all the packets
+        * we transmitted packets are implicitly acknowledged. */
+       if (type == RX_CLIENT_CONNECTION && !opr_queue_IsEmpty(&call->tq))
+           rxi_AckAllInTransmitQueue(call);
+
        np = rxi_ReceiveDataPacket(call, np, 1, socket, host, port, tnop,
                                   newcallp);
        break;
@@ -3657,22 +3604,7 @@ rxi_ReceivePacket(struct rx_packet *np, osi_socket socket,
     case RX_PACKET_TYPE_ACKALL:
        /* All packets acknowledged, so we can drop all packets previously
         * readied for sending */
-#ifdef RX_ENABLE_LOCKS
-       /* XXX Hack. We because we can't release the call lock when
-        * sending packets (osi_NetSend) we drop all ack pkts while we're
-        * traversing the tq in rxi_Start sending packets out because
-        * packets may move to the freePacketQueue as result of being
-        * here! So we drop these packets until we're safely out of the
-        * traversing. Really ugly!
-        * For fine grain RX locking, we set the acked field in the packets
-        * and let rxi_Start remove the packets from the transmit queue.
-        */
-       if (call->flags & RX_CALL_TQ_BUSY) {
-           rxi_SetAcksInTransmitQueue(call);
-           break;
-       }
-#endif /* RX_ENABLE_LOCKS */
-       rxi_ClearTransmitQueue(call, 0);
+       rxi_AckAllInTransmitQueue(call);
        break;
     default:
        /* Should not reach here, unless the peer is broken: send an abort
@@ -5015,7 +4947,6 @@ rxi_SendDelayedAck(struct rxevent *event, void *arg1, void *unused1,
 #endif /* RX_ENABLE_LOCKS */
 }
 
-
 #ifdef RX_ENABLE_LOCKS
 /* Set ack in all packets in transmit queue. rxi_Start will deal with
  * clearing them out.
@@ -5054,6 +4985,24 @@ rxi_SetAcksInTransmitQueue(struct rx_call *call)
 }
 #endif /* RX_ENABLE_LOCKS */
 
+/*!
+ * Acknowledge the whole transmit queue.
+ *
+ * If we're running without locks, or the transmit queue isn't busy, then
+ * we can just clear the queue now. Otherwise, we have to mark all of the
+ * packets as acknowledged, and let rxi_Start clear it later on
+ */
+static void
+rxi_AckAllInTransmitQueue(struct rx_call *call)
+{
+#ifdef RX_ENABLE_LOCKS
+    if (call->flags & RX_CALL_TQ_BUSY) {
+       rxi_SetAcksInTransmitQueue(call);
+       return;
+    }
+#endif
+    rxi_ClearTransmitQueue(call, 0);
+}
 /* Clear out the transmit queue for the current call (all packets have
  * been received by peer) */
 static void
@@ -5272,9 +5221,7 @@ rx_InterruptCall(struct rx_call *call, afs_int32 error)
 void
 rxi_CallError(struct rx_call *call, afs_int32 error)
 {
-#ifdef DEBUG
-    osirx_AssertMine(&call->lock, "rxi_CallError");
-#endif
+    MUTEX_ASSERT(&call->lock);
     dpf(("rxi_CallError call %"AFS_PTR_FMT" error %d call->error %d\n", call, error, call->error));
     if (call->error)
        error = call->error;
@@ -5302,9 +5249,8 @@ rxi_ResetCall(struct rx_call *call, int newcall)
     int flags;
     struct rx_peer *peer;
     struct rx_packet *packet;
-#ifdef DEBUG
-    osirx_AssertMine(&call->lock, "rxi_ResetCall");
-#endif
+
+    MUTEX_ASSERT(&call->lock);
     dpf(("rxi_ResetCall(call %"AFS_PTR_FMT", newcall %d)\n", call, newcall));
 
     /* Notify anyone who is waiting for asynchronous packet arrival */
@@ -6706,12 +6652,16 @@ rxi_KeepAliveOn(struct rx_call *call)
 void
 rx_KeepAliveOff(struct rx_call *call)
 {
+    MUTEX_ENTER(&call->lock);
     rxi_KeepAliveOff(call);
+    MUTEX_EXIT(&call->lock);
 }
 void
 rx_KeepAliveOn(struct rx_call *call)
 {
+    MUTEX_ENTER(&call->lock);
     rxi_KeepAliveOn(call);
+    MUTEX_EXIT(&call->lock);
 }
 
 static void
@@ -8024,15 +7974,6 @@ shutdown_rx(void)
     UNLOCK_RX_INIT;
 }
 
-#ifdef RX_ENABLE_LOCKS
-void
-osirx_AssertMine(afs_kmutex_t * lockaddr, char *msg)
-{
-    if (!MUTEX_ISMINE(lockaddr))
-       osi_Panic("Lock not held: %s", msg);
-}
-#endif /* RX_ENABLE_LOCKS */
-
 #ifndef KERNEL
 
 /*