rx_fpq_take_three-20050420
authorTom Keiser <tkeiser@psu.edu>
Wed, 20 Apr 2005 22:21:16 +0000 (22:21 +0000)
committerJeffrey Altman <jaltman@secure-endpoints.com>
Wed, 20 Apr 2005 22:21:16 +0000 (22:21 +0000)
FIXES 17805

I've been stress testing a patch all weekend that changes the way
thread-local packet quotas are computed.  I was able to replicate the
unbounded packet alloc problem on unix server components by eliminating my
code from rxi_StartServerProcs that sets the maximum number of expected
threads.  This patch makes the upper thread limit get computed on the fly,
adds some thread-local free packet queue statistics, and a few other minor
tweaks.  I still don't have a working windows development environment, so
I can't say whether this will fix all the windows client problems.  But,
the unbounded packet allocation problem should go away with this patch.

stress testing on windows succeeds as well. <jaltman@secure-endpoints.com>

src/rx/rx.c
src/rx/rx_globals.h
src/rx/rx_packet.c
src/rx/rx_pthread.c

index 5f129ac..e2c474d 100644 (file)
@@ -653,10 +653,6 @@ rxi_StartServerProcs(int nExistingProcs)
     }
     nProcs += maxdiff;         /* Extra processes needed to allow max number requested to run in any given service, under good conditions */
     nProcs -= nExistingProcs;  /* Subtract the number of procs that were previously created for use as server procs */
-#ifdef RX_ENABLE_TSFPQ
-    rx_TSFPQMaxProcs += nProcs;
-    RX_TS_FPQ_COMPUTE_LIMITS;
-#endif /* RX_ENABLE_TSFPQ */
     for (i = 0; i < nProcs; i++) {
        rxi_StartServerProc(rx_ServerProc, rx_stackSize);
     }
@@ -672,10 +668,6 @@ rx_StartClientThread(void)
     int pid;
     pid = (int) pthread_self();
 #endif /* AFS_PTHREAD_ENV */
-#ifdef RX_ENABLE_TSFPQ
-    rx_TSFPQMaxProcs++;
-    RX_TS_FPQ_COMPUTE_LIMITS;
-#endif /* RX_ENABLE_TSFPQ */
 }
 #endif /* AFS_NT40_ENV */
 
@@ -739,6 +731,12 @@ rx_StartServer(int donateMe)
 #endif /* AFS_NT40_ENV */
        rx_ServerProc();        /* Never returns */
     }
+#ifdef RX_ENABLE_TSFPQ
+    /* no use leaving packets around in this thread's local queue if
+     * it isn't getting donated to the server thread pool. 
+     */
+    rxi_FlushLocalPacketsTSFPQ();
+#endif /* RX_ENABLE_TSFPQ */
     return;
 }
 
index 494738f..e3040bc 100644 (file)
@@ -159,6 +159,16 @@ typedef struct rx_ts_info_t {
         struct rx_queue queue;
         int len;                /* local queue length */
         int delta;              /* number of new packets alloc'd locally since last sync w/ global queue */
+        
+        /* FPQ stats */
+        int checkin_ops;
+        int checkout_ops;
+        int gtol_ops;
+        int gtol_xfer;
+        int ltog_ops;
+        int ltog_xfer;
+        int alloc_ops;
+        int alloc_xfer;
     } _FPQ;
 } rx_ts_info_t;
 EXT struct rx_ts_info_t * rx_ts_info_init();   /* init function for thread-specific data struct */
@@ -210,14 +220,17 @@ EXT struct rx_queue rx_freePacketQueue;
 EXT afs_kmutex_t rx_freePktQ_lock;
 #endif /* RX_ENABLE_LOCKS */
 
-#if defined(AFS_PTHREAD_ENV) && !defined(AFS_NT40_ENV)
+#if defined(AFS_PTHREAD_ENV)
 #define RX_ENABLE_TSFPQ
 EXT int rx_TSFPQGlobSize INIT(3); /* number of packets to transfer between global and local queues in one op */
 EXT int rx_TSFPQLocalMax INIT(15); /* max number of packets on local FPQ before returning a glob to the global pool */
-EXT int rx_TSFPQMaxProcs INIT(1); /* max number of threads expected */
+EXT int rx_TSFPQMaxProcs INIT(0); /* max number of threads expected */
 EXT void rxi_MorePacketsTSFPQ(int apackets, int flush_global, int num_keep_local); /* more flexible packet alloc function */
