rx-change-packet-allocation-calculation-20080925
authorJeffrey Altman <jaltman@secure-endpoints.com>
Thu, 25 Sep 2008 20:34:41 +0000 (20:34 +0000)
committerDerrick Brashear <shadow@dementia.org>
Thu, 25 Sep 2008 20:34:41 +0000 (20:34 +0000)
LICENSE IPL10

don't alloc ourselves to death; try harder to manage growth

src/rx/rx_globals.h
src/rx/rx_packet.c
src/rx/rx_packet.h

index 101c172..f931db3 100644 (file)
@@ -177,8 +177,10 @@ typedef struct rx_ts_info_t {
         int gtol_xfer;
         int ltog_ops;
         int ltog_xfer;
-        int alloc_ops;
-        int alloc_xfer;
+        int lalloc_ops;
+        int lalloc_xfer;
+        int galloc_ops;
+        int galloc_xfer;
     } _FPQ;
     struct rx_packet * local_special_packet;
 } rx_ts_info_t;
@@ -265,14 +267,28 @@ EXT void rxi_FlushLocalPacketsTSFPQ(void); /* flush all thread-local packets to
         rx_TSFPQLocalMax = newmax; \
         rx_TSFPQGlobSize = newglob; \
     } while(0)
+/* record the number of packets allocated by this thread 
+ * and stored in the thread local queue */
+#define RX_TS_FPQ_LOCAL_ALLOC(rx_ts_info_p,num_alloc) \
+    do { \
+        (rx_ts_info_p)->_FPQ.lalloc_ops++; \
+        (rx_ts_info_p)->_FPQ.lalloc_xfer += num_alloc; \
+    } while (0)
+/* record the number of packets allocated by this thread 
+ * and stored in the global queue */
+#define RX_TS_FPQ_GLOBAL_ALLOC(rx_ts_info_p,num_alloc) \
+    do { \
+        (rx_ts_info_p)->_FPQ.galloc_ops++; \
+        (rx_ts_info_p)->_FPQ.galloc_xfer += num_alloc; \
+    } while (0)
 /* move packets from local (thread-specific) to global free packet queue.
-   rx_freePktQ_lock must be held. default is to move the difference between the current lenght, and the 
-   allowed max plus one extra glob. */
+   rx_freePktQ_lock must be held. default is to reduce the queue size to 40% ofmax */
 #define RX_TS_FPQ_LTOG(rx_ts_info_p) \
     do { \
         register int i; \
         register struct rx_packet * p; \
-        register int tsize = (rx_ts_info_p)->_FPQ.len - rx_TSFPQLocalMax + rx_TSFPQGlobSize; \
+        register int tsize = (rx_ts_info_p)->_FPQ.len - rx_TSFPQLocalMax + 3 *  rx_TSFPQGlobSize; \
+       if (tsize <= 0) break; \
         for (i=0,p=queue_Last(&((rx_ts_info_p)->_FPQ), rx_packet); \
              i < tsize; i++,p=queue_Prev(p, rx_packet)); \
         queue_SplitAfterPrepend(&((rx_ts_info_p)->_FPQ),&rx_freePacketQueue,p); \
@@ -281,10 +297,7 @@ EXT void rxi_FlushLocalPacketsTSFPQ(void); /* flush all thread-local packets to
         (rx_ts_info_p)->_FPQ.ltog_ops++; \
         (rx_ts_info_p)->_FPQ.ltog_xfer += tsize; \
         if ((rx_ts_info_p)->_FPQ.delta) { \
-            (rx_ts_info_p)->_FPQ.alloc_ops++; \
-            (rx_ts_info_p)->_FPQ.alloc_xfer += (rx_ts_info_p)->_FPQ.delta; \
             MUTEX_ENTER(&rx_stats_mutex); \
-            rx_nPackets += (rx_ts_info_p)->_FPQ.delta; \
             RX_TS_FPQ_COMPUTE_LIMITS; \
             MUTEX_EXIT(&rx_stats_mutex); \
            (rx_ts_info_p)->_FPQ.delta = 0; \
@@ -295,6 +308,7 @@ EXT void rxi_FlushLocalPacketsTSFPQ(void); /* flush all thread-local packets to
     do { \
         register int i; \
         register struct rx_packet * p; \
+        if (num_transfer <= 0) break; \
         for (i=0,p=queue_Last(&((rx_ts_info_p)->_FPQ), rx_packet); \
             i < (num_transfer); i++,p=queue_Prev(p, rx_packet)); \
         queue_SplitAfterPrepend(&((rx_ts_info_p)->_FPQ),&rx_freePacketQueue,p); \
@@ -303,10 +317,7 @@ EXT void rxi_FlushLocalPacketsTSFPQ(void); /* flush all thread-local packets to
         (rx_ts_info_p)->_FPQ.ltog_ops++; \
         (rx_ts_info_p)->_FPQ.ltog_xfer += (num_transfer); \
         if ((rx_ts_info_p)->_FPQ.delta) { \
-            (rx_ts_info_p)->_FPQ.alloc_ops++; \
-            (rx_ts_info_p)->_FPQ.alloc_xfer += (rx_ts_info_p)->_FPQ.delta; \
             MUTEX_ENTER(&rx_stats_mutex); \
-            rx_nPackets += (rx_ts_info_p)->_FPQ.delta; \
             RX_TS_FPQ_COMPUTE_LIMITS; \
             MUTEX_EXIT(&rx_stats_mutex); \
             (rx_ts_info_p)->_FPQ.delta = 0; \
index 4175a1c..686a52e 100644 (file)
@@ -267,7 +267,7 @@ static int
 AllocPacketBufs(int class, int num_pkts, struct rx_queue * q)
 {
     register struct rx_ts_info_t * rx_ts_info;
-    int transfer, alloc;
+    int transfer;
     SPLVAR;
 
     RX_TS_INFO_GET(rx_ts_info);
@@ -276,16 +276,10 @@ AllocPacketBufs(int class, int num_pkts, struct rx_queue * q)
     if (transfer > 0) {
         NETPRI;
         MUTEX_ENTER(&rx_freePktQ_lock);
-       
-       if ((transfer + rx_TSFPQGlobSize) <= rx_nFreePackets) {
-           transfer += rx_TSFPQGlobSize;
-       } else if (transfer <= rx_nFreePackets) {
-           transfer = rx_nFreePackets;
-       } else {
+       transfer = MAX(transfer, rx_TSFPQGlobSize);
+       if (transfer > rx_nFreePackets) {
            /* alloc enough for us, plus a few globs for other threads */
-           alloc = transfer + (3 * rx_TSFPQGlobSize) - rx_nFreePackets;
-           rxi_MorePacketsNoLock(MAX(alloc, rx_initSendWindow));
-           transfer = rx_TSFPQGlobSize;
+           rxi_MorePacketsNoLock(transfer + 4 * rx_initSendWindow);
        }
 
        RX_TS_FPQ_GTOL2(rx_ts_info, transfer);
