Rx: Remove conn_call_lock contention between rx_NewCall and rx_EndCall
[openafs.git] / src / rx / rx.c
index 9d4e3d5..beb50ce 100755 (executable)
@@ -85,6 +85,9 @@ extern afs_int32 afs_termState;
 # include <string.h>
 # include <stdarg.h>
 # include <errno.h>
+# ifdef HAVE_STDINT_H
+#  include <stdint.h>
+# endif
 #ifdef AFS_NT40_ENV
 # include <stdlib.h>
 # include <fcntl.h>
@@ -347,7 +350,7 @@ struct rx_connection *rxLastConn = 0;
  *      conn->peer was previously a constant for all intents and so has no
  *      lock protecting this field. The multihomed client delta introduced
  *      a RX code change : change the peer field in the connection structure
- *      to that remote inetrface from which the last packet for this
+ *      to that remote interface from which the last packet for this
  *      connection was sent out. This may become an issue if further changes
  *      are made.
  */
@@ -800,8 +803,10 @@ rx_NewConnection(afs_uint32 shost, u_short sport, u_short sservice,
     SPLVAR;
 
     clock_NewTime();
-    dpf(("rx_NewConnection(host %x, port %u, service %u, securityObject %x, serviceSecurityIndex %d)\n",
-          ntohl(shost), ntohs(sport), sservice, securityObject, serviceSecurityIndex));
+    dpf(("rx_NewConnection(host %x, port %u, service %u, securityObject %p, "
+        "serviceSecurityIndex %d)\n",
+         ntohl(shost), ntohs(sport), sservice, securityObject,
+        serviceSecurityIndex));
 
     /* Vasilsi said: "NETPRI protects Cid and Alloc", but can this be true in
      * the case of kmem_alloc? */
@@ -823,6 +828,7 @@ rx_NewConnection(afs_uint32 shost, u_short sport, u_short sservice,
     conn->securityData = (void *) 0;
     conn->securityIndex = serviceSecurityIndex;
     rx_SetConnDeadTime(conn, rx_connDeadTime);
+    rx_SetConnSecondsUntilNatPing(conn, 0);
     conn->ackRate = RX_FAST_ACK_RATE;
     conn->nSpecific = 0;
     conn->specific = NULL;
@@ -1029,6 +1035,10 @@ rxi_DestroyConnectionNoLock(struct rx_connection *conn)
        return;
     }
 
+    if (conn->natKeepAliveEvent) {
+       rxi_NatKeepAliveOff(conn);
+    }
+
     if (conn->delayedAbortEvent) {
        rxevent_Cancel(conn->delayedAbortEvent, (struct rx_call *)0, 0);
        packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
@@ -1062,6 +1072,8 @@ rxi_DestroyConnectionNoLock(struct rx_connection *conn)
        rxevent_Cancel(conn->challengeEvent, (struct rx_call *)0, 0);
     if (conn->checkReachEvent)
        rxevent_Cancel(conn->checkReachEvent, (struct rx_call *)0, 0);
+    if (conn->natKeepAliveEvent)
+       rxevent_Cancel(conn->natKeepAliveEvent, (struct rx_call *)0, 0);
 
     /* Add the connection to the list of destroyed connections that
      * need to be cleaned up. This is necessary to avoid deadlocks
@@ -1137,8 +1149,6 @@ rx_NewCall(struct rx_connection *conn)
 
     NETPRI;
     clock_GetTime(&queueTime);
-    MUTEX_ENTER(&conn->conn_call_lock);
-
     /*
      * Check if there are others waiting for a new call.
      * If so, let them go first to avoid starving them.
@@ -1150,14 +1160,13 @@ rx_NewCall(struct rx_connection *conn)
      * RX_CONN_MAKECALL_WAITING flag bit is used to 
      * indicate that there are indeed calls waiting.
      * The flag is set when the waiter is incremented.
-     * It is only cleared in rx_EndCall when 
-     * makeCallWaiters is 0.  This prevents us from 
-     * accidently destroying the connection while it
-     * is potentially about to be used.
+     * It is only cleared when makeCallWaiters is 0.
+     * This prevents us from accidently destroying the
+     * connection while it is potentially about to be used.
      */
+    MUTEX_ENTER(&conn->conn_call_lock);
     MUTEX_ENTER(&conn->conn_data_lock);
     if (conn->makeCallWaiters) {
-       conn->flags |= RX_CONN_MAKECALL_WAITING;
        conn->makeCallWaiters++;
         MUTEX_EXIT(&conn->conn_data_lock);
 
@@ -1168,6 +1177,8 @@ rx_NewCall(struct rx_connection *conn)
 #endif
        MUTEX_ENTER(&conn->conn_data_lock);
        conn->makeCallWaiters--;
+        if (conn->makeCallWaiters == 0)
+            conn->flags &= ~RX_CONN_MAKECALL_WAITING;
     } 
     MUTEX_EXIT(&conn->conn_data_lock);
 
@@ -1175,14 +1186,20 @@ rx_NewCall(struct rx_connection *conn)
        for (i = 0; i < RX_MAXCALLS; i++) {
            call = conn->call[i];
            if (call) {
-               MUTEX_ENTER(&call->lock);
                if (call->state == RX_STATE_DALLY) {
-                   rxi_ResetCall(call, 0);
-                   (*call->callNumber)++;
-                   break;
-               }
-               MUTEX_EXIT(&call->lock);
+                    MUTEX_ENTER(&call->lock);
+                    if (call->state == RX_STATE_DALLY) {
+                        call->state = RX_STATE_RESET;
+                        MUTEX_EXIT(&conn->conn_call_lock);
+                        rxi_ResetCall(call, 0);
+                        MUTEX_ENTER(&conn->conn_call_lock);
+                        (*call->callNumber)++;
+                        break;
+                    }
+                    MUTEX_EXIT(&call->lock);
+                }
            } else {
+                /* rxi_NewCall returns with mutex locked */
                call = rxi_NewCall(conn, i);
                break;
            }
@@ -1202,6 +1219,8 @@ rx_NewCall(struct rx_connection *conn)
 #endif
        MUTEX_ENTER(&conn->conn_data_lock);
        conn->makeCallWaiters--;
+        if (conn->makeCallWaiters == 0)
+            conn->flags &= ~RX_CONN_MAKECALL_WAITING;
        MUTEX_EXIT(&conn->conn_data_lock);
     }
     /*
@@ -1232,22 +1251,17 @@ rx_NewCall(struct rx_connection *conn)
 
     /* Turn on busy protocol. */
     rxi_KeepAliveOn(call);
-
-    MUTEX_EXIT(&call->lock);
     MUTEX_EXIT(&conn->conn_call_lock);
-    USERPRI;
 
 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
-    /* Now, if TQ wasn't cleared earlier, do it now. */
-    MUTEX_ENTER(&call->lock);
-    rxi_WaitforTQBusy(call);
-    if (call->flags & RX_CALL_TQ_CLEARME) {
-       rxi_ClearTransmitQueue(call, 1);
-       /*queue_Init(&call->tq);*/
+    if (call->flags & (RX_CALL_TQ_BUSY | RX_CALL_TQ_CLEARME)) {
+        osi_Panic("rx_NewCall call about to be used without an empty tq");
     }
-    MUTEX_EXIT(&call->lock);
 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
 
+    MUTEX_EXIT(&call->lock);
+    USERPRI;
+
     dpf(("rx_NewCall(call %"AFS_PTR_FMT")\n", call));
     return call;
 }
@@ -1741,7 +1755,7 @@ rx_GetCall(int tno, struct rx_service *cur_service, osi_socket * socketp)
        CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
        MUTEX_EXIT(&call->lock);
     } else {
-       dpf(("rx_GetCall(socketp=0x%"AFS_PTR_FMT", *socketp=0x%"AFS_PTR_FMT")\n", socketp, *socketp));
+       dpf(("rx_GetCall(socketp=%p, *socketp=0x%x)\n", socketp, *socketp));
     }
 
     return call;