+EXT void rxi_AdjustLocalPacketsTSFPQ(int num_keep_local, int allow_overcommit); /* adjust thread-local queue length, for places where we know how many packets we will need a priori */
+EXT void rxi_FlushLocalPacketsTSFPQ(void); /* flush all thread-local packets to global queue */
 #define RX_TS_FPQ_FLUSH_GLOBAL 1
 #define RX_TS_FPQ_PULL_GLOBAL 1
+#define RX_TS_FPQ_ALLOW_OVERCOMMIT 1
 /* compute the localmax and globsize values from rx_TSFPQMaxProcs and rx_nPackets.
    arbitarily set local max so that all threads consume 90% of packets, if all local queues are full.
    arbitarily set transfer glob size to 20% of max local packet queue length.
@@ -247,10 +260,16 @@ EXT void rxi_MorePacketsTSFPQ(int apackets, int flush_global, int num_keep_local
         } \
         (rx_ts_info_p)->_FPQ.len -= tsize; \
         rx_nFreePackets += tsize; \
+        (rx_ts_info_p)->_FPQ.ltog_ops++; \
+        (rx_ts_info_p)->_FPQ.ltog_xfer += tsize; \
         if ((rx_ts_info_p)->_FPQ.delta) { \
+            (rx_ts_info_p)->_FPQ.alloc_ops++; \
+            (rx_ts_info_p)->_FPQ.alloc_xfer += (rx_ts_info_p)->_FPQ.delta; \
+            MUTEX_ENTER(&rx_stats_mutex); \
             rx_nPackets += (rx_ts_info_p)->_FPQ.delta; \
-            (rx_ts_info_p)->_FPQ.delta = 0; \
             RX_TS_FPQ_COMPUTE_LIMITS; \
+            MUTEX_EXIT(&rx_stats_mutex); \
+           (rx_ts_info_p)->_FPQ.delta = 0; \
         } \
     } while(0)
 /* same as above, except user has direct control over number to transfer */
@@ -265,10 +284,16 @@ EXT void rxi_MorePacketsTSFPQ(int apackets, int flush_global, int num_keep_local
         } \
         (rx_ts_info_p)->_FPQ.len -= (num_transfer); \
         rx_nFreePackets += (num_transfer); \
+        (rx_ts_info_p)->_FPQ.ltog_ops++; \
+        (rx_ts_info_p)->_FPQ.ltog_xfer += (num_transfer); \
         if ((rx_ts_info_p)->_FPQ.delta) { \
+            (rx_ts_info_p)->_FPQ.alloc_ops++; \
+            (rx_ts_info_p)->_FPQ.alloc_xfer += (rx_ts_info_p)->_FPQ.delta; \
+            MUTEX_ENTER(&rx_stats_mutex); \
             rx_nPackets += (rx_ts_info_p)->_FPQ.delta; \
-            (rx_ts_info_p)->_FPQ.delta = 0; \
             RX_TS_FPQ_COMPUTE_LIMITS; \
+            MUTEX_EXIT(&rx_stats_mutex); \
+            (rx_ts_info_p)->_FPQ.delta = 0; \
         } \
     } while(0)
 /* move packets from global to local (thread-specific) free packet queue.
@@ -284,6 +309,23 @@ EXT void rxi_MorePacketsTSFPQ(int apackets, int flush_global, int num_keep_local
         } \
         (rx_ts_info_p)->_FPQ.len += i; \
         rx_nFreePackets -= i; \
+        (rx_ts_info_p)->_FPQ.gtol_ops++; \
+        (rx_ts_info_p)->_FPQ.gtol_xfer += i; \
+    } while(0)
+/* same as above, except user has direct control over number to transfer */
+#define RX_TS_FPQ_GTOL2(rx_ts_info_p,num_transfer) \
+    do { \
+        register int i; \
+        register struct rx_packet * p; \
+        for (i=0; i < (num_transfer); i++) { \
+            p = queue_First(&rx_freePacketQueue, rx_packet); \
+            queue_Remove(p); \
+            queue_Append(&((rx_ts_info_p)->_FPQ),p); \
+        } \
+        (rx_ts_info_p)->_FPQ.len += i; \
+        rx_nFreePackets -= i; \
+        (rx_ts_info_p)->_FPQ.gtol_ops++; \
+        (rx_ts_info_p)->_FPQ.gtol_xfer += i; \
     } while(0)
 /* checkout a packet from the thread-specific free packet queue */
 #define RX_TS_FPQ_CHECKOUT(rx_ts_info_p,p) \
