RX: Avoid timing out non-kernel busy channels
[openafs.git] / src / rx / rx.c
index cb2da3b..9fc9718 100644 (file)
@@ -23,6 +23,9 @@
 #   include "h/socket.h"
 #  endif
 #  include "netinet/in.h"
+#  ifdef AFS_SUN58_ENV
+#   include "netinet/ip6.h"
+#  endif
 #  ifdef AFS_SUN57_ENV
 #   include "inet/common.h"
 #   include "inet/ip.h"
@@ -140,6 +143,16 @@ static unsigned int rxi_rpc_peer_stat_cnt;
 
 static unsigned int rxi_rpc_process_stat_cnt;
 
+/*
+ * rxi_busyChannelError is the error to return to the application when a call
+ * channel appears busy (inferred from the receipt of RX_PACKET_TYPE_BUSY
+ * packets on the channel), and there are other call channels in the
+ * connection that are not busy. If 0, we do not return errors upon receiving
+ * busy packets; we just keep trying on the same call channel until we hit a
+ * timeout.
+ */
+static afs_int32 rxi_busyChannelError = 0;
+
 rx_atomic_t rx_nWaiting = RX_ATOMIC_INIT(0);
 rx_atomic_t rx_nWaited = RX_ATOMIC_INIT(0);
 
@@ -597,6 +610,20 @@ rx_Init(u_int port)
     return rx_InitHost(htonl(INADDR_ANY), port);
 }
 
+/**
+ * Sets the error generated when a busy call channel is detected.
+ *
+ * @param[in] error The error to return for a call on a busy channel.
+ *
+ * @pre Neither rx_Init nor rx_InitHost have been called yet
+ */
+void
+rx_SetBusyChannelError(afs_int32 error)
+{
+    osi_Assert(rxinit_status != 0);
+    rxi_busyChannelError = error;
+}
+
 /* called with unincremented nRequestsRunning to see if it is OK to start
  * a new thread in this service.  Could be "no" for two reasons: over the
  * max quota, or would prevent others from reaching their min quota.
@@ -840,6 +867,7 @@ rx_NewConnection(afs_uint32 shost, u_short sport, u_short sservice,
     for (i = 0; i < RX_MAXCALLS; i++) {
        conn->twind[i] = rx_initSendWindow;
        conn->rwind[i] = rx_initReceiveWindow;
+       conn->lastBusy[i] = 0;
     }
 
     RXS_NewConnection(securityObject, conn);
@@ -1207,9 +1235,10 @@ rxi_WakeUpTransmitQueue(struct rx_call *call)
 struct rx_call *
 rx_NewCall(struct rx_connection *conn)
 {
-    int i, wait;
+    int i, wait, ignoreBusy = 1;
     struct rx_call *call;
     struct clock queueTime;
+    afs_uint32 leastBusy = 0;
     SPLVAR;
 
     clock_NewTime();
@@ -1260,9 +1289,25 @@ rx_NewCall(struct rx_connection *conn)
        for (i = 0; i < RX_MAXCALLS; i++) {
            call = conn->call[i];
            if (call) {
+               if (!ignoreBusy && conn->lastBusy[i] != leastBusy) {
+                   /* we're not ignoring busy call slots; only look at the
+                    * call slot that is the "least" busy */
+                   continue;
+               }
+
                if (call->state == RX_STATE_DALLY) {
                     MUTEX_ENTER(&call->lock);
                     if (call->state == RX_STATE_DALLY) {
+                       if (ignoreBusy && conn->lastBusy[i]) {
+                           /* if we're ignoring busy call slots, skip any ones that
+                            * have lastBusy set */
+                           if (leastBusy == 0 || conn->lastBusy[i] < leastBusy) {
+                               leastBusy = conn->lastBusy[i];
+                           }
+                           MUTEX_EXIT(&call->lock);
+                           continue;
+                       }
+
                         /*
                          * We are setting the state to RX_STATE_RESET to
                          * ensure that no one else will attempt to use this
@@ -1319,6 +1364,15 @@ rx_NewCall(struct rx_connection *conn)
                     MUTEX_EXIT(&call->lock);
                 }
            } else {
+               if (ignoreBusy && conn->lastBusy[i]) {
+                   /* if we're ignoring busy call slots, skip any ones that
+                    * have lastBusy set */
+                   if (leastBusy == 0 || conn->lastBusy[i] < leastBusy) {
+                       leastBusy = conn->lastBusy[i];
+                   }
+                   continue;
+               }
+
                 /* rxi_NewCall returns with mutex locked */
                call = rxi_NewCall(conn, i);
                 MUTEX_ENTER(&rx_refcnt_mutex);
@@ -1328,10 +1382,18 @@ rx_NewCall(struct rx_connection *conn)
            }
        }
        if (i < RX_MAXCALLS) {
+           conn->lastBusy[i] = 0;
            break;
        }
         if (!wait)
             continue;