@@ -1888,11 +1902,11 @@ rx_GetCall(int tno, struct rx_service *cur_service, osi_socket * socketp)
 #endif
 
        rxi_calltrace(RX_CALL_START, call);
-       dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n",
+       dpf(("rx_GetCall(port=%d, service=%d) ==> call %p\n",
             call->conn->service->servicePort, call->conn->service->serviceId,
             call));
     } else {
-       dpf(("rx_GetCall(socketp=0x%"AFS_PTR_FMT", *socketp=0x%"AFS_PTR_FMT")\n", socketp, *socketp));
+       dpf(("rx_GetCall(socketp=%p, *socketp=0x%x)\n", socketp, *socketp));
     }
 
     USERPRI;
@@ -1937,8 +1951,6 @@ rx_EndCall(struct rx_call *call, afs_int32 rc)
     afs_int32 error;
     SPLVAR;
 
-
-
     dpf(("rx_EndCall(call %"AFS_PTR_FMT" rc %d error %d abortCode %d)\n",
           call, rc, call->error, call->abortCode));
 
@@ -2006,18 +2018,10 @@ rx_EndCall(struct rx_call *call, afs_int32 rc)
         * rx_NewCall is in a stable state. Otherwise, rx_NewCall may
         * have checked this call, found it active and by the time it
         * goes to sleep, will have missed the signal.
