rx: work harder to notice and handle MorePackets request
authorJeffrey Altman <jaltman@your-file-system.com>
Mon, 17 May 2010 20:01:03 +0000 (16:01 -0400)
committerDerrick Brashear <shadow@dementia.org>
Tue, 18 May 2010 13:07:57 +0000 (06:07 -0700)
in particular, we did badly at handling kernel requests for
more packets, but other cases did not properly keep packet
stats either. attempt to globally better handle demand for
more packets.

Change-Id: I88837fed880f582444221ec53d280ca4070b607d
Reviewed-on: http://gerrit.openafs.org/1978
Tested-by: Derrick Brashear <shadow@dementia.org>
Tested-by: Jeffrey Altman <jaltman@openafs.org>
Reviewed-by: Jeffrey Altman <jaltman@openafs.org>
Reviewed-by: Hartmut Reuter <reuter@rzg.mpg.de>
Reviewed-by: Derrick Brashear <shadow@dementia.org>

src/rx/rx.c
src/rx/rx_globals.h
src/rx/rx_kcommon.c
src/rx/rx_lwp.c
src/rx/rx_packet.c
src/rx/rx_packet.h
src/rx/rx_pthread.c

index 0f1aca2..3d9cb7e 100644 (file)
@@ -509,12 +509,17 @@ rx_InitHost(u_int host, u_int port)
     rx_nFreePackets = 0;
     queue_Init(&rx_freePacketQueue);
     rxi_NeedMorePackets = FALSE;
+    rx_nPackets = 0;   /* rx_nPackets is managed by rxi_MorePackets* */
+
+    /* enforce a minimum number of allocated packets */
+    if (rx_extraPackets < rxi_nSendFrags * rx_maxSendWindow)
+        rx_extraPackets = rxi_nSendFrags * rx_maxSendWindow;
+
+    /* allocate the initial free packet pool */
 #ifdef RX_ENABLE_TSFPQ
-    rx_nPackets = 0;   /* in TSFPQ version, rx_nPackets is managed by rxi_MorePackets* */
     rxi_MorePacketsTSFPQ(rx_extraPackets + RX_MAX_QUOTA + 2, RX_TS_FPQ_FLUSH_GLOBAL, 0);
 #else /* RX_ENABLE_TSFPQ */
-    rx_nPackets = rx_extraPackets + RX_MAX_QUOTA + 2;  /* fudge */
-    rxi_MorePackets(rx_nPackets);
+    rxi_MorePackets(rx_extraPackets + RX_MAX_QUOTA + 2);        /* fudge */
 #endif /* RX_ENABLE_TSFPQ */
     rx_CheckPackets();
 
@@ -665,8 +670,10 @@ QuotaOK(struct rx_service *aservice)
     /* otherwise, can use only if there are enough to allow everyone
      * to go to their min quota after this guy starts.
      */
+    MUTEX_ENTER(&rx_quota_mutex);
     if (rxi_availProcs > rxi_minDeficit)
        rc = 1;
+    MUTEX_EXIT(&rx_quota_mutex);
     return rc;
 }
 #endif /* RX_ENABLE_LOCKS */
@@ -1845,9 +1852,11 @@ rx_GetCall(int tno, struct rx_service *cur_service, osi_socket * socketp)
 
     if (cur_service != NULL) {
        cur_service->nRequestsRunning--;
+        MUTEX_ENTER(&rx_quota_mutex);
        if (cur_service->nRequestsRunning < cur_service->minProcs)
            rxi_minDeficit++;
        rxi_availProcs++;
+        MUTEX_EXIT(&rx_quota_mutex);
     }
     if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
        struct rx_call *tcall, *ncall;
@@ -1910,9 +1919,11 @@ rx_GetCall(int tno, struct rx_service *cur_service, osi_socket * socketp)
        service->nRequestsRunning++;
        /* just started call in minProcs pool, need fewer to maintain
         * guarantee */
+        MUTEX_ENTER(&rx_quota_mutex);
        if (service->nRequestsRunning <= service->minProcs)
            rxi_minDeficit--;
        rxi_availProcs--;
