rx: Refactor rxi_ReceivePacket call selection
authorSimon Wilkinson <sxw@your-file-system.com>
Fri, 26 Oct 2012 13:50:51 +0000 (14:50 +0100)
committerDerrick Brashear <shadow@your-file-system.com>
Mon, 29 Oct 2012 20:36:01 +0000 (13:36 -0700)
Refactor the call selection logic in rxi_ReceivePacket so that it is
a little bit easier to follow, and better optimised to the common case.

Split the current logic into a function for packets being received by
a server, and a function for packets being received by a client.

Change-Id: Ie27de7952cc13fa3b92619cfe68e671e6d5e170c
Reviewed-on: http://gerrit.openafs.org/8297
Reviewed-by: Jeffrey Altman <jaltman@your-file-system.com>
Tested-by: BuildBot <buildbot@rampaginggeek.com>
Reviewed-by: Derrick Brashear <shadow@your-file-system.com>

src/rx/rx.c

index 7a255be..a754018 100644 (file)
@@ -3214,12 +3214,13 @@ rxi_CheckBusy(struct rx_call *call)
  * or connected to a particular channel
  */
 static_inline int
-rxi_AbortIfServerBusy(osi_socket socket, afs_uint32 host,
-                     u_short port, struct rx_packet *np)
+rxi_AbortIfServerBusy(osi_socket socket, struct rx_connection *conn,
+                     struct rx_packet *np)
 {
     if ((rx_BusyThreshold > 0) &&
        (rx_atomic_read(&rx_nWaiting) > rx_BusyThreshold)) {
-       rxi_SendRawAbort(socket, host, port, rx_BusyError, np, 0);
+       rxi_SendRawAbort(socket, conn->peer->host, conn->peer->port,
+                        rx_BusyError, np, 0);
        if (rx_stats_active)
            rx_atomic_inc(&rx_stats.nBusies);
        return 1;
@@ -3228,6 +3229,133 @@ rxi_AbortIfServerBusy(osi_socket socket, afs_uint32 host,
     return 0;
 }
 
+static_inline struct rx_call *
+rxi_ReceiveClientCall(struct rx_packet *np, struct rx_connection *conn)
+{
+    int channel;
+    struct rx_call *call;
+
+    channel = np->header.cid & RX_CHANNELMASK;
+    MUTEX_ENTER(&conn->conn_call_lock);
+    call = conn->call[channel];
+    if (!call || conn->callNumber[channel] != np->header.callNumber) {
+       MUTEX_EXIT(&conn->conn_call_lock);
+       if (rx_stats_active)
+           rx_atomic_inc(&rx_stats.spuriousPacketsRead);
+       return NULL;
+    }
+
+    MUTEX_ENTER(&call->lock);
+    MUTEX_EXIT(&conn->conn_call_lock);
+
+    if ((call->state == RX_STATE_DALLY)
+       && np->header.type == RX_PACKET_TYPE_ACK) {
+       if (rx_stats_active)
+           rx_atomic_inc(&rx_stats.ignorePacketDally);
+        MUTEX_EXIT(&call->lock);
+       return NULL;
+    }
+
+    return call;
+}
+
+static_inline struct rx_call *
+rxi_ReceiveServerCall(osi_socket socket, struct rx_packet *np,
+                     struct rx_connection *conn)
+{
+    int channel;
+    struct rx_call *call;
+
+    channel = np->header.cid & RX_CHANNELMASK;
+    MUTEX_ENTER(&conn->conn_call_lock);
+    call = conn->call[channel];
+
+    if (!call) {
+       if (rxi_AbortIfServerBusy(socket, conn, np)) {
+           MUTEX_EXIT(&conn->conn_call_lock);
+           return NULL;
+       }
+
+       call = rxi_NewCall(conn, channel);  /* returns locked call */
+       *call->callNumber = np->header.callNumber;
+       MUTEX_EXIT(&conn->conn_call_lock);
+
+       call->state = RX_STATE_PRECALL;
+       clock_GetTime(&call->queueTime);
+       call->app.bytesSent = 0;
+       call->app.bytesRcvd = 0;
+       rxi_KeepAliveOn(call);
+
+       return call;
+    }
+
+    if (np->header.callNumber == conn->callNumber[channel]) {
+       MUTEX_ENTER(&call->lock);
+       MUTEX_EXIT(&conn->conn_call_lock);
+       return call;
+    }
+
+    if (np->header.callNumber < conn->callNumber[channel]) {
+       MUTEX_EXIT(&conn->conn_call_lock);
+       if (rx_stats_active)
+           rx_atomic_inc(&rx_stats.spuriousPacketsRead);
+       return NULL;
+    }
+
+    MUTEX_ENTER(&call->lock);
+    MUTEX_EXIT(&conn->conn_call_lock);
+
+    /* Wait until the transmit queue is idle before deciding
+     * whether to reset the current call. Chances are that the
+     * call will be in ether DALLY or HOLD state once the TQ_BUSY
+     * flag is cleared.
+     */
+#ifdef RX_ENABLE_LOCKS
+    if (call->state == RX_STATE_ACTIVE) {
+       rxi_WaitforTQBusy(call);
+        /* If we entered error state while waiting,
+         * must call rxi_CallError to permit rxi_ResetCall
+         * to processed when the tqWaiter count hits zero.
+         */
+        if (call->error) {
+           rxi_CallError(call, call->error);
+           MUTEX_EXIT(&call->lock);
+            return NULL;
+        }
+    }
+#endif /* RX_ENABLE_LOCKS */
+    /* If the new call cannot be taken right now send a busy and set
+     * the error condition in this call, so that it terminates as
+     * quickly as possible */
+    if (call->state == RX_STATE_ACTIVE) {
+       rxi_CallError(call, RX_CALL_DEAD);
+       rxi_SendSpecial(call, conn, NULL, RX_PACKET_TYPE_BUSY,
+                       NULL, 0, 1);
+       MUTEX_EXIT(&call->lock);
+       return NULL;
+    }
+
+    if (rxi_AbortIfServerBusy(socket, conn, np)) {
+       MUTEX_EXIT(&call->lock);
+       return NULL;
+    }
+
+    rxi_ResetCall(call, 0);
+    /* The conn_call_lock is not held but no one else should be
+     * using this call channel while we are processing this incoming
+     * packet.  This assignment should be safe.
+     */
+    *call->callNumber = np->header.callNumber;
+    call->state = RX_STATE_PRECALL;
+    clock_GetTime(&call->queueTime);
+    call->app.bytesSent = 0;
+    call->app.bytesRcvd = 0;
+    rxi_KeepAliveOn(call);
+
+    return call;
+}
+
+
 /* There are two packet tracing routines available for testing and monitoring
  * Rx.  One is called just after every packet is received and the other is
  * called just before every packet is sent.  Received packets, have had their
@@ -3252,8 +3380,6 @@ rxi_ReceivePacket(struct rx_packet *np, osi_socket socket,
 {
     struct rx_call *call;
     struct rx_connection *conn;
-    int channel;
-    afs_uint32 currentCallNumber;
     int type;
 #ifdef RXDEBUG
     char *packetType;
@@ -3409,128 +3535,19 @@ rxi_ReceivePacket(struct rx_packet *np, osi_socket socket,
        }
     }
 
-    channel = np->header.cid & RX_CHANNELMASK;
-    MUTEX_ENTER(&conn->conn_call_lock);
-    call = conn->call[channel];
-
-    if (call) {
-       MUTEX_ENTER(&call->lock);
-        currentCallNumber = conn->callNumber[channel];
-        MUTEX_EXIT(&conn->conn_call_lock);
-    } else if (type == RX_SERVER_CONNECTION) {  /* No call allocated */
-
-       if (rxi_AbortIfServerBusy(socket, host, port, np)) {
+    if (type == RX_SERVER_CONNECTION) {
+       call = rxi_ReceiveServerCall(socket, np, conn);
+       if (call == NULL) {
            putConnection(conn);
            return np;
-       }
-
-       call = rxi_NewCall(conn, channel);  /* returns locked call */
-       *call->callNumber = currentCallNumber = np->header.callNumber;
-       MUTEX_EXIT(&conn->conn_call_lock);
-       call->state = RX_STATE_PRECALL;
-       clock_GetTime(&call->queueTime);
-       call->app.bytesSent = 0;
-       call->app.bytesRcvd = 0;
-       rxi_KeepAliveOn(call);
-
-    } else {    /* RX_CLIENT_CONNECTION and No call allocated */
-        /* This packet can't be for this call. If the new call address is
-         * 0 then no call is running on this channel. If there is a call
-         * then, since this is a client connection we're getting data for
-         * it must be for the previous call.
-         */
-        MUTEX_EXIT(&conn->conn_call_lock);
-        if (rx_stats_active)
-            rx_atomic_inc(&rx_stats.spuriousPacketsRead);
-       putConnection(conn);
-        return np;
-    }
-
-    /* There is a non-NULL locked call at this point */
-    if (type == RX_SERVER_CONNECTION) {        /* We're the server */
-        if (np->header.callNumber < currentCallNumber) {
-            MUTEX_EXIT(&call->lock);
-            if (rx_stats_active)
-                rx_atomic_inc(&rx_stats.spuriousPacketsRead);
-           putConnection(conn);
-            return np;
-        } else if (np->header.callNumber != currentCallNumber) {
-           /* Wait until the transmit queue is idle before deciding
-            * whether to reset the current call. Chances are that the
-            * call will be in ether DALLY or HOLD state once the TQ_BUSY
-            * flag is cleared.
-            */
-#ifdef RX_ENABLE_LOCKS
-            if (call->state == RX_STATE_ACTIVE) {
-                rxi_WaitforTQBusy(call);
-                /*
-                 * If we entered error state while waiting,
-                 * must call rxi_CallError to permit rxi_ResetCall
-                 * to processed when the tqWaiter count hits zero.
-                 */
-                if (call->error) {
-                    rxi_CallError(call, call->error);
-                    MUTEX_EXIT(&call->lock);
-                   putConnection(conn);
-                    return np;
-                }
-            }
-#endif /* RX_ENABLE_LOCKS */
-           /* If the new call cannot be taken right now send a busy and set
-            * the error condition in this call, so that it terminates as
-            * quickly as possible */
-           if (call->state == RX_STATE_ACTIVE) {
-               struct rx_packet *tp;
-
-               rxi_CallError(call, RX_CALL_DEAD);
-               tp = rxi_SendSpecial(call, conn, np, RX_PACKET_TYPE_BUSY,
-                                    NULL, 0, 1);
-               MUTEX_EXIT(&call->lock);
-               putConnection(conn);
-               return tp;
-           }
-
-           if (rxi_AbortIfServerBusy(socket, host, port, np)) {
-               MUTEX_EXIT(&call->lock);
-               putConnection(conn);
-               return np;
-           }
-
-           rxi_ResetCall(call, 0);
-            /*
-             * The conn_call_lock is not held but no one else should be
-             * using this call channel while we are processing this incoming
-             * packet.  This assignment should be safe.
-             */
-           *call->callNumber = np->header.callNumber;
-           call->state = RX_STATE_PRECALL;
-           clock_GetTime(&call->queueTime);
-           call->app.bytesSent = 0;
-           call->app.bytesRcvd = 0;
-           rxi_KeepAliveOn(call);
-       } else {
-           /* Continuing call; do nothing here. */
-       }
-    } else {                   /* we're the client */
-       /* Ignore all incoming acknowledgements for calls in DALLY state */
-       if ((call->state == RX_STATE_DALLY)
-           && (np->header.type == RX_PACKET_TYPE_ACK)) {
-            if (rx_stats_active)
-                rx_atomic_inc(&rx_stats.ignorePacketDally);
-            MUTEX_EXIT(&call->lock);
+        }
+    } else {
+       call = rxi_ReceiveClientCall(np, conn);
+       if (call == NULL) {
            putConnection(conn);
            return np;
        }
 
-       /* Ignore anything that's not relevant to the current call.  If there
-        * isn't a current call, then no packet is relevant. */
-       if (np->header.callNumber != currentCallNumber) {
-            if (rx_stats_active)
-                rx_atomic_inc(&rx_stats.spuriousPacketsRead);
-            MUTEX_EXIT(&call->lock);
-           putConnection(conn);
-           return np;
-       }
        /* If the service security object index stamped in the packet does not
         * match the connection's security index, ignore the packet */
        if (np->header.securityIndex != conn->securityIndex) {