rx-finer-grained-locking-20090110
authorJeffrey Altman <jaltman@your-file-system.com>
Sun, 11 Jan 2009 05:27:01 +0000 (05:27 +0000)
committerJeffrey Altman <jaltman@secure-endpoints.com>
Sun, 11 Jan 2009 05:27:01 +0000 (05:27 +0000)
LICENSE MIT

not everything should be under the rx_stats_mutex.  doing so
results in too much lock contention.  add new mutexes:
rx_quota_mutex, rx_waiting_mutex, rx_pthread_mutex, and rx_packets_mutex.
Each new mutex protects an associated group of variables.

src/rx/rx.c
src/rx/rx_globals.h
src/rx/rx_kcommon.c
src/rx/rx_packet.c
src/rx/rx_pthread.c

index 6802c38..49184f4 100644 (file)
@@ -165,6 +165,10 @@ static unsigned int rxi_rpc_process_stat_cnt;
  */
 
 extern pthread_mutex_t rx_stats_mutex;
+extern pthread_mutex_t rx_waiting_mutex;
+extern pthread_mutex_t rx_quota_mutex;
+extern pthread_mutex_t rx_pthread_mutex;
+extern pthread_mutex_t rx_packets_mutex;
 extern pthread_mutex_t des_init_mutex;
 extern pthread_mutex_t des_random_mutex;
 extern pthread_mutex_t rx_clock_mutex;
@@ -193,6 +197,14 @@ rxi_InitPthread(void)
           == 0);
     assert(pthread_mutex_init(&rx_stats_mutex, (const pthread_mutexattr_t *)0)
           == 0);
+    assert(pthread_mutex_init(&rx_waiting_mutex, (const pthread_mutexattr_t *)0)
+          == 0);
+    assert(pthread_mutex_init(&rx_quota_mutex, (const pthread_mutexattr_t *)0)
+          == 0);
+    assert(pthread_mutex_init(&rx_pthread_mutex, (const pthread_mutexattr_t *)0)
+          == 0);
+    assert(pthread_mutex_init(&rx_packets_mutex, (const pthread_mutexattr_t *)0)
+          == 0);
     assert(pthread_mutex_init
           (&rxi_connCacheMutex, (const pthread_mutexattr_t *)0) == 0);
     assert(pthread_mutex_init(&rx_init_mutex, (const pthread_mutexattr_t *)0)
@@ -256,19 +268,40 @@ pthread_once_t rx_once_init = PTHREAD_ONCE_INIT;
 assert(pthread_once(&rx_once_init, rxi_InitPthread)==0)
 /*
  * The rx_stats_mutex mutex protects the following global variables:
- * rxi_dataQuota
- * rxi_minDeficit
- * rxi_availProcs
- * rxi_totalMin
  * rxi_lowConnRefCount
  * rxi_lowPeerRefCount
  * rxi_nCalls
  * rxi_Alloccnt
  * rxi_Allocsize
- * rx_nFreePackets
  * rx_tq_debug
  * rx_stats
  */
+
+/*
+ * The rx_quota_mutex mutex protects the following global variables:
+ * rxi_dataQuota
+ * rxi_minDeficit
+ * rxi_availProcs
+ * rxi_totalMin
+ */
+
+/* 
+ * The rx_freePktQ_lock protects the following global variables:
+ * rx_nFreePackets 
+ */
+
+/*
+ * The rx_packets_mutex mutex protects the following global variables:
+ * rx_nPackets
+ * rx_TSFPQLocalMax
+ * rx_TSFPQGlobSize
+ * rx_TSFPQMaxProcs
+ */
+
+/*
+ * The rx_pthread_mutex mutex protects the following global variables:
+ * rxi_pthread_hinum
+ */
 #else
 #define INIT_PTHREAD_LOCKS
 #endif
@@ -457,6 +490,10 @@ rx_InitHost(u_int host, u_int port)
     rxdb_init();
 #endif /* RX_LOCKS_DB */
     MUTEX_INIT(&rx_stats_mutex, "rx_stats_mutex", MUTEX_DEFAULT, 0);
+    MUTEX_INIT(&rx_waiting_mutex, "rx_waiting_mutex", MUTEX_DEFAULT, 0);
+    MUTEX_INIT(&rx_quota_mutex, "rx_quota_mutex", MUTEX_DEFAULT, 0);
+    MUTEX_INIT(&rx_pthread_mutex, "rx_pthread_mutex", MUTEX_DEFAULT, 0);
+    MUTEX_INIT(&rx_packets_mutex, "rx_packets_mutex", MUTEX_DEFAULT, 0);
     MUTEX_INIT(&rx_rpc_stats, "rx_rpc_stats", MUTEX_DEFAULT, 0);
     MUTEX_INIT(&rx_freePktQ_lock, "rx_freePktQ_lock", MUTEX_DEFAULT, 0);
     MUTEX_INIT(&freeSQEList_lock, "freeSQEList lock", MUTEX_DEFAULT, 0);
@@ -538,7 +575,7 @@ rx_InitHost(u_int host, u_int port)
     rx_SetEpoch(tv.tv_sec);    /* Start time of this package, rxkad
                                 * will provide a randomer value. */
 #endif
-    rx_MutexAdd(rxi_dataQuota, rx_extraQuota, rx_stats_mutex); /* + extra pkts caller asked to rsrv */
+    rx_MutexAdd(rxi_dataQuota, rx_extraQuota, rx_stats_quota); /* + extra pkts caller asked to rsrv */
     /* *Slightly* random start time for the cid.  This is just to help
      * out with the hashing function at the peer */
     rx_nextCid = ((tv.tv_sec ^ tv.tv_usec) << RX_CIDSHIFT);
@@ -600,7 +637,8 @@ QuotaOK(register struct rx_service *aservice)
     /* otherwise, can use only if there are enough to allow everyone
      * to go to their min quota after this guy starts.
      */
-    MUTEX_ENTER(&rx_stats_mutex);
+
+    MUTEX_ENTER(&rx_quota_mutex);
     if ((aservice->nRequestsRunning < aservice->minProcs)
        || (rxi_availProcs > rxi_minDeficit)) {
        aservice->nRequestsRunning++;
@@ -609,10 +647,10 @@ QuotaOK(register struct rx_service *aservice)
        if (aservice->nRequestsRunning <= aservice->minProcs)
            rxi_minDeficit--;
        rxi_availProcs--;
-       MUTEX_EXIT(&rx_stats_mutex);
+       MUTEX_EXIT(&rx_quota_mutex);
        return 1;
     }
-    MUTEX_EXIT(&rx_stats_mutex);
+    MUTEX_EXIT(&rx_quota_mutex);
 
     return 0;
 }