@@ -292,6 +334,7 @@ EXT void rxi_MorePacketsTSFPQ(int apackets, int flush_global, int num_keep_local
         queue_Remove(p); \
         RX_FPQ_MARK_USED(p); \
         (rx_ts_info_p)->_FPQ.len--; \
+        (rx_ts_info_p)->_FPQ.checkout_ops++; \
     } while(0)
 /* check a packet into the thread-specific free packet queue */
 #define RX_TS_FPQ_CHECKIN(rx_ts_info_p,p) \
@@ -299,8 +342,9 @@ EXT void rxi_MorePacketsTSFPQ(int apackets, int flush_global, int num_keep_local
         queue_Prepend(&((rx_ts_info_p)->_FPQ), (p)); \
         RX_FPQ_MARK_FREE(p); \
         (rx_ts_info_p)->_FPQ.len++; \
+        (rx_ts_info_p)->_FPQ.checkin_ops++; \
     } while(0)
-#endif /* AFS_PTHREAD_ENV && !AFS_NT40_ENV */
+#endif /* AFS_PTHREAD_ENV */
 
 /* Number of free packets */
 EXT int rx_nFreePackets INIT(0);
index d7dc993..482f54b 100644 (file)
@@ -568,11 +568,14 @@ rxi_MorePacketsNoLock(int apackets)
 
        queue_Append(&rx_freePacketQueue, p);
     }
+
     rx_nFreePackets += apackets;
 #ifdef RX_ENABLE_TSFPQ
     /* TSFPQ patch also needs to keep track of total packets */
+    MUTEX_ENTER(&rx_stats_mutex);
     rx_nPackets += apackets;
     RX_TS_FPQ_COMPUTE_LIMITS;
+    MUTEX_EXIT(&rx_stats_mutex);
 #endif /* RX_ENABLE_TSFPQ */
     rxi_NeedMorePackets = FALSE;
     rxi_PacketsUnWait();
@@ -589,6 +592,44 @@ rxi_FreeAllPackets(void)
     UNPIN(rx_mallocedP, (rx_maxReceiveWindow + 2) * sizeof(struct rx_packet));
 }
 
+#ifdef RX_ENABLE_TSFPQ
+void
+rxi_AdjustLocalPacketsTSFPQ(int num_keep_local, int allow_overcommit)
+{
+    register struct rx_ts_info_t * rx_ts_info;
+    register int xfer;
+    SPLVAR;
+
+    RX_TS_INFO_GET(rx_ts_info);
+
+    if (num_keep_local != rx_ts_info->_FPQ.len) {
+        NETPRI;
+       MUTEX_ENTER(&rx_freePktQ_lock);
+        if (num_keep_local < rx_ts_info->_FPQ.len) {
+            xfer = rx_ts_info->_FPQ.len - num_keep_local;
+           RX_TS_FPQ_LTOG2(rx_ts_info, xfer);
+           rxi_PacketsUnWait();
+        } else {
+            xfer = num_keep_local - rx_ts_info->_FPQ.len;
+            if ((num_keep_local > rx_TSFPQLocalMax) && !allow_overcommit)
+                xfer = rx_TSFPQLocalMax - rx_ts_info->_FPQ.len;
+            if (rx_nFreePackets < xfer) {
+                rxi_MorePacketsNoLock(xfer - rx_nFreePackets);
+            }
+            RX_TS_FPQ_GTOL2(rx_ts_info, xfer);
+        }
+       MUTEX_EXIT(&rx_freePktQ_lock);
+       USERPRI;
+    }
+}
+
+void
+rxi_FlushLocalPacketsTSFPQ(void)
+{
+    rxi_AdjustLocalPacketsTSFPQ(0, 0);
+}
+#endif /* RX_ENABLE_TSFPQ */
+
 /* Allocate more packets iff we need more continuation buffers */
 /* In kernel, can't page in memory with interrupts disabled, so we
  * don't use the event mechanism. */
index a318261..c0e4329 100644 (file)
@@ -426,6 +426,13 @@ struct rx_ts_info_t * rx_ts_info_init() {
     rx_ts_info = (rx_ts_info_t *) malloc(sizeof(rx_ts_info_t));
     assert(rx_ts_info != NULL && pthread_setspecific(rx_ts_info_key, rx_ts_info) == 0);
     memset(rx_ts_info, 0, sizeof(rx_ts_info_t));
+#ifdef RX_ENABLE_TSFPQ
     queue_Init(&rx_ts_info->_FPQ);
+
+    MUTEX_ENTER(&rx_stats_mutex);
+    rx_TSFPQMaxProcs++;
+    RX_TS_FPQ_COMPUTE_LIMITS;
+    MUTEX_EXIT(&rx_stats_mutex);
+#endif /* RX_ENABLE_TSFPQ */
     return rx_ts_info;
 }