ubik-call-sucks-20060704
[openafs.git] / src / rx / rx_globals.h
index fffb414..0f329e3 100644 (file)
 
 #ifndef INIT
 #define INIT(x)
-#if defined(AFS_NT40_ENV) && defined(AFS_PTHREAD_ENV)
+#if defined(AFS_NT40_ENV)
+#if defined(AFS_PTHREAD_ENV)
 #define EXT __declspec(dllimport) extern
 #else
 #define        EXT extern
 #endif
+#define EXT2 __declspec(dllimport) extern
+#else
+#define EXT2 extern
+#define EXT extern
+#endif
 #endif /* !INIT */
 
 /* Basic socket for client requests; other sockets (for receiving server requests) are in the service structures */
@@ -62,6 +68,7 @@ EXT struct clock rx_softAckDelay;
 /* Variable to allow introduction of network unreliability */
 #ifdef RXDEBUG
 EXT int rx_intentionallyDroppedPacketsPer100 INIT(0);  /* Dropped on Send */
+EXT int rx_intentionallyDroppedOnReadPer100  INIT(0);  /* Dropped on Read */
 #endif
 
 /* extra packets to add to the quota */
@@ -159,7 +166,20 @@ typedef struct rx_ts_info_t {
         struct rx_queue queue;
         int len;                /* local queue length */
         int delta;              /* number of new packets alloc'd locally since last sync w/ global queue */
+        
+        /* FPQ stats */
+        int checkin_ops;
+        int checkin_xfer;
+        int checkout_ops;
+        int checkout_xfer;
+        int gtol_ops;
+        int gtol_xfer;
+        int ltog_ops;
+        int ltog_xfer;
+        int alloc_ops;
+        int alloc_xfer;
     } _FPQ;
+    struct rx_packet * local_special_packet;
 } rx_ts_info_t;
 EXT struct rx_ts_info_t * rx_ts_info_init();   /* init function for thread-specific data struct */
 #define RX_TS_INFO_GET(ts_info_p) \
@@ -206,16 +226,22 @@ EXT struct rx_queue rx_freePacketQueue;
        (p)->niovecs = 2; \
        (p)->length = RX_FIRSTBUFFERSIZE; \
     } while(0)
+
 #ifdef RX_ENABLE_LOCKS
 EXT afs_kmutex_t rx_freePktQ_lock;
-#ifdef AFS_PTHREAD_ENV
+#endif /* RX_ENABLE_LOCKS */
+
+#if defined(AFS_PTHREAD_ENV)
 #define RX_ENABLE_TSFPQ
 EXT int rx_TSFPQGlobSize INIT(3); /* number of packets to transfer between global and local queues in one op */
 EXT int rx_TSFPQLocalMax INIT(15); /* max number of packets on local FPQ before returning a glob to the global pool */
-EXT int rx_TSFPQMaxProcs INIT(1); /* max number of threads expected */
+EXT int rx_TSFPQMaxProcs INIT(0); /* max number of threads expected */
 EXT void rxi_MorePacketsTSFPQ(int apackets, int flush_global, int num_keep_local); /* more flexible packet alloc function */
+EXT void rxi_AdjustLocalPacketsTSFPQ(int num_keep_local, int allow_overcommit); /* adjust thread-local queue length, for places where we know how many packets we will need a priori */
+EXT void rxi_FlushLocalPacketsTSFPQ(void); /* flush all thread-local packets to global queue */
 #define RX_TS_FPQ_FLUSH_GLOBAL 1
 #define RX_TS_FPQ_PULL_GLOBAL 1