@@ -621,11 +659,11 @@ static void
 ReturnToServerPool(register struct rx_service *aservice)
 {
     aservice->nRequestsRunning--;
-    MUTEX_ENTER(&rx_stats_mutex);
+    MUTEX_ENTER(&rx_quota_mutex);
     if (aservice->nRequestsRunning < aservice->minProcs)
        rxi_minDeficit++;
     rxi_availProcs++;
-    MUTEX_EXIT(&rx_stats_mutex);
+    MUTEX_EXIT(&rx_quota_mutex);
 }
 
 #else /* RX_ENABLE_LOCKS */
@@ -726,13 +764,13 @@ rx_StartServer(int donateMe)
        service = rx_services[i];
        if (service == (struct rx_service *)0)
            break;
-       MUTEX_ENTER(&rx_stats_mutex);
+       MUTEX_ENTER(&rx_quota_mutex);
        rxi_totalMin += service->minProcs;
        /* below works even if a thread is running, since minDeficit would
         * still have been decremented and later re-incremented.
         */
        rxi_minDeficit += service->minProcs;
-       MUTEX_EXIT(&rx_stats_mutex);
+       MUTEX_EXIT(&rx_quota_mutex);
     }
 
     /* Turn on reaping of idle server connections */
@@ -1689,7 +1727,7 @@ rx_GetCall(int tno, struct rx_service *cur_service, osi_socket * socketp)
 
            if (call->flags & RX_CALL_WAIT_PROC) {
                call->flags &= ~RX_CALL_WAIT_PROC;
-               rx_MutexDecrement(rx_nWaiting, rx_stats_mutex);
+               rx_MutexDecrement(rx_nWaiting, rx_waiting_mutex);
            }
 
            if (call->state != RX_STATE_PRECALL || call->error) {
@@ -3083,7 +3121,8 @@ static int
 TooLow(struct rx_packet *ap, struct rx_call *acall)
 {
     int rc = 0;
-    MUTEX_ENTER(&rx_stats_mutex);
+
+    MUTEX_ENTER(&rx_quota_mutex);
     if (((ap->header.seq != 1) && (acall->flags & RX_CALL_CLEARED)
         && (acall->state == RX_STATE_PRECALL))
        || ((rx_nFreePackets < rxi_dataQuota + 2)
@@ -3091,7 +3130,7 @@ TooLow(struct rx_packet *ap, struct rx_call *acall)
                 && (acall->flags & RX_CALL_READER_WAIT)))) {
        rc = 1;
     }
-    MUTEX_EXIT(&rx_stats_mutex);
+    MUTEX_EXIT(&rx_quota_mutex);
     return rc;
 }
 #endif /* KERNEL */