-         *
-         * Do not clear the RX_CONN_MAKECALL_WAITING flag as long as
-         * there are threads waiting to use the conn object.
         */
-       MUTEX_EXIT(&call->lock);
-       MUTEX_ENTER(&conn->conn_call_lock);
-       MUTEX_ENTER(&call->lock);
        MUTEX_ENTER(&conn->conn_data_lock);
        conn->flags |= RX_CONN_BUSY;
        if (conn->flags & RX_CONN_MAKECALL_WAITING) {
-            if (conn->makeCallWaiters == 0)
-                conn->flags &= (~RX_CONN_MAKECALL_WAITING);
            MUTEX_EXIT(&conn->conn_data_lock);
 #ifdef RX_ENABLE_LOCKS
            CV_BROADCAST(&conn->conn_call_cv);
@@ -2055,7 +2059,6 @@ rx_EndCall(struct rx_call *call, afs_int32 rc)
     CALL_RELE(call, RX_CALL_REFCOUNT_BEGIN);
     MUTEX_EXIT(&call->lock);
     if (conn->type == RX_CLIENT_CONNECTION) {
-       MUTEX_EXIT(&conn->conn_call_lock);
        conn->flags &= ~RX_CONN_BUSY;
     }
     USERPRI;
@@ -2776,7 +2779,7 @@ rxi_ReceivePacket(struct rx_packet *np, osi_socket socket,
            *call->callNumber = np->header.callNumber;
 #ifdef RXDEBUG
            if (np->header.callNumber == 0) 
-               dpf(("RecPacket call 0 %d %s: %x.%u.%u.%u.%u.%u.%u flags %d, packet %"AFS_PTR_FMT" resend %d.%0.06d len %d",
+               dpf(("RecPacket call 0 %d %s: %x.%u.%u.%u.%u.%u.%u flags %d, packet %"AFS_PTR_FMT" resend %d.%.06d len %d",
                       np->header.serial, rx_packetTypes[np->header.type - 1], ntohl(conn->peer->host), ntohs(conn->peer->port),
                       np->header.serial, np->header.epoch, np->header.cid, np->header.callNumber, np->header.seq,
                       np->header.flags, np, np->retryTime.sec, np->retryTime.usec / 1000, np->length));
@@ -4452,6 +4455,13 @@ rxi_ClearTransmitQueue(struct rx_call *call, int force)
         call->tqc -=
 #endif /* RXDEBUG_PACKET */
             rxi_FreePackets(0, &call->tq);
+       if (call->tqWaiters || (call->flags & RX_CALL_TQ_WAIT)) {
+#ifdef RX_ENABLE_LOCKS
+           CV_BROADCAST(&call->cv_tq);
+#else /* RX_ENABLE_LOCKS */
+           osi_rxWakeup(&call->tq);
+#endif /* RX_ENABLE_LOCKS */
+       }
 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
        call->flags &= ~RX_CALL_TQ_CLEARME;
     }
@@ -4597,6 +4607,8 @@ rxi_ConnectionError(struct rx_connection *conn,
        MUTEX_ENTER(&conn->conn_data_lock);
        if (conn->challengeEvent)
            rxevent_Cancel(conn->challengeEvent, (struct rx_call *)0, 0);
+       if (conn->natKeepAliveEvent)
+           rxevent_Cancel(conn->natKeepAliveEvent, (struct rx_call *)0, 0);
        if (conn->checkReachEvent) {
            rxevent_Cancel(conn->checkReachEvent, (struct rx_call *)0, 0);
            conn->checkReachEvent = 0;
@@ -4708,32 +4720,14 @@ rxi_ResetCall(struct rx_call *call, int newcall)
 
     flags = call->flags;
 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
-    if (flags & RX_CALL_TQ_BUSY) {
-       call->flags = RX_CALL_TQ_CLEARME | RX_CALL_TQ_BUSY;
-       call->flags |= (flags & RX_CALL_TQ_WAIT);
-#ifdef RX_ENABLE_LOCKS
-        CV_WAIT(&call->cv_tq, &call->lock);
-#else /* RX_ENABLE_LOCKS */
-        osi_rxSleep(&call->tq);
-#endif /* RX_ENABLE_LOCKS */
-    } else
+    rxi_WaitforTQBusy(call);
 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
-    {
-       rxi_ClearTransmitQueue(call, 1);
-       /* why init the queue if you just emptied it? queue_Init(&call->tq); */
-       if (call->tqWaiters || (flags & RX_CALL_TQ_WAIT)) {
-           dpf(("rcall %"AFS_PTR_FMT" has %d waiters and flags %d\n", call, call->tqWaiters, call->flags));
-       }
-       call->flags = 0;
-       while (call->tqWaiters) {
-#ifdef RX_ENABLE_LOCKS
-           CV_BROADCAST(&call->cv_tq);
-#else /* RX_ENABLE_LOCKS */
-           osi_rxWakeup(&call->tq);
-#endif /* RX_ENABLE_LOCKS */
-           call->tqWaiters--;
-       }
+
+    rxi_ClearTransmitQueue(call, 1);
+    if (call->tqWaiters || (flags & RX_CALL_TQ_WAIT)) {
+        dpf(("rcall %"AFS_PTR_FMT" has %d waiters and flags %d\n", call, call->tqWaiters, call->flags));
     }
+    call->flags = 0;
 
     rxi_ClearReceiveQueue(call);
     /* why init the queue if you just emptied it? queue_Init(&call->rq); */
@@ -5509,8 +5503,8 @@ rxi_Start(struct rxevent *event,
                         rx_MutexIncrement(rx_tq_debug.rxi_start_aborted, rx_stats_mutex);
                    call->flags &= ~RX_CALL_TQ_BUSY;
                    if (call->tqWaiters || (call->flags & RX_CALL_TQ_WAIT)) {
-                       dpf(("call error %d while xmit %x has %d waiters and flags %d\n",
-                             call, call->error, call->tqWaiters, call->flags));
+                       dpf(("call error %d while xmit %p has %d waiters and flags %d\n",
+                             call->error, call, call->tqWaiters, call->flags));
 #ifdef RX_ENABLE_LOCKS
                        osirx_AssertMine(&call->lock, "rxi_Start middle");
                        CV_BROADCAST(&call->cv_tq);
@@ -5750,7 +5744,8 @@ rxi_CheckCall(struct rx_call *call)
     }
     /* see if we have a non-activity timeout */
     if (call->startWait && conn->idleDeadTime
-       && ((call->startWait + conn->idleDeadTime) < now)) {
+       && ((call->startWait + conn->idleDeadTime) < now) &&
+       (call->flags & RX_CALL_READER_WAIT)) {
        if (call->state == RX_STATE_ACTIVE) {
            rxi_CallError(call, RX_CALL_TIMEOUT);
            return -1;
@@ -5773,6 +5768,90 @@ rxi_CheckCall(struct rx_call *call)
     return 0;
 }
 
+void
+rxi_NatKeepAliveEvent(struct rxevent *event, void *arg1, void *dummy)
+{
+    struct rx_connection *conn = arg1;
+    struct rx_header theader;
+    char tbuffer[1500];
+    struct sockaddr_in taddr;
+    char *tp;
+    char a[1] = { 0 };
+    struct iovec tmpiov[2];
+    osi_socket socket =
+        (conn->type ==
+         RX_CLIENT_CONNECTION ? rx_socket : conn->service->socket);
+
+
+    tp = &tbuffer[sizeof(struct rx_header)];
+    taddr.sin_family = AF_INET;
+    taddr.sin_port = rx_PortOf(rx_PeerOf(conn));
+    taddr.sin_addr.s_addr = rx_HostOf(rx_PeerOf(conn));
+#ifdef STRUCT_SOCKADDR_HAS_SA_LEN
+    taddr.sin_len = sizeof(struct sockaddr_in);
+#endif
+    memset(&theader, 0, sizeof(theader));
+    theader.epoch = htonl(999);
+    theader.cid = 0;
+    theader.callNumber = 0;
+    theader.seq = 0;
+    theader.serial = 0;
+    theader.type = RX_PACKET_TYPE_VERSION;
+    theader.flags = RX_LAST_PACKET;
+    theader.serviceId = 0;
+
+    memcpy(tbuffer, &theader, sizeof(theader));
+    memcpy(tp, &a, sizeof(a));
+    tmpiov[0].iov_base = tbuffer;
+    tmpiov[0].iov_len = 1 + sizeof(struct rx_header);
+
+    osi_NetSend(socket, &taddr, tmpiov, 1, 1 + sizeof(struct rx_header), 1);
+
+    MUTEX_ENTER(&conn->conn_data_lock);
+    /* Only reschedule ourselves if the connection would not be destroyed */
+    if (conn->refCount <= 1) {
+       conn->natKeepAliveEvent = NULL;
+       MUTEX_EXIT(&conn->conn_data_lock);
+       rx_DestroyConnection(conn); /* drop the reference for this */
+    } else {
+       conn->natKeepAliveEvent = NULL;
+       conn->refCount--; /* drop the reference for this */
+       rxi_ScheduleNatKeepAliveEvent(conn);
+       MUTEX_EXIT(&conn->conn_data_lock);
+    }
+}
+
+void
+rxi_ScheduleNatKeepAliveEvent(struct rx_connection *conn)
+{
+    if (!conn->natKeepAliveEvent && conn->secondsUntilNatPing) {
+       struct clock when, now;
+       clock_GetTime(&now);
+       when = now;
+       when.sec += conn->secondsUntilNatPing;
+       conn->refCount++; /* hold a reference for this */
+       conn->natKeepAliveEvent =
+           rxevent_PostNow(&when, &now, rxi_NatKeepAliveEvent, conn, 0);
+    }
+}
+
+void
+rx_SetConnSecondsUntilNatPing(struct rx_connection *conn, afs_int32 seconds)
+{
+    MUTEX_ENTER(&conn->conn_data_lock);
+    conn->secondsUntilNatPing = seconds;
+    if (seconds != 0)
+       rxi_ScheduleNatKeepAliveEvent(conn);
+    MUTEX_EXIT(&conn->conn_data_lock);
+}
+
+void
+rxi_NatKeepAliveOn(struct rx_connection *conn)
+{
+    MUTEX_ENTER(&conn->conn_data_lock);
+    rxi_ScheduleNatKeepAliveEvent(conn);
+    MUTEX_EXIT(&conn->conn_data_lock);
+}
 
 /* When a call is in progress, this routine is called occasionally to
  * make sure that some traffic has arrived (or been sent to) the peer.