+#define RX_TS_FPQ_ALLOW_OVERCOMMIT 1
 /* compute the localmax and globsize values from rx_TSFPQMaxProcs and rx_nPackets.
    arbitarily set local max so that all threads consume 90% of packets, if all local queues are full.
    arbitarily set transfer glob size to 20% of max local packet queue length.
@@ -238,17 +264,21 @@ EXT void rxi_MorePacketsTSFPQ(int apackets, int flush_global, int num_keep_local
         register int i; \
         register struct rx_packet * p; \
         register int tsize = (rx_ts_info_p)->_FPQ.len - rx_TSFPQLocalMax + rx_TSFPQGlobSize; \
-        for (i=0; i < tsize; i++) { \
-            p = queue_Last(&((rx_ts_info_p)->_FPQ), rx_packet); \
-            queue_Remove(p); \
-            queue_Prepend(&rx_freePacketQueue,p); \
-        } \
+        for (i=0,p=queue_Last(&((rx_ts_info_p)->_FPQ), rx_packet); \
+             i < tsize; i++,p=queue_Prev(p, rx_packet)); \
+        queue_SplitAfterPrepend(&((rx_ts_info_p)->_FPQ),&rx_freePacketQueue,p); \
         (rx_ts_info_p)->_FPQ.len -= tsize; \
         rx_nFreePackets += tsize; \
+        (rx_ts_info_p)->_FPQ.ltog_ops++; \
+        (rx_ts_info_p)->_FPQ.ltog_xfer += tsize; \
         if ((rx_ts_info_p)->_FPQ.delta) { \
+            (rx_ts_info_p)->_FPQ.alloc_ops++; \
+            (rx_ts_info_p)->_FPQ.alloc_xfer += (rx_ts_info_p)->_FPQ.delta; \
+            MUTEX_ENTER(&rx_stats_mutex); \
             rx_nPackets += (rx_ts_info_p)->_FPQ.delta; \
-            (rx_ts_info_p)->_FPQ.delta = 0; \
             RX_TS_FPQ_COMPUTE_LIMITS; \
+            MUTEX_EXIT(&rx_stats_mutex); \
+           (rx_ts_info_p)->_FPQ.delta = 0; \
         } \
     } while(0)
 /* same as above, except user has direct control over number to transfer */