+       if (leastBusy && ignoreBusy) {
+           /* we didn't find a useable call slot, but we did see at least one
+            * 'busy' slot; look again and only use a slot with the 'least
+            * busy time */
+           ignoreBusy = 0;
+           continue;
+       }
 
        MUTEX_ENTER(&conn->conn_data_lock);
        conn->flags |= RX_CONN_MAKECALL_WAITING;
@@ -2170,6 +2232,11 @@ rx_EndCall(struct rx_call *call, afs_int32 rc)
         MUTEX_EXIT(&call->lock);
         MUTEX_ENTER(&conn->conn_call_lock);
         MUTEX_ENTER(&call->lock);
+
+       if (!(call->flags & RX_CALL_PEER_BUSY)) {
+           conn->lastBusy[call->channel] = 0;
+       }
+
        MUTEX_ENTER(&conn->conn_data_lock);
        conn->flags |= RX_CONN_BUSY;
        if (conn->flags & RX_CONN_MAKECALL_WAITING) {
@@ -2766,6 +2833,87 @@ rxi_FindConnection(osi_socket socket, afs_uint32 host,
     return conn;
 }
 
+/**
+ * Timeout a call on a busy call channel if appropriate.
+ *
+ * @param[in] call The busy call.
+ *
+ * @pre 'call' is marked as busy (namely,
+ *      call->conn->lastBusy[call->channel] != 0)
+ *
+ * @pre call->lock is held
+ * @pre rxi_busyChannelError is nonzero
+ *
+ * @note call->lock is dropped and reacquired
+ */
+static void
+rxi_CheckBusy(struct rx_call *call)
+{
+    struct rx_connection *conn = call->conn;
+    int channel = call->channel;
+    int freechannel = 0;
+    int i;
+    afs_uint32 callNumber = *call->callNumber;
+
+    MUTEX_EXIT(&call->lock);
+
+    MUTEX_ENTER(&conn->conn_call_lock);
+
+    /* Are there any other call slots on this conn that we should try? Look for
+     * slots that are empty and are either non-busy, or were marked as busy
+     * longer than conn->secondsUntilDead seconds before this call started. */
+
+    for (i = 0; i < RX_MAXCALLS && !freechannel; i++) {
+       if (i == channel) {
+           /* only look at channels that aren't us */
+           continue;
+       }
+
+       if (conn->lastBusy[i]) {
+           /* if this channel looked busy too recently, don't look at it */
+           if (conn->lastBusy[i] >= call->startTime.sec) {
+               continue;
+           }
+           if (call->startTime.sec - conn->lastBusy[i] < conn->secondsUntilDead) {
+               continue;
+           }
+       }
+
+       if (conn->call[i]) {
+           struct rx_call *tcall = conn->call[i];
+           MUTEX_ENTER(&tcall->lock);
+           if (tcall->state == RX_STATE_DALLY) {
+               freechannel = 1;
+           }
+           MUTEX_EXIT(&tcall->lock);
+       } else {
+           freechannel = 1;
+       }
+    }
+
+    MUTEX_EXIT(&conn->conn_call_lock);
+
+    MUTEX_ENTER(&call->lock);
+
+    /* Since the call->lock and conn->conn_call_lock have been released it is
+     * possible that (1) the call may no longer be busy and/or (2) the call may
+     * have been reused by another waiting thread. Therefore, we must confirm
+     * that the call state has not changed when deciding whether or not to
+     * force this application thread to retry by forcing a Timeout error. */
+
+    if (freechannel && *call->callNumber == callNumber &&
+        (call->flags & RX_CALL_PEER_BUSY)) {
+       /* Since 'freechannel' is set, there exists another channel in this
+        * rx_conn that the application thread might be able to use. We know
+        * that we have the correct call since callNumber is unchanged, and we
+        * know that the call is still busy. So, set the call error state to
+        * rxi_busyChannelError so the application can retry the request,
+        * presumably on a less-busy call channel. */
+
+       rxi_CallError(call, rxi_busyChannelError);
+    }
+}
+
 /* There are two packet tracing routines available for testing and monitoring
  * Rx.  One is called just after every packet is received and the other is
  * called just before every packet is sent.  Received packets, have had their
@@ -3249,9 +3397,26 @@ rxi_ReceivePacket(struct rx_packet *np, osi_socket socket,
         MUTEX_EXIT(&rx_refcnt_mutex);
        return np;              /* xmitting; drop packet */
     }