@@ -347,7 +341,7 @@ AllocPacketBufs(int class, int num_pkts, struct rx_queue * q)
     }
 #else /* KERNEL */
     if (rx_nFreePackets < num_pkts) {
-        rxi_MorePacketsNoLock(MAX((num_pkts-rx_nFreePackets), rx_initSendWindow));
+       rxi_MorePacketsNoLock(MAX((num_pkts-rx_nFreePackets), 4 * rx_initSendWindow));
     }
 #endif /* KERNEL */
 
@@ -546,18 +540,31 @@ rxi_MorePackets(int apackets)
     SPLVAR;
 
     getme = apackets * sizeof(struct rx_packet);
-    p = rx_mallocedP = (struct rx_packet *)osi_Alloc(getme);
+    p = (struct rx_packet *)osi_Alloc(getme);
     osi_Assert(p);
 
     PIN(p, getme);             /* XXXXX */
     memset((char *)p, 0, getme);
     RX_TS_INFO_GET(rx_ts_info);
 
+    RX_TS_FPQ_LOCAL_ALLOC(rx_ts_info,apackets);
+    /* TSFPQ patch also needs to keep track of total packets */
+    MUTEX_ENTER(&rx_stats_mutex);
+    rx_nPackets += apackets;
+    RX_TS_FPQ_COMPUTE_LIMITS;
+    MUTEX_EXIT(&rx_stats_mutex);
+
     for (e = p + apackets; p < e; p++) {
         RX_PACKET_IOV_INIT(p);
        p->niovecs = 2;
 
        RX_TS_FPQ_CHECKIN(rx_ts_info,p);
+
+        NETPRI;
+        MUTEX_ENTER(&rx_freePktQ_lock);
+        rx_mallocedP = p;
+        MUTEX_EXIT(&rx_freePktQ_lock);
+        USERPRI;
     }
     rx_ts_info->_FPQ.delta += apackets;
 
