From 230dcebcd61064cc9aab6d20d34ff866a5c575ea Mon Sep 17 00:00:00 2001 From: Jeffrey Altman Date: Thu, 25 Sep 2008 20:34:41 +0000 Subject: [PATCH] rx-change-packet-allocation-calculation-20080925 LICENSE IPL10 don't alloc ourselves to death; try harder to manage growth --- src/rx/rx_globals.h | 33 ++++++++++++++++--------- src/rx/rx_packet.c | 71 ++++++++++++++++++++++++++++++++++++++--------------- src/rx/rx_packet.h | 2 +- 3 files changed, 74 insertions(+), 32 deletions(-) diff --git a/src/rx/rx_globals.h b/src/rx/rx_globals.h index 101c172..f931db3 100644 --- a/src/rx/rx_globals.h +++ b/src/rx/rx_globals.h @@ -177,8 +177,10 @@ typedef struct rx_ts_info_t { int gtol_xfer; int ltog_ops; int ltog_xfer; - int alloc_ops; - int alloc_xfer; + int lalloc_ops; + int lalloc_xfer; + int galloc_ops; + int galloc_xfer; } _FPQ; struct rx_packet * local_special_packet; } rx_ts_info_t; @@ -265,14 +267,28 @@ EXT void rxi_FlushLocalPacketsTSFPQ(void); /* flush all thread-local packets to rx_TSFPQLocalMax = newmax; \ rx_TSFPQGlobSize = newglob; \ } while(0) +/* record the number of packets allocated by this thread + * and stored in the thread local queue */ +#define RX_TS_FPQ_LOCAL_ALLOC(rx_ts_info_p,num_alloc) \ + do { \ + (rx_ts_info_p)->_FPQ.lalloc_ops++; \ + (rx_ts_info_p)->_FPQ.lalloc_xfer += num_alloc; \ + } while (0) +/* record the number of packets allocated by this thread + * and stored in the global queue */ +#define RX_TS_FPQ_GLOBAL_ALLOC(rx_ts_info_p,num_alloc) \ + do { \ + (rx_ts_info_p)->_FPQ.galloc_ops++; \ + (rx_ts_info_p)->_FPQ.galloc_xfer += num_alloc; \ + } while (0) /* move packets from local (thread-specific) to global free packet queue. - rx_freePktQ_lock must be held. default is to move the difference between the current lenght, and the - allowed max plus one extra glob. */ + rx_freePktQ_lock must be held. default is to reduce the queue size to 40% ofmax */ #define RX_TS_FPQ_LTOG(rx_ts_info_p) \ do { \ register int i; \ register struct rx_packet * p; \ - register int tsize = (rx_ts_info_p)->_FPQ.len - rx_TSFPQLocalMax + rx_TSFPQGlobSize; \ + register int tsize = (rx_ts_info_p)->_FPQ.len - rx_TSFPQLocalMax + 3 * rx_TSFPQGlobSize; \ + if (tsize <= 0) break; \ for (i=0,p=queue_Last(&((rx_ts_info_p)->_FPQ), rx_packet); \ i < tsize; i++,p=queue_Prev(p, rx_packet)); \ queue_SplitAfterPrepend(&((rx_ts_info_p)->_FPQ),&rx_freePacketQueue,p); \ @@ -281,10 +297,7 @@ EXT void rxi_FlushLocalPacketsTSFPQ(void); /* flush all thread-local packets to (rx_ts_info_p)->_FPQ.ltog_ops++; \ (rx_ts_info_p)->_FPQ.ltog_xfer += tsize; \ if ((rx_ts_info_p)->_FPQ.delta) { \ - (rx_ts_info_p)->_FPQ.alloc_ops++; \ - (rx_ts_info_p)->_FPQ.alloc_xfer += (rx_ts_info_p)->_FPQ.delta; \ MUTEX_ENTER(&rx_stats_mutex); \ - rx_nPackets += (rx_ts_info_p)->_FPQ.delta; \ RX_TS_FPQ_COMPUTE_LIMITS; \ MUTEX_EXIT(&rx_stats_mutex); \ (rx_ts_info_p)->_FPQ.delta = 0; \ @@ -295,6 +308,7 @@ EXT void rxi_FlushLocalPacketsTSFPQ(void); /* flush all thread-local packets to do { \ register int i; \ register struct rx_packet * p; \ + if (num_transfer <= 0) break; \ for (i=0,p=queue_Last(&((rx_ts_info_p)->_FPQ), rx_packet); \ i < (num_transfer); i++,p=queue_Prev(p, rx_packet)); \ queue_SplitAfterPrepend(&((rx_ts_info_p)->_FPQ),&rx_freePacketQueue,p); \ @@ -303,10 +317,7 @@ EXT void rxi_FlushLocalPacketsTSFPQ(void); /* flush all thread-local packets to (rx_ts_info_p)->_FPQ.ltog_ops++; \ (rx_ts_info_p)->_FPQ.ltog_xfer += (num_transfer); \ if ((rx_ts_info_p)->_FPQ.delta) { \ - (rx_ts_info_p)->_FPQ.alloc_ops++; \ - (rx_ts_info_p)->_FPQ.alloc_xfer += (rx_ts_info_p)->_FPQ.delta; \ MUTEX_ENTER(&rx_stats_mutex); \ - rx_nPackets += (rx_ts_info_p)->_FPQ.delta; \ RX_TS_FPQ_COMPUTE_LIMITS; \ MUTEX_EXIT(&rx_stats_mutex); \ (rx_ts_info_p)->_FPQ.delta = 0; \ diff --git a/src/rx/rx_packet.c b/src/rx/rx_packet.c index 4175a1c..686a52e 100644 --- a/src/rx/rx_packet.c +++ b/src/rx/rx_packet.c @@ -267,7 +267,7 @@ static int AllocPacketBufs(int class, int num_pkts, struct rx_queue * q) { register struct rx_ts_info_t * rx_ts_info; - int transfer, alloc; + int transfer; SPLVAR; RX_TS_INFO_GET(rx_ts_info); @@ -276,16 +276,10 @@ AllocPacketBufs(int class, int num_pkts, struct rx_queue * q) if (transfer > 0) { NETPRI; MUTEX_ENTER(&rx_freePktQ_lock); - - if ((transfer + rx_TSFPQGlobSize) <= rx_nFreePackets) { - transfer += rx_TSFPQGlobSize; - } else if (transfer <= rx_nFreePackets) { - transfer = rx_nFreePackets; - } else { + transfer = MAX(transfer, rx_TSFPQGlobSize); + if (transfer > rx_nFreePackets) { /* alloc enough for us, plus a few globs for other threads */ - alloc = transfer + (3 * rx_TSFPQGlobSize) - rx_nFreePackets; - rxi_MorePacketsNoLock(MAX(alloc, rx_initSendWindow)); - transfer = rx_TSFPQGlobSize; + rxi_MorePacketsNoLock(transfer + 4 * rx_initSendWindow); } RX_TS_FPQ_GTOL2(rx_ts_info, transfer); @@ -347,7 +341,7 @@ AllocPacketBufs(int class, int num_pkts, struct rx_queue * q) } #else /* KERNEL */ if (rx_nFreePackets < num_pkts) { - rxi_MorePacketsNoLock(MAX((num_pkts-rx_nFreePackets), rx_initSendWindow)); + rxi_MorePacketsNoLock(MAX((num_pkts-rx_nFreePackets), 4 * rx_initSendWindow)); } #endif /* KERNEL */ @@ -546,18 +540,31 @@ rxi_MorePackets(int apackets) SPLVAR; getme = apackets * sizeof(struct rx_packet); - p = rx_mallocedP = (struct rx_packet *)osi_Alloc(getme); + p = (struct rx_packet *)osi_Alloc(getme); osi_Assert(p); PIN(p, getme); /* XXXXX */ memset((char *)p, 0, getme); RX_TS_INFO_GET(rx_ts_info); + RX_TS_FPQ_LOCAL_ALLOC(rx_ts_info,apackets); + /* TSFPQ patch also needs to keep track of total packets */ + MUTEX_ENTER(&rx_stats_mutex); + rx_nPackets += apackets; + RX_TS_FPQ_COMPUTE_LIMITS; + MUTEX_EXIT(&rx_stats_mutex); + for (e = p + apackets; p < e; p++) { RX_PACKET_IOV_INIT(p); p->niovecs = 2; RX_TS_FPQ_CHECKIN(rx_ts_info,p); + + NETPRI; + MUTEX_ENTER(&rx_freePktQ_lock); + rx_mallocedP = p; + MUTEX_EXIT(&rx_freePktQ_lock); + USERPRI; } rx_ts_info->_FPQ.delta += apackets; @@ -582,7 +589,7 @@ rxi_MorePackets(int apackets) SPLVAR; getme = apackets * sizeof(struct rx_packet); - p = rx_mallocedP = (struct rx_packet *)osi_Alloc(getme); + p = (struct rx_packet *)osi_Alloc(getme); osi_Assert(p); PIN(p, getme); /* XXXXX */ @@ -596,7 +603,9 @@ rxi_MorePackets(int apackets) p->niovecs = 2; queue_Append(&rx_freePacketQueue, p); + rx_mallocedP = p; } + rx_nFreePackets += apackets; rxi_NeedMorePackets = FALSE; rxi_PacketsUnWait(); @@ -616,17 +625,29 @@ rxi_MorePacketsTSFPQ(int apackets, int flush_global, int num_keep_local) SPLVAR; getme = apackets * sizeof(struct rx_packet); - p = rx_mallocedP = (struct rx_packet *)osi_Alloc(getme); + p = (struct rx_packet *)osi_Alloc(getme); PIN(p, getme); /* XXXXX */ memset((char *)p, 0, getme); RX_TS_INFO_GET(rx_ts_info); + RX_TS_FPQ_LOCAL_ALLOC(rx_ts_info,apackets); + /* TSFPQ patch also needs to keep track of total packets */ + MUTEX_ENTER(&rx_stats_mutex); + rx_nPackets += apackets; + RX_TS_FPQ_COMPUTE_LIMITS; + MUTEX_EXIT(&rx_stats_mutex); + for (e = p + apackets; p < e; p++) { RX_PACKET_IOV_INIT(p); p->niovecs = 2; - RX_TS_FPQ_CHECKIN(rx_ts_info,p); + + NETPRI; + MUTEX_ENTER(&rx_freePktQ_lock); + rx_mallocedP = p; + MUTEX_EXIT(&rx_freePktQ_lock); + USERPRI; } rx_ts_info->_FPQ.delta += apackets; @@ -650,6 +671,9 @@ rxi_MorePacketsTSFPQ(int apackets, int flush_global, int num_keep_local) void rxi_MorePacketsNoLock(int apackets) { +#ifdef RX_ENABLE_TSFPQ + register struct rx_ts_info_t * rx_ts_info; +#endif /* RX_ENABLE_TSFPQ */ struct rx_packet *p, *e; int getme; @@ -659,7 +683,7 @@ rxi_MorePacketsNoLock(int apackets) * ((rx_maxJumboRecvSize - RX_FIRSTBUFFERSIZE) / RX_CBUFFERSIZE); do { getme = apackets * sizeof(struct rx_packet); - p = rx_mallocedP = (struct rx_packet *)osi_Alloc(getme); + p = (struct rx_packet *)osi_Alloc(getme); if (p == NULL) { apackets -= apackets / 4; osi_Assert(apackets > 0); @@ -667,12 +691,18 @@ rxi_MorePacketsNoLock(int apackets) } while(p == NULL); memset((char *)p, 0, getme); +#ifdef RX_ENABLE_TSFPQ + RX_TS_INFO_GET(rx_ts_info); + RX_TS_FPQ_GLOBAL_ALLOC(rx_ts_info,apackets); +#endif /* RX_ENABLE_TSFPQ */ + for (e = p + apackets; p < e; p++) { RX_PACKET_IOV_INIT(p); p->flags |= RX_PKTFLAG_FREE; p->niovecs = 2; queue_Append(&rx_freePacketQueue, p); + rx_mallocedP = p; } rx_nFreePackets += apackets; @@ -720,7 +750,7 @@ rxi_AdjustLocalPacketsTSFPQ(int num_keep_local, int allow_overcommit) if ((num_keep_local > rx_TSFPQLocalMax) && !allow_overcommit) xfer = rx_TSFPQLocalMax - rx_ts_info->_FPQ.len; if (rx_nFreePackets < xfer) { - rxi_MorePacketsNoLock(xfer - rx_nFreePackets); + rxi_MorePacketsNoLock(MAX(xfer - rx_nFreePackets, 4 * rx_initSendWindow)); } RX_TS_FPQ_GTOL2(rx_ts_info, xfer); } @@ -1102,7 +1132,7 @@ rxi_AllocPacketNoLock(int class) osi_Panic("rxi_AllocPacket error"); #else /* KERNEL */ if (queue_IsEmpty(&rx_freePacketQueue)) - rxi_MorePacketsNoLock(rx_initSendWindow); + rxi_MorePacketsNoLock(4 * rx_initSendWindow); #endif /* KERNEL */ @@ -1158,7 +1188,7 @@ rxi_AllocPacketNoLock(int class) osi_Panic("rxi_AllocPacket error"); #else /* KERNEL */ if (queue_IsEmpty(&rx_freePacketQueue)) - rxi_MorePacketsNoLock(rx_initSendWindow); + rxi_MorePacketsNoLock(4 * rx_initSendWindow); #endif /* KERNEL */ rx_nFreePackets--; @@ -1192,7 +1222,7 @@ rxi_AllocPacketTSFPQ(int class, int pull_global) MUTEX_ENTER(&rx_freePktQ_lock); if (queue_IsEmpty(&rx_freePacketQueue)) - rxi_MorePacketsNoLock(rx_initSendWindow); + rxi_MorePacketsNoLock(4 * rx_initSendWindow); RX_TS_FPQ_GTOL(rx_ts_info); @@ -1751,6 +1781,7 @@ rxi_ReceiveDebugPacket(register struct rx_packet *ap, osi_socket asocket, #endif MUTEX_ENTER(&rx_serverPool_lock); tstat.nFreePackets = htonl(rx_nFreePackets); + tstat.nPackets = htonl(rx_nPackets); tstat.callsExecuted = htonl(rxi_nCalls); tstat.packetReclaims = htonl(rx_packetReclaims); tstat.usedFDs = CountFDs(64); diff --git a/src/rx/rx_packet.h b/src/rx/rx_packet.h index d04b3fe..718e7de 100644 --- a/src/rx/rx_packet.h +++ b/src/rx/rx_packet.h @@ -9,10 +9,10 @@ #ifndef _RX_PACKET_ #define _RX_PACKET_ -#ifndef UKERNEL #if defined(AFS_NT40_ENV) #include "rx_xmit_nt.h" #endif +#ifndef UKERNEL #ifndef AFS_NT40_ENV #include #endif /* !AFS_NT40_ENV */ -- 1.9.4