-    case RX_PACKET_TYPE_BUSY:
-       /* XXXX */
-       break;
+    case RX_PACKET_TYPE_BUSY: {
+       struct clock busyTime;
+       clock_NewTime();
+       clock_GetTime(&busyTime);
+
+       MUTEX_EXIT(&call->lock);
+
+       MUTEX_ENTER(&conn->conn_call_lock);
+       MUTEX_ENTER(&call->lock);
+       conn->lastBusy[call->channel] = busyTime.sec;
+       call->flags |= RX_CALL_PEER_BUSY;
+       MUTEX_EXIT(&call->lock);
+       MUTEX_EXIT(&conn->conn_call_lock);
+
+       MUTEX_ENTER(&rx_refcnt_mutex);
+       conn->refCount--;
+       MUTEX_EXIT(&rx_refcnt_mutex);
+       return np;
+    }
+
     case RX_PACKET_TYPE_ACKALL:
        /* All packets acknowledged, so we can drop all packets previously
         * readied for sending */
@@ -3279,7 +3444,6 @@ rxi_ReceivePacket(struct rx_packet *np, osi_socket socket,
        }
 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
        rxi_ClearTransmitQueue(call, 0);
-       rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
        break;
     default:
        /* Should not reach here, unless the peer is broken: send an abort
@@ -3293,6 +3457,8 @@ rxi_ReceivePacket(struct rx_packet *np, osi_socket socket,
      * the packet will be delivered to the user before any get time is required
      * (if not, then the time won't actually be re-evaluated here). */
     call->lastReceiveTime = clock_Sec();
+    /* we've received a legit packet, so the channel is not busy */
+    call->flags &= ~RX_CALL_PEER_BUSY;
     MUTEX_EXIT(&call->lock);
     MUTEX_ENTER(&rx_refcnt_mutex);
     conn->refCount--;
@@ -4970,6 +5136,10 @@ rxi_ResetCall(struct rx_call *call, int newcall)
        call->arrivalProc = (void (*)())0;
     }
 
+    if (call->growMTUEvent)
+       rxevent_Cancel(call->growMTUEvent, call,
+                      RX_CALL_REFCOUNT_ALIVE);
+
     if (call->delayedAbortEvent) {
        rxevent_Cancel(call->delayedAbortEvent, call, RX_CALL_REFCOUNT_ABORT);
        packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
@@ -5020,6 +5190,12 @@ rxi_ResetCall(struct rx_call *call, int newcall)
     }
     call->flags = 0;
 
+    if ((flags & RX_CALL_PEER_BUSY)) {
+       /* The call channel is still busy; resetting the call doesn't change
+        * that */
+       call->flags |= RX_CALL_PEER_BUSY;
+    }
+
     rxi_ClearReceiveQueue(call);
     /* why init the queue if you just emptied it? queue_Init(&call->rq); */
 
@@ -5637,6 +5813,11 @@ rxi_Start(struct rxevent *event,
        CALL_RELE(call, RX_CALL_REFCOUNT_RESEND);
         MUTEX_EXIT(&rx_refcnt_mutex);
        call->resendEvent = NULL;
+
+       if (rxi_busyChannelError && (call->flags & RX_CALL_PEER_BUSY)) {
+           rxi_CheckBusy(call);
+       }
+
        if (queue_IsEmpty(&call->tq)) {
            /* Nothing to do */
            return;
@@ -5999,6 +6180,9 @@ rxi_CheckCall(struct rx_call *call)
            rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
            rxevent_Cancel(call->keepAliveEvent, call,
                           RX_CALL_REFCOUNT_ALIVE);
+           if (call->growMTUEvent)
+               rxevent_Cancel(call->growMTUEvent, call,
+                              RX_CALL_REFCOUNT_ALIVE);
             MUTEX_ENTER(&rx_refcnt_mutex);
            if (call->refCount == 0) {
                rxi_FreeCall(call, haveCTLock);
@@ -6038,7 +6222,7 @@ rxi_CheckCall(struct rx_call *call)
        }
     }
 
-    if (hardDeadTime) {
+    if (conn->hardDeadTime) {
        hardDeadTime = conn->hardDeadTime + fudgeFactor;
     }