@@ -582,7 +589,7 @@ rxi_MorePackets(int apackets)
     SPLVAR;
 
     getme = apackets * sizeof(struct rx_packet);
-    p = rx_mallocedP = (struct rx_packet *)osi_Alloc(getme);
+    p = (struct rx_packet *)osi_Alloc(getme);
     osi_Assert(p);
 
     PIN(p, getme);             /* XXXXX */
@@ -596,7 +603,9 @@ rxi_MorePackets(int apackets)
        p->niovecs = 2;
 
        queue_Append(&rx_freePacketQueue, p);
+       rx_mallocedP = p;
     }
+
     rx_nFreePackets += apackets;
     rxi_NeedMorePackets = FALSE;
     rxi_PacketsUnWait();
@@ -616,17 +625,29 @@ rxi_MorePacketsTSFPQ(int apackets, int flush_global, int num_keep_local)
     SPLVAR;
 
     getme = apackets * sizeof(struct rx_packet);
-    p = rx_mallocedP = (struct rx_packet *)osi_Alloc(getme);
+    p = (struct rx_packet *)osi_Alloc(getme);
 
     PIN(p, getme);             /* XXXXX */
     memset((char *)p, 0, getme);
     RX_TS_INFO_GET(rx_ts_info);
 
+    RX_TS_FPQ_LOCAL_ALLOC(rx_ts_info,apackets);
+    /* TSFPQ patch also needs to keep track of total packets */
+    MUTEX_ENTER(&rx_stats_mutex);
+    rx_nPackets += apackets;
+    RX_TS_FPQ_COMPUTE_LIMITS;
+    MUTEX_EXIT(&rx_stats_mutex);
+
     for (e = p + apackets; p < e; p++) {
         RX_PACKET_IOV_INIT(p);
        p->niovecs = 2;
-
        RX_TS_FPQ_CHECKIN(rx_ts_info,p);
+       
+        NETPRI;
+        MUTEX_ENTER(&rx_freePktQ_lock);
+        rx_mallocedP = p;
+        MUTEX_EXIT(&rx_freePktQ_lock);
+        USERPRI;
     }
     rx_ts_info->_FPQ.delta += apackets;
 