@@ -256,32 +286,51 @@ EXT void rxi_MorePacketsTSFPQ(int apackets, int flush_global, int num_keep_local
     do { \
         register int i; \
         register struct rx_packet * p; \
-        for (i=0; i < (num_transfer); i++) { \
-            p = queue_Last(&((rx_ts_info_p)->_FPQ), rx_packet); \
-            queue_Remove(p); \
-            queue_Prepend(&rx_freePacketQueue,p); \
-        } \
+        for (i=0,p=queue_Last(&((rx_ts_info_p)->_FPQ), rx_packet); \
+            i < (num_transfer); i++,p=queue_Prev(p, rx_packet)); \
+        queue_SplitAfterPrepend(&((rx_ts_info_p)->_FPQ),&rx_freePacketQueue,p); \
         (rx_ts_info_p)->_FPQ.len -= (num_transfer); \
         rx_nFreePackets += (num_transfer); \
+        (rx_ts_info_p)->_FPQ.ltog_ops++; \
+        (rx_ts_info_p)->_FPQ.ltog_xfer += (num_transfer); \
         if ((rx_ts_info_p)->_FPQ.delta) { \
+            (rx_ts_info_p)->_FPQ.alloc_ops++; \
+            (rx_ts_info_p)->_FPQ.alloc_xfer += (rx_ts_info_p)->_FPQ.delta; \
+            MUTEX_ENTER(&rx_stats_mutex); \
             rx_nPackets += (rx_ts_info_p)->_FPQ.delta; \
-            (rx_ts_info_p)->_FPQ.delta = 0; \
             RX_TS_FPQ_COMPUTE_LIMITS; \
+            MUTEX_EXIT(&rx_stats_mutex); \
+            (rx_ts_info_p)->_FPQ.delta = 0; \
         } \
     } while(0)
 /* move packets from global to local (thread-specific) free packet queue.
    rx_freePktQ_lock must be held. */
 #define RX_TS_FPQ_GTOL(rx_ts_info_p) \
     do { \
+        register int i, tsize; \
+        register struct rx_packet * p; \
+        tsize = (rx_TSFPQGlobSize <= rx_nFreePackets) ? \
+                 rx_TSFPQGlobSize : rx_nFreePackets; \
+        for (i=0,p=queue_First(&rx_freePacketQueue, rx_packet); \
+             i < tsize; i++,p=queue_Next(p, rx_packet)); \
+        queue_SplitBeforeAppend(&rx_freePacketQueue,&((rx_ts_info_p)->_FPQ),p); \
+        (rx_ts_info_p)->_FPQ.len += i; \
+        rx_nFreePackets -= i; \
+        (rx_ts_info_p)->_FPQ.gtol_ops++; \
+        (rx_ts_info_p)->_FPQ.gtol_xfer += i; \
+    } while(0)
+/* same as above, except user has direct control over number to transfer */
+#define RX_TS_FPQ_GTOL2(rx_ts_info_p,num_transfer) \
+    do { \
         register int i; \
         register struct rx_packet * p; \
-        for (i=0; (i < rx_TSFPQGlobSize) && queue_IsNotEmpty(&rx_freePacketQueue); i++) { \
-            p = queue_First(&rx_freePacketQueue, rx_packet); \
-            queue_Remove(p); \
-            queue_Append(&((rx_ts_info_p)->_FPQ),p); \
-        } \
+        for (i=0,p=queue_First(&rx_freePacketQueue, rx_packet); \
+             i < (num_transfer); i++,p=queue_Next(p, rx_packet)); \
+        queue_SplitBeforeAppend(&rx_freePacketQueue,&((rx_ts_info_p)->_FPQ),p); \
         (rx_ts_info_p)->_FPQ.len += i; \
         rx_nFreePackets -= i; \
+        (rx_ts_info_p)->_FPQ.gtol_ops++; \
+        (rx_ts_info_p)->_FPQ.gtol_xfer += i; \
     } while(0)
 /* checkout a packet from the thread-specific free packet queue */
 #define RX_TS_FPQ_CHECKOUT(rx_ts_info_p,p) \
@@ -290,6 +339,23 @@ EXT void rxi_MorePacketsTSFPQ(int apackets, int flush_global, int num_keep_local
         queue_Remove(p); \
         RX_FPQ_MARK_USED(p); \
         (rx_ts_info_p)->_FPQ.len--; \
+        (rx_ts_info_p)->_FPQ.checkout_ops++; \
+        (rx_ts_info_p)->_FPQ.checkout_xfer++; \
+    } while(0)
+/* checkout multiple packets from the thread-specific free packet queue */
+#define RX_TS_FPQ_CHECKOUT2(rx_ts_info_p,num_transfer,q) \
+    do { \
+        register int i; \
+        register struct rx_packet *p; \
+        for (i=0, p=queue_First(&((rx_ts_info_p)->_FPQ), rx_packet); \
+             i < (num_transfer); \
+             i++, p=queue_Next(p, rx_packet)) { \
+            RX_FPQ_MARK_USED(p); \
+        } \
+        queue_SplitBeforeAppend(&((rx_ts_info_p)->_FPQ),(q),p); \
+        (rx_ts_info_p)->_FPQ.len -= (num_transfer); \
+        (rx_ts_info_p)->_FPQ.checkout_ops++; \
+        (rx_ts_info_p)->_FPQ.checkout_xfer += (num_transfer); \
     } while(0)
 /* check a packet into the thread-specific free packet queue */
 #define RX_TS_FPQ_CHECKIN(rx_ts_info_p,p) \
@@ -297,9 +363,25 @@ EXT void rxi_MorePacketsTSFPQ(int apackets, int flush_global, int num_keep_local
         queue_Prepend(&((rx_ts_info_p)->_FPQ), (p)); \
         RX_FPQ_MARK_FREE(p); \
         (rx_ts_info_p)->_FPQ.len++; \
+        (rx_ts_info_p)->_FPQ.checkin_ops++; \
+        (rx_ts_info_p)->_FPQ.checkin_xfer++; \
+    } while(0)
+/* check multiple packets into the thread-specific free packet queue */
+/* num_transfer must equal length of (q); it is not a means of checking 
+ * in part of (q).  passing num_transfer just saves us instructions 
+ * since caller already knows length of (q) for other reasons */
+#define RX_TS_FPQ_CHECKIN2(rx_ts_info_p,num_transfer,q) \
+    do { \
+        register struct rx_packet *p, *np; \
+        for (queue_Scan((q), p, np, rx_packet)) { \
+            RX_FPQ_MARK_FREE(p); \
+        } \
+        queue_SplicePrepend(&((rx_ts_info_p)->_FPQ),(q)); \
+        (rx_ts_info_p)->_FPQ.len += (num_transfer); \
+        (rx_ts_info_p)->_FPQ.checkin_ops++; \
+        (rx_ts_info_p)->_FPQ.checkin_xfer += (num_transfer); \
     } while(0)
 #endif /* AFS_PTHREAD_ENV */
-#endif /* RX_ENABLE_LOCKS */
 
 /* Number of free packets */
 EXT int rx_nFreePackets INIT(0);
@@ -350,9 +432,9 @@ EXT u_short rx_port;
 #if !defined(KERNEL) && !defined(AFS_PTHREAD_ENV)
 /* 32-bit select Mask for rx_Listener. */
 EXT fd_set rx_selectMask;
-EXT int rx_maxSocketNumber;    /* Maximum socket number in the select mask. */
+EXT osi_socket rx_maxSocketNumber;     /* Maximum socket number in the select mask. */
 /* Minumum socket number in the select mask. */
-EXT int rx_minSocketNumber INIT(0x7fffffff);
+EXT osi_socket rx_minSocketNumber INIT(0x7fffffff);
 #endif
 
 /* This is actually the minimum number of packets that must remain free,
@@ -411,7 +493,9 @@ EXT afs_kmutex_t rx_connHashTable_lock;
 
 #define CONN_HASH(host, port, cid, epoch, type) ((((cid)>>RX_CIDSHIFT)%rx_hashTableSize))
 
+#if 0
 #define PEER_HASH(host, port)  ((host ^ port) % rx_hashTableSize)
+#endif
 
 /* Forward definitions of internal procedures */
 #define        rxi_ChallengeOff(conn)  rxevent_Cancel((conn)->challengeEvent, (struct rx_call*)0, 0);
@@ -432,7 +516,16 @@ EXT FILE *rx_debugFile;            /* Set by the user to a stdio file for debugging output
 EXT FILE *rxevent_debugFile;   /* Set to an stdio descriptor for event logging to that file */
 
 #define rx_Log rx_debugFile
+#ifdef AFS_NT40_ENV
+EXT int rxdebug_active;
+#if !defined(_WIN64)
+#define dpf(args) if (rxdebug_active) rxi_DebugPrint args;
+#else
+#define dpf(args)
+#endif
+#else
 #define dpf(args) if (rx_debugFile) rxi_DebugPrint args; else
+#endif
 #define rx_Log_event rxevent_debugFile
 
 EXT char *rx_packetTypes[RX_N_PACKET_TYPES] INIT(RX_PACKET_TYPES);     /* Strings defined in rx.h */
@@ -484,7 +577,7 @@ EXT int rxi_pthread_hinum INIT(0);
 EXT afs_kmutex_t rx_stats_mutex;       /* used to activate stats gathering */
 #endif
 
-EXT int rx_enable_stats INIT(0);
+EXT2 int rx_enable_stats INIT(0);
 
 /*
  * Set this flag to enable the listener thread to trade places with an idle