rx-connection-clones-20080520
authorMatt Benjamin <matt@linuxbox.com>
Tue, 20 May 2008 21:24:16 +0000 (21:24 +0000)
committerDerrick Brashear <shadow@dementia.org>
Tue, 20 May 2008 21:24:16 +0000 (21:24 +0000)
LICENSE IPL10
FIXES 89557

add connection clones to allow more than maxcalls per "connection"

src/rx/rx.c
src/rx/rx.h
src/rx/rx_globals.h

index 3341cb1..34c3b23 100644 (file)
@@ -738,54 +738,77 @@ rx_NewConnection(register afs_uint32 shost, u_short sport, u_short sservice,
                 int serviceSecurityIndex)
 {
     int hashindex, i;
-    afs_int32 cid;
-    register struct rx_connection *conn;
+    afs_int32 cid, cix, nclones;
+    register struct rx_connection *conn, *tconn, *ptconn;
 
     SPLVAR;
 
     clock_NewTime();
     dpf(("rx_NewConnection(host %x, port %u, service %u, securityObject %x, serviceSecurityIndex %d)\n", ntohl(shost), ntohs(sport), sservice, securityObject, serviceSecurityIndex));
 
+       conn = tconn = 0;
+       nclones = rx_max_clones_per_connection;
+
     /* Vasilsi said: "NETPRI protects Cid and Alloc", but can this be true in
      * the case of kmem_alloc? */
-    conn = rxi_AllocConnection();
-#ifdef RX_ENABLE_LOCKS
-    MUTEX_INIT(&conn->conn_call_lock, "conn call lock", MUTEX_DEFAULT, 0);
-    MUTEX_INIT(&conn->conn_data_lock, "conn call lock", MUTEX_DEFAULT, 0);
-    CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
-#endif
+
     NETPRI;
     MUTEX_ENTER(&rx_connHashTable_lock);
-    cid = (rx_nextCid += RX_MAXCALLS);
-    conn->type = RX_CLIENT_CONNECTION;
-    conn->cid = cid;
-    conn->epoch = rx_epoch;
-    conn->peer = rxi_FindPeer(shost, sport, 0, 1);
-    conn->serviceId = sservice;
-    conn->securityObject = securityObject;
-    conn->securityData = (void *) 0;
-    conn->securityIndex = serviceSecurityIndex;
-    rx_SetConnDeadTime(conn, rx_connDeadTime);
-    conn->ackRate = RX_FAST_ACK_RATE;
-    conn->nSpecific = 0;
-    conn->specific = NULL;
-    conn->challengeEvent = NULL;
-    conn->delayedAbortEvent = NULL;
-    conn->abortCount = 0;
-    conn->error = 0;
+
+    /* send in the clones */
+    for(cix = 0; cix <= nclones; ++cix) {
+         
+         ptconn = tconn;
+         tconn = rxi_AllocConnection();
+         tconn->type = RX_CLIENT_CONNECTION;
+         tconn->epoch = rx_epoch;
+         tconn->peer = rxi_FindPeer(shost, sport, 0, 1);
+         tconn->serviceId = sservice;
+         tconn->securityObject = securityObject;
+         tconn->securityData = (void *) 0;
+         tconn->securityIndex = serviceSecurityIndex;
+         tconn->ackRate = RX_FAST_ACK_RATE;
+         tconn->nSpecific = 0;
+         tconn->specific = NULL;
+         tconn->challengeEvent = NULL;
+         tconn->delayedAbortEvent = NULL;
+         tconn->abortCount = 0;
+         tconn->error = 0;
     for (i = 0; i < RX_MAXCALLS; i++) {
-       conn->twind[i] = rx_initSendWindow;
-       conn->rwind[i] = rx_initReceiveWindow;
+       tconn->twind[i] = rx_initSendWindow;
+       tconn->rwind[i] = rx_initReceiveWindow;
     }
-
-    RXS_NewConnection(securityObject, conn);
-    hashindex =
-       CONN_HASH(shost, sport, conn->cid, conn->epoch, RX_CLIENT_CONNECTION);
-
-    conn->refCount++;          /* no lock required since only this thread knows... */
-    conn->next = rx_connHashTable[hashindex];
-    rx_connHashTable[hashindex] = conn;
-    rx_MutexIncrement(rx_stats.nClientConns, rx_stats_mutex);
+         tconn->parent = 0;
+         tconn->next_clone = 0;
+         tconn->nclones = nclones;
+         rx_SetConnDeadTime(tconn, rx_connDeadTime);
+               
+         if(cix == 0) {
+               conn = tconn;
+         } else {
+               tconn->flags |= RX_CLONED_CONNECTION;
+               tconn->parent = conn;
+               ptconn->next_clone = tconn;
+         }
+
+         /* generic connection setup */
+#ifdef RX_ENABLE_LOCKS
+         MUTEX_INIT(&tconn->conn_call_lock, "conn call lock", MUTEX_DEFAULT, 0);
+         MUTEX_INIT(&tconn->conn_data_lock, "conn data lock", MUTEX_DEFAULT, 0);
+         CV_INIT(&tconn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
+#endif
+         cid = (rx_nextCid += RX_MAXCALLS);
+         tconn->cid = cid;
+         RXS_NewConnection(securityObject, tconn);
+         hashindex =
+               CONN_HASH(shost, sport, tconn->cid, tconn->epoch, 
+                                 RX_CLIENT_CONNECTION);
+         tconn->refCount++; /* no lock required since only this thread knows */
+         tconn->next = rx_connHashTable[hashindex];
+         rx_connHashTable[hashindex] = tconn;
+         rx_MutexIncrement(rx_stats.nClientConns, rx_stats_mutex);     
+    }
+    
     MUTEX_EXIT(&rx_connHashTable_lock);
     USERPRI;
     return conn;
@@ -794,10 +817,14 @@ rx_NewConnection(register afs_uint32 shost, u_short sport, u_short sservice,
 void
 rx_SetConnDeadTime(register struct rx_connection *conn, register int seconds)
 {
-    /* The idea is to set the dead time to a value that allows several
-     * keepalives to be dropped without timing out the connection. */
-    conn->secondsUntilDead = MAX(seconds, 6);
-    conn->secondsUntilPing = conn->secondsUntilDead / 6;
+  /* The idea is to set the dead time to a value that allows several
+   * keepalives to be dropped without timing out the connection. */
+  struct rx_connection *tconn;
+  tconn = conn;
+  do {
+       tconn->secondsUntilDead = MAX(seconds, 6);
+       tconn->secondsUntilPing = tconn->secondsUntilDead / 6;
+  } while(tconn->next_clone && (tconn = tconn->next_clone));
 }
 
 int rxi_lowPeerRefCount = 0;
@@ -864,18 +891,42 @@ rxi_CleanupConnection(struct rx_connection *conn)
 void
 rxi_DestroyConnection(register struct rx_connection *conn)
 {
-    MUTEX_ENTER(&rx_connHashTable_lock);
-    rxi_DestroyConnectionNoLock(conn);
-    /* conn should be at the head of the cleanup list */
-    if (conn == rx_connCleanup_list) {
+  register struct rx_connection *tconn, *dtconn;
+
+  MUTEX_ENTER(&rx_connHashTable_lock);
+  
+  if(!(conn->flags & RX_CLONED_CONNECTION)) {
+       tconn = conn->next_clone;
+       conn->next_clone = 0; /* once */
+       do {
+         if(tconn) {
+               dtconn = tconn;
+               tconn = tconn->next_clone;
+               rxi_DestroyConnectionNoLock(dtconn);
+               /* destroyed? */
+               if (dtconn == rx_connCleanup_list) {
+                 rx_connCleanup_list = rx_connCleanup_list->next;
+                 MUTEX_EXIT(&rx_connHashTable_lock);
+                 /* rxi_CleanupConnection will free tconn */   
+                 rxi_CleanupConnection(dtconn);
+                 MUTEX_ENTER(&rx_connHashTable_lock);
+                 (conn->nclones)--;
+               }
+         }
+       } while(tconn);
+  }
+
+  rxi_DestroyConnectionNoLock(conn);
+  /* conn should be at the head of the cleanup list */
+  if (conn == rx_connCleanup_list) {
        rx_connCleanup_list = rx_connCleanup_list->next;
        MUTEX_EXIT(&rx_connHashTable_lock);
        rxi_CleanupConnection(conn);
-    }
+  }
 #ifdef RX_ENABLE_LOCKS
-    else {
+  else {
        MUTEX_EXIT(&rx_connHashTable_lock);
-    }
+  }
 #endif /* RX_ENABLE_LOCKS */
 }
 
@@ -1061,6 +1112,7 @@ rx_NewCall(register struct rx_connection *conn)
 {
     register int i;
     register struct rx_call *call;
+       register struct rx_connection *tconn;
     struct clock queueTime;
     SPLVAR;
 
@@ -1103,39 +1155,51 @@ rx_NewCall(register struct rx_connection *conn)
     } 
     MUTEX_EXIT(&conn->conn_data_lock);
 
+       /* search for next free call on this connection or 
+        * its clones, if any */
     for (;;) {
-       for (i = 0; i < RX_MAXCALLS; i++) {
-           call = conn->call[i];
-           if (call) {
-               MUTEX_ENTER(&call->lock);
-               if (call->state == RX_STATE_DALLY) {
-                   rxi_ResetCall(call, 0);
-                   (*call->callNumber)++;
-                   break;
+               tconn = conn;
+               do {
+                       for (i = 0; i < RX_MAXCALLS; i++) {
+                               call = tconn->call[i];
+                               if (call) {
+                                       MUTEX_ENTER(&call->lock);
+                                       if (call->state == RX_STATE_DALLY) {
+                                               rxi_ResetCall(call, 0);
+                                               (*call->callNumber)++;
+                                               goto f_call;
+                                       }
+                                       MUTEX_EXIT(&call->lock);
+                               } else {
+                                       call = rxi_NewCall(tconn, i);
+                                       goto f_call;
+                               }
+                       } /* for i < RX_MAXCALLS */
+               } while (tconn->next_clone && (tconn = tconn->next_clone));
+
+       f_call:
+
+               if (i < RX_MAXCALLS) {
+                       break;
                }
-               MUTEX_EXIT(&call->lock);
-           } else {
-               call = rxi_NewCall(conn, i);
-               break;
-           }
-       }
-       if (i < RX_MAXCALLS) {
-           break;
-       }
-       MUTEX_ENTER(&conn->conn_data_lock);
-       conn->flags |= RX_CONN_MAKECALL_WAITING;
-       conn->makeCallWaiters++;
-       MUTEX_EXIT(&conn->conn_data_lock);
+
+               /* to be here, all available calls for this connection (and all
+                * its clones) must be in use */
+
+               MUTEX_ENTER(&conn->conn_data_lock);
+               conn->flags |= RX_CONN_MAKECALL_WAITING;
+               conn->makeCallWaiters++;
+               MUTEX_EXIT(&conn->conn_data_lock);
 
 #ifdef RX_ENABLE_LOCKS
-       CV_WAIT(&conn->conn_call_cv, &conn->conn_call_lock);
+               CV_WAIT(&conn->conn_call_cv, &conn->conn_call_lock);
 #else
-       osi_rxSleep(conn);
+               osi_rxSleep(conn);
 #endif
-       MUTEX_ENTER(&conn->conn_data_lock);
-       conn->makeCallWaiters--;
-       MUTEX_EXIT(&conn->conn_data_lock);
-    }
+               MUTEX_ENTER(&conn->conn_data_lock);
+               conn->makeCallWaiters--;
+               MUTEX_EXIT(&conn->conn_data_lock);
+    } /* for ;; */
     /*
      * Wake up anyone else who might be giving us a chance to
      * run (see code above that avoids resource starvation).
index ebe0047..d3ae90a 100644 (file)
@@ -212,6 +212,20 @@ returned with an error code of RX_CALL_DEAD ( transient error ) */
 #define rx_EnableHotThread()           (rx_enable_hot_thread = 1)
 #define rx_DisableHotThread()          (rx_enable_hot_thread = 0)
 
+/* Macros to set max connection clones (each allows RX_MAXCALLS 
+ * outstanding calls */
+
+#define rx_SetMaxCalls(v) \
+do {\
+       rx_SetCloneMax(v/4); \
+} while(0);
+
+#define rx_SetCloneMax(v) \
+do {\
+       if(v < RX_HARD_MAX_CLONES) \
+               rx_max_clones_per_connection = v; \
+} while(0);
+
 #define rx_PutConnection(conn) rx_DestroyConnection(conn)
 
 /* A connection is an authenticated communication path, allowing 
@@ -222,7 +236,9 @@ struct rx_connection_rx_lock {
     struct rx_peer_rx_lock *peer;
 #else
 struct rx_connection {
-    struct rx_connection *next;        /*  on hash chain _or_ free list */
+    struct rx_connection *next;        /* on hash chain _or_ free list */
+    struct rx_connection *parent; /* primary connection, if this is a clone */
+    struct rx_connection *next_clone; /* next in list of clones */
     struct rx_peer *peer;
 #endif
 #ifdef RX_ENABLE_LOCKS
@@ -230,6 +246,7 @@ struct rx_connection {
     afs_kcondvar_t conn_call_cv;
     afs_kmutex_t conn_data_lock;       /* locks packet data */
 #endif
+    afs_uint32 nclones; /* count of clone connections (if not a clone) */
     afs_uint32 epoch;          /* Process start time of client side of connection */
     afs_uint32 cid;            /* Connection id (call channel is bottom bits) */
     afs_int32 error;           /* If this connection is in error, this is it */
@@ -429,6 +446,7 @@ struct rx_peer {
 #define RX_CONN_RESET             16   /* connection is reset, remove */
 #define RX_CONN_BUSY               32  /* connection is busy; don't delete */
 #define RX_CONN_ATTACHWAIT        64   /* attach waiting for peer->lastReach */
+#define RX_CLONED_CONNECTION     128   /* connection is a clone */
 
 /* Type of connection, client or server */
 #define        RX_CLIENT_CONNECTION    0
index 749a380..fc4272f 100644 (file)
@@ -591,4 +591,13 @@ EXT2 int rx_enable_stats GLOBALSINIT(0);
  */
 EXT int rx_enable_hot_thread GLOBALSINIT(0);
 
+/*
+ * Set rx_max_clones_per_connection to a value > 0 to enable connection clone 
+ * workaround to RX_MAXCALLS limit.
+ */
+#define RX_HARD_MAX_CLONES 10
+
+EXT int rx_max_clones_per_connection GLOBALSINIT(2);
+
 #endif /* AFS_RX_GLOBALS_H */