@@ -4219,10 +4258,10 @@ rxi_AttachServerProc(register struct rx_call *call,
 
        if (!(call->flags & RX_CALL_WAIT_PROC)) {
            call->flags |= RX_CALL_WAIT_PROC;
-           MUTEX_ENTER(&rx_stats_mutex);
-           rx_nWaiting++;
-           rx_nWaited++;
-           MUTEX_EXIT(&rx_stats_mutex);
+            MUTEX_ENTER(&rx_waiting_mutex);
+            rx_nWaiting++;
+            rx_nWaited++;
+            MUTEX_EXIT(&rx_waiting_mutex);
            rxi_calltrace(RX_CALL_ARRIVAL, call);
            SET_CALL_QUEUE_LOCK(call, &rx_serverPool_lock);
            queue_Append(&rx_incomingCallQueue, call);
@@ -4249,9 +4288,10 @@ rxi_AttachServerProc(register struct rx_call *call,
            call->flags &= ~RX_CALL_WAIT_PROC;
            if (queue_IsOnQueue(call)) {
                queue_Remove(call);
-               MUTEX_ENTER(&rx_stats_mutex);
-               rx_nWaiting--;
-               MUTEX_EXIT(&rx_stats_mutex);
+                
+                MUTEX_ENTER(&rx_waiting_mutex);
+                rx_nWaiting--;
+                MUTEX_EXIT(&rx_waiting_mutex);
            }
        }
        call->state = RX_STATE_ACTIVE;
@@ -4744,9 +4784,10 @@ rxi_ResetCall(register struct rx_call *call, register int newcall)
        if (queue_IsOnQueue(call)) {
            queue_Remove(call);
            if (flags & RX_CALL_WAIT_PROC) {
-               MUTEX_ENTER(&rx_stats_mutex);
-               rx_nWaiting--;
-               MUTEX_EXIT(&rx_stats_mutex);
+                
+                MUTEX_ENTER(&rx_waiting_mutex);
+                rx_nWaiting--;
+                MUTEX_EXIT(&rx_waiting_mutex);
            }
        }
        MUTEX_EXIT(call->call_queue_lock);
@@ -7007,11 +7048,10 @@ shutdown_rx(void)
 
     rxi_FreeAllPackets();
 
-    MUTEX_ENTER(&rx_stats_mutex);
+    MUTEX_ENTER(&rx_quota_mutex);
     rxi_dataQuota = RX_MAX_QUOTA;
     rxi_availProcs = rxi_totalMin = rxi_minDeficit = 0;
-    MUTEX_EXIT(&rx_stats_mutex);
-
+    MUTEX_EXIT(&rx_quota_mutex);
     rxinit_status = 1;
     UNLOCK_RX_INIT;
 }
index 87ef987..a25f70d 100644 (file)
@@ -267,6 +267,7 @@ void rxi_FlushLocalPacketsTSFPQ(void); /* flush all thread-local packets to glob
  * by each call to AllocPacketBufs() will increase indefinitely without a cap on the transfer
  * glob size.  A cap of 64 is selected because that will produce an allocation of greater than
  * three times that amount which is greater than half of ncalls * maxReceiveWindow. 
+ * Must be called under rx_packets_mutex.
  */
 #define RX_TS_FPQ_COMPUTE_LIMITS \
     do { \
@@ -308,9 +309,9 @@ void rxi_FlushLocalPacketsTSFPQ(void); /* flush all thread-local packets to glob
         (rx_ts_info_p)->_FPQ.ltog_ops++; \
         (rx_ts_info_p)->_FPQ.ltog_xfer += tsize; \
         if ((rx_ts_info_p)->_FPQ.delta) { \
-            MUTEX_ENTER(&rx_stats_mutex); \
+            MUTEX_ENTER(&rx_packets_mutex); \
             RX_TS_FPQ_COMPUTE_LIMITS; \
-            MUTEX_EXIT(&rx_stats_mutex); \
+            MUTEX_EXIT(&rx_packets_mutex); \
            (rx_ts_info_p)->_FPQ.delta = 0; \
         } \
     } while(0)
@@ -328,9 +329,9 @@ void rxi_FlushLocalPacketsTSFPQ(void); /* flush all thread-local packets to glob
         (rx_ts_info_p)->_FPQ.ltog_ops++; \
         (rx_ts_info_p)->_FPQ.ltog_xfer += (num_transfer); \
         if ((rx_ts_info_p)->_FPQ.delta) { \
-            MUTEX_ENTER(&rx_stats_mutex); \
+            MUTEX_ENTER(&rx_packets_mutex); \
             RX_TS_FPQ_COMPUTE_LIMITS; \
-            MUTEX_EXIT(&rx_stats_mutex); \
+            MUTEX_EXIT(&rx_packets_mutex); \
             (rx_ts_info_p)->_FPQ.delta = 0; \
         } \
     } while(0)
@@ -601,14 +602,18 @@ EXT int rxi_callAbortDelay GLOBALSINIT(3000);
 EXT int rxi_fcfs_thread_num GLOBALSINIT(0);
 EXT pthread_key_t rx_thread_id_key;
 /* keep track of pthread numbers - protected by rx_stats_mutex, 
-   except in rx_Init() before mutex exists! */
+ * except in rx_Init() before mutex exists! */
 EXT int rxi_pthread_hinum GLOBALSINIT(0);
 #else
 #define rxi_fcfs_thread_num (0)
 #endif
 
 #if defined(RX_ENABLE_LOCKS)
-EXT afs_kmutex_t rx_stats_mutex;       /* used to activate stats gathering */
+EXT afs_kmutex_t rx_stats_mutex;       /* used to protect stats gathering */
+EXT afs_kmutex_t rx_waiting_mutex;     /* used to protect waiting counters */
+EXT afs_kmutex_t rx_quota_mutex;       /* used to protect quota counters */
+EXT afs_kmutex_t rx_pthread_mutex;     /* used to protect pthread counters */
+EXT afs_kmutex_t rx_packets_mutex;     /* used to protect packet counters */
 #endif
 
 EXT2 int rx_enable_stats GLOBALSINIT(0);
index 72c8dba..ea5d284 100644 (file)
@@ -289,6 +289,7 @@ rx_ServerProc(void *unused)
 {
     int threadID;
 
+/* jaltman - rxi_dataQuota is protected by a mutex everywhere else */
     rxi_MorePackets(rx_maxReceiveWindow + 2);  /* alloc more packets */
     rxi_dataQuota += rx_initSendWindow;        /* Reserve some pkts for hard times */
     /* threadID is used for making decisions in GetCall.  Get it by bumping
index d68ce7c..7f0be72 100644 (file)
@@ -549,10 +549,11 @@ rxi_MorePackets(int apackets)
 
     RX_TS_FPQ_LOCAL_ALLOC(rx_ts_info,apackets);
     /* TSFPQ patch also needs to keep track of total packets */
-    MUTEX_ENTER(&rx_stats_mutex);
+
+    MUTEX_ENTER(&rx_packets_mutex);
     rx_nPackets += apackets;
     RX_TS_FPQ_COMPUTE_LIMITS;
-    MUTEX_EXIT(&rx_stats_mutex);
+    MUTEX_EXIT(&rx_packets_mutex);
 
     for (e = p + apackets; p < e; p++) {
         RX_PACKET_IOV_INIT(p);
@@ -641,10 +642,10 @@ rxi_MorePacketsTSFPQ(int apackets, int flush_global, int num_keep_local)
 
     RX_TS_FPQ_LOCAL_ALLOC(rx_ts_info,apackets);
     /* TSFPQ patch also needs to keep track of total packets */
-    MUTEX_ENTER(&rx_stats_mutex);
+    MUTEX_ENTER(&rx_packets_mutex);
     rx_nPackets += apackets;
     RX_TS_FPQ_COMPUTE_LIMITS;
-    MUTEX_EXIT(&rx_stats_mutex);
+    MUTEX_EXIT(&rx_packets_mutex);
 
     for (e = p + apackets; p < e; p++) {
         RX_PACKET_IOV_INIT(p);
@@ -724,10 +725,10 @@ rxi_MorePacketsNoLock(int apackets)
     rx_nFreePackets += apackets;
 #ifdef RX_ENABLE_TSFPQ
     /* TSFPQ patch also needs to keep track of total packets */
-    MUTEX_ENTER(&rx_stats_mutex);
+    MUTEX_ENTER(&rx_packets_mutex);
     rx_nPackets += apackets;
     RX_TS_FPQ_COMPUTE_LIMITS;
-    MUTEX_EXIT(&rx_stats_mutex);
+    MUTEX_EXIT(&rx_packets_mutex);
 #endif /* RX_ENABLE_TSFPQ */
     rxi_NeedMorePackets = FALSE;
     rxi_PacketsUnWait();
index cc27ae4..028365e 100644 (file)
@@ -288,7 +288,7 @@ rx_ServerProc(void * dummy)
     struct rx_call *newcall = NULL;
 
     rxi_MorePackets(rx_maxReceiveWindow + 2);  /* alloc more packets */
-    MUTEX_ENTER(&rx_stats_mutex);
+    MUTEX_ENTER(&rx_quota_mutex);
     rxi_dataQuota += rx_initSendWindow;        /* Reserve some pkts for hard times */
     /* threadID is used for making decisions in GetCall.  Get it by bumping
      * number of threads handling incoming calls */
@@ -303,11 +303,13 @@ rx_ServerProc(void * dummy)
      * So either introduce yet another counter or flag the FCFS
      * thread... chose the latter.
      */
+    MUTEX_ENTER(&rx_pthread_mutex);
     threadID = ++rxi_pthread_hinum;
+    MUTEX_EXIT(&rx_pthread_mutex);
     if (rxi_fcfs_thread_num == 0 && rxi_fcfs_thread_num != threadID)
        rxi_fcfs_thread_num = threadID;
     ++rxi_availProcs;
-    MUTEX_EXIT(&rx_stats_mutex);
+    MUTEX_EXIT(&rx_quota_mutex);
 
     while (1) {
        sock = OSI_NULLSOCKET;
@@ -358,9 +360,9 @@ rxi_StartListener(void)
        dpf(("Unable to create Rx event handling thread\n"));
        exit(1);
     }
-    MUTEX_ENTER(&rx_stats_mutex);
+    MUTEX_ENTER(&rx_pthread_mutex);
     ++rxi_pthread_hinum;
-    MUTEX_EXIT(&rx_stats_mutex);
+    MUTEX_EXIT(&rx_pthread_mutex);
     AFS_SIGSET_RESTORE();
 
     assert(pthread_mutex_lock(&listener_mutex) == 0);
@@ -397,9 +399,9 @@ rxi_Listen(osi_socket sock)
        dpf(("Unable to create socket listener thread\n"));
        exit(1);
     }
-    MUTEX_ENTER(&rx_stats_mutex);
+    MUTEX_ENTER(&rx_pthread_mutex);
     ++rxi_pthread_hinum;
-    MUTEX_EXIT(&rx_stats_mutex);
+    MUTEX_EXIT(&rx_pthread_mutex);
     AFS_SIGSET_RESTORE();
     return 0;
 }
@@ -452,10 +454,10 @@ struct rx_ts_info_t * rx_ts_info_init() {
 #ifdef RX_ENABLE_TSFPQ
     queue_Init(&rx_ts_info->_FPQ);
 
-    MUTEX_ENTER(&rx_stats_mutex);
+    MUTEX_ENTER(&rx_packets_mutex);
     rx_TSFPQMaxProcs++;
     RX_TS_FPQ_COMPUTE_LIMITS;
-    MUTEX_EXIT(&rx_stats_mutex);
+    MUTEX_EXIT(&rx_packets_mutex);
 #endif /* RX_ENABLE_TSFPQ */
     return rx_ts_info;
 }