+        MUTEX_EXIT(&rx_quota_mutex);
        rx_nWaiting--;
        /* MUTEX_EXIT(&call->lock); */
     } else {
@@ -4415,9 +4426,11 @@ rxi_AttachServerProc(struct rx_call *call,
        CV_SIGNAL(&sq->cv);
 #else
        service->nRequestsRunning++;
+        MUTEX_ENTER(&rx_quota_mutex);
        if (service->nRequestsRunning <= service->minProcs)
            rxi_minDeficit--;
        rxi_availProcs--;
+        MUTEX_EXIT(&rx_quota_mutex);
        osi_rxWakeup(sq);
 #endif
     }
index a265ba1..a8d3c24 100644 (file)
@@ -72,8 +72,8 @@ EXT int rx_intentionallyDroppedOnReadPer100  GLOBALSINIT(0);  /* Dropped on Read
 
 /* extra packets to add to the quota */
 EXT int rx_extraQuota GLOBALSINIT(0);
-/* extra packets to alloc (2 windows by deflt) */
-EXT int rx_extraPackets GLOBALSINIT(32);
+/* extra packets to alloc (2 * maxWindowSize by default) */
+EXT int rx_extraPackets GLOBALSINIT(256);
 
 EXT int rx_stackSize GLOBALSINIT(RX_DEFAULT_STACK_SIZE);
 
@@ -133,7 +133,7 @@ EXT int rx_initSendWindow GLOBALSINIT(16);
 EXT int rx_maxSendWindow GLOBALSINIT(128);
 EXT int rx_nackThreshold GLOBALSINIT(3);       /* Number NACKS to trigger congestion recovery */
 EXT int rx_nDgramThreshold GLOBALSINIT(4);     /* Number of packets before increasing
-                                        * packets per datagram */
+                                                 * packets per datagram */
 #define RX_MAX_FRAGS 4
 EXT int rxi_nSendFrags GLOBALSINIT(RX_MAX_FRAGS);      /* max fragments in a datagram */
 EXT int rxi_nRecvFrags GLOBALSINIT(RX_MAX_FRAGS);
@@ -158,7 +158,7 @@ EXT int rxi_HardAckRate GLOBALSINIT(RX_FAST_ACK_RATE + 1);
 
 #define        ACKHACK(p,r) { if (((p)->header.seq & (rxi_SoftAckRate))==0) (p)->header.flags |= RX_REQUEST_ACK; }
 
-EXT int rx_nPackets GLOBALSINIT(100);  /* obsolete; use rx_extraPackets now */
+EXT int rx_nPackets GLOBALSINIT(0);    /* preallocate packets with rx_extraPackets */
 
 /*
  * pthreads thread-specific rx info support
index 31fcace..d88ee40 100644 (file)
@@ -261,12 +261,13 @@ rx_ServerProc(void *unused)
 {
     int threadID;
 
-/* jaltman - rxi_dataQuota is protected by a mutex everywhere else */
     rxi_MorePackets(rx_maxReceiveWindow + 2);  /* alloc more packets */
+    MUTEX_ENTER(&rx_quota_mutex);
     rxi_dataQuota += rx_initSendWindow;        /* Reserve some pkts for hard times */
     /* threadID is used for making decisions in GetCall.  Get it by bumping
      * number of threads handling incoming calls */
     threadID = rxi_availProcs++;
+    MUTEX_EXIT(&rx_quota_mutex);
 
 #ifdef RX_ENABLE_LOCKS
     AFS_GUNLOCK();
@@ -1258,6 +1259,9 @@ rxk_Listener(void)
     AFS_GUNLOCK();
 #endif /* RX_ENABLE_LOCKS && !AFS_SUN5_ENV */
     while (afs_termState != AFSOP_STOP_RXK_LISTENER) {
+        /* See if a check for additional packets was issued */
+        rx_CheckPackets();
+
        if (rxp) {
            rxi_RestoreDataBufs(rxp);
        } else {
index 715eb34..060025b 100644 (file)
@@ -187,6 +187,9 @@ rxi_ListenerProc(fd_set * rfds, int *tnop, struct rx_call **newcallp)
        (*swapNameProgram) (pid, "listener", &name[0]);
 
     for (;;) {
+        /* See if a check for additional packets was issued */
+        rx_CheckPackets();
+
        /* Grab a new packet only if necessary (otherwise re-use the old one) */
        if (p) {
            rxi_RestoreDataBufs(p);
index 417de17..60697b8 100644 (file)
@@ -614,6 +614,7 @@ rxi_MorePackets(int apackets)
        rx_mallocedP = p;
     }
 
+    rx_nPackets += apackets;
     rx_nFreePackets += apackets;
     rxi_NeedMorePackets = FALSE;
     rxi_PacketsUnWait();
@@ -722,13 +723,12 @@ rxi_MorePacketsNoLock(int apackets)
     }
 
     rx_nFreePackets += apackets;
-#ifdef RX_ENABLE_TSFPQ
-    /* TSFPQ patch also needs to keep track of total packets */
     MUTEX_ENTER(&rx_packets_mutex);
     rx_nPackets += apackets;
+#ifdef RX_ENABLE_TSFPQ
     RX_TS_FPQ_COMPUTE_LIMITS;
-    MUTEX_EXIT(&rx_packets_mutex);
 #endif /* RX_ENABLE_TSFPQ */
+    MUTEX_EXIT(&rx_packets_mutex);
     rxi_NeedMorePackets = FALSE;
     rxi_PacketsUnWait();
 }
@@ -789,7 +789,7 @@ void
 rx_CheckPackets(void)
 {
     if (rxi_NeedMorePackets) {
-       rxi_MorePackets(rx_initSendWindow);
+       rxi_MorePackets(rx_maxSendWindow);
     }
 }
 
@@ -1153,7 +1153,7 @@ rxi_AllocPacketNoLock(int class)
            osi_Panic("rxi_AllocPacket error");
 #else /* KERNEL */
         if (queue_IsEmpty(&rx_freePacketQueue))
-           rxi_MorePacketsNoLock(4 * rx_initSendWindow);
+           rxi_MorePacketsNoLock(rx_maxSendWindow);
 #endif /* KERNEL */
 
 
@@ -1212,7 +1212,7 @@ rxi_AllocPacketNoLock(int class)
        osi_Panic("rxi_AllocPacket error");
 #else /* KERNEL */
     if (queue_IsEmpty(&rx_freePacketQueue))
-       rxi_MorePacketsNoLock(4 * rx_initSendWindow);
+       rxi_MorePacketsNoLock(rx_maxSendWindow);
 #endif /* KERNEL */
 
     rx_nFreePackets--;
@@ -1247,7 +1247,7 @@ rxi_AllocPacketTSFPQ(int class, int pull_global)
         MUTEX_ENTER(&rx_freePktQ_lock);
 
         if (queue_IsEmpty(&rx_freePacketQueue))
-           rxi_MorePacketsNoLock(4 * rx_initSendWindow);
+           rxi_MorePacketsNoLock(rx_maxSendWindow);
 
        RX_TS_FPQ_GTOL(rx_ts_info);
 
index 7028825..5f151d8 100644 (file)
                                         * grows past 13, rxdebug packets
                                         * will need to be modified */
 
-/* Packet classes, for rx_AllocPacket */
+/* Packet classes, for rx_AllocPacket and rx_packetQuota */
 #define        RX_PACKET_CLASS_RECEIVE     0
 #define        RX_PACKET_CLASS_SEND        1
 #define        RX_PACKET_CLASS_SPECIAL     2
index cab61ec..4efa7f4 100644 (file)
@@ -217,6 +217,9 @@ rxi_ListenerProc(osi_socket sock, int *tnop, struct rx_call **newcallp)
     MUTEX_EXIT(&listener_mutex);
 
     for (;;) {
+        /* See if a check for additional packets was issued */
+        rx_CheckPackets();
+
        /*
         * Grab a new packet only if necessary (otherwise re-use the old one)
         */