@@ -650,6 +671,9 @@ rxi_MorePacketsTSFPQ(int apackets, int flush_global, int num_keep_local)
 void
 rxi_MorePacketsNoLock(int apackets)
 {
+#ifdef RX_ENABLE_TSFPQ
+    register struct rx_ts_info_t * rx_ts_info;
+#endif /* RX_ENABLE_TSFPQ */
     struct rx_packet *p, *e;
     int getme;
 
@@ -659,7 +683,7 @@ rxi_MorePacketsNoLock(int apackets)
        * ((rx_maxJumboRecvSize - RX_FIRSTBUFFERSIZE) / RX_CBUFFERSIZE);
     do {
         getme = apackets * sizeof(struct rx_packet);
-        p = rx_mallocedP = (struct rx_packet *)osi_Alloc(getme);
+        p = (struct rx_packet *)osi_Alloc(getme);
        if (p == NULL) {
             apackets -= apackets / 4;
             osi_Assert(apackets > 0);
@@ -667,12 +691,18 @@ rxi_MorePacketsNoLock(int apackets)
     } while(p == NULL);
     memset((char *)p, 0, getme);
 
+#ifdef RX_ENABLE_TSFPQ
+    RX_TS_INFO_GET(rx_ts_info);
+    RX_TS_FPQ_GLOBAL_ALLOC(rx_ts_info,apackets);
+#endif /* RX_ENABLE_TSFPQ */ 
+
     for (e = p + apackets; p < e; p++) {
         RX_PACKET_IOV_INIT(p);
        p->flags |= RX_PKTFLAG_FREE;
        p->niovecs = 2;
 
        queue_Append(&rx_freePacketQueue, p);
+       rx_mallocedP = p;
     }
 
     rx_nFreePackets += apackets;
@@ -720,7 +750,7 @@ rxi_AdjustLocalPacketsTSFPQ(int num_keep_local, int allow_overcommit)
             if ((num_keep_local > rx_TSFPQLocalMax) && !allow_overcommit)
                 xfer = rx_TSFPQLocalMax - rx_ts_info->_FPQ.len;
             if (rx_nFreePackets < xfer) {
-                rxi_MorePacketsNoLock(xfer - rx_nFreePackets);
+               rxi_MorePacketsNoLock(MAX(xfer - rx_nFreePackets, 4 * rx_initSendWindow));
             }
             RX_TS_FPQ_GTOL2(rx_ts_info, xfer);
         }
@@ -1102,7 +1132,7 @@ rxi_AllocPacketNoLock(int class)
            osi_Panic("rxi_AllocPacket error");
 #else /* KERNEL */
         if (queue_IsEmpty(&rx_freePacketQueue))
-           rxi_MorePacketsNoLock(rx_initSendWindow);
+           rxi_MorePacketsNoLock(4 * rx_initSendWindow);
 #endif /* KERNEL */
 
 
@@ -1158,7 +1188,7 @@ rxi_AllocPacketNoLock(int class)
        osi_Panic("rxi_AllocPacket error");
 #else /* KERNEL */
     if (queue_IsEmpty(&rx_freePacketQueue))
-       rxi_MorePacketsNoLock(rx_initSendWindow);
+       rxi_MorePacketsNoLock(4 * rx_initSendWindow);
 #endif /* KERNEL */
 
     rx_nFreePackets--;
@@ -1192,7 +1222,7 @@ rxi_AllocPacketTSFPQ(int class, int pull_global)
         MUTEX_ENTER(&rx_freePktQ_lock);
 
         if (queue_IsEmpty(&rx_freePacketQueue))
-            rxi_MorePacketsNoLock(rx_initSendWindow);
+           rxi_MorePacketsNoLock(4 * rx_initSendWindow);
 
        RX_TS_FPQ_GTOL(rx_ts_info);
 
@@ -1751,6 +1781,7 @@ rxi_ReceiveDebugPacket(register struct rx_packet *ap, osi_socket asocket,
 #endif
            MUTEX_ENTER(&rx_serverPool_lock);
            tstat.nFreePackets = htonl(rx_nFreePackets);
+           tstat.nPackets = htonl(rx_nPackets);
            tstat.callsExecuted = htonl(rxi_nCalls);
            tstat.packetReclaims = htonl(rx_packetReclaims);
            tstat.usedFDs = CountFDs(64);
index d04b3fe..718e7de 100644 (file)
@@ -9,10 +9,10 @@
 
 #ifndef _RX_PACKET_
 #define _RX_PACKET_
-#ifndef UKERNEL
 #if defined(AFS_NT40_ENV) 
 #include "rx_xmit_nt.h"
 #endif
+#ifndef UKERNEL
 #ifndef AFS_NT40_ENV
 #include <sys/uio.h>
 #endif /* !AFS_NT40_ENV */