ubik-call-sucks-20060704
[openafs.git] / src / rx / rx_globals.h
index b47f4a8..0f329e3 100644 (file)
@@ -9,6 +9,10 @@
 
 /* RX:  Globals for internal use, basically */
 
+#ifndef AFS_RX_GLOBALS_H
+#define AFS_RX_GLOBALS_H
+
 #ifdef KERNEL
 #include "rx/rx.h"
 #else /* KERNEL */
 
 #ifndef INIT
 #define INIT(x)
-#if defined(AFS_NT40_ENV) && defined(AFS_PTHREAD_ENV)
+#if defined(AFS_NT40_ENV)
+#if defined(AFS_PTHREAD_ENV)
 #define EXT __declspec(dllimport) extern
 #else
 #define        EXT extern
 #endif
+#define EXT2 __declspec(dllimport) extern
+#else
+#define EXT2 extern
+#define EXT extern
 #endif
+#endif /* !INIT */
 
 /* Basic socket for client requests; other sockets (for receiving server requests) are in the service structures */
 EXT osi_socket rx_socket;
@@ -58,6 +68,7 @@ EXT struct clock rx_softAckDelay;
 /* Variable to allow introduction of network unreliability */
 #ifdef RXDEBUG
 EXT int rx_intentionallyDroppedPacketsPer100 INIT(0);  /* Dropped on Send */
+EXT int rx_intentionallyDroppedOnReadPer100  INIT(0);  /* Dropped on Read */
 #endif
 
 /* extra packets to add to the quota */
@@ -141,11 +152,236 @@ EXT int rxi_HardAckRate INIT(RX_FAST_ACK_RATE + 1);
 
 EXT int rx_nPackets INIT(100); /* obsolete; use rx_extraPackets now */
 
+/*
+ * pthreads thread-specific rx info support
+ * the rx_ts_info_t struct is meant to support all kinds of
+ * thread-specific rx data:
+ *
+ *  _FPQ member contains a thread-specific free packet queue
+ */
+#ifdef AFS_PTHREAD_ENV
+EXT pthread_key_t rx_ts_info_key;
+typedef struct rx_ts_info_t {
+    struct {
+        struct rx_queue queue;
+        int len;                /* local queue length */
+        int delta;              /* number of new packets alloc'd locally since last sync w/ global queue */
+        
+        /* FPQ stats */
+        int checkin_ops;
+        int checkin_xfer;
+        int checkout_ops;
+        int checkout_xfer;
+        int gtol_ops;
+        int gtol_xfer;
+        int ltog_ops;
+        int ltog_xfer;
+        int alloc_ops;
+        int alloc_xfer;
+    } _FPQ;
+    struct rx_packet * local_special_packet;
+} rx_ts_info_t;
+EXT struct rx_ts_info_t * rx_ts_info_init(void);   /* init function for thread-specific data struct */
+#define RX_TS_INFO_GET(ts_info_p) \
+    do { \
+        ts_info_p = (struct rx_ts_info_t*)pthread_getspecific(rx_ts_info_key); \
+        if (ts_info_p == NULL) { \
+            assert((ts_info_p = rx_ts_info_init()) != NULL); \
+        } \
+    } while(0)
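+/* Hypothetical usage sketch (illustrative only; the local variable name
+ * rx_ts_info is assumed here): a worker thread binds its thread-specific
+ * state before touching the local free packet queue:
+ *
+ *     struct rx_ts_info_t * rx_ts_info;
+ *     RX_TS_INFO_GET(rx_ts_info);
+ *     (rx_ts_info->_FPQ is now this thread's private free packet queue)
+ */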
+#endif /* AFS_PTHREAD_ENV */
+
+
 /* List of free packets */
+/* in pthreads rx, free packet queue is now a two-tiered queueing system
+ * in which the first tier is thread-specific, and the second tier is
+ * a global free packet queue */
 EXT struct rx_queue rx_freePacketQueue;
+#define RX_FPQ_MARK_FREE(p) \
+    do { \
+        if ((p)->flags & RX_PKTFLAG_FREE) \
+            osi_Panic("rx packet already free\n"); \
+        (p)->flags |= RX_PKTFLAG_FREE; \
+    } while(0)
+#define RX_FPQ_MARK_USED(p) \
+    do { \
+        if (!((p)->flags & RX_PKTFLAG_FREE)) \
+            osi_Panic("rx packet not free\n"); \
+        (p)->flags = 0;                /* clear RX_PKTFLAG_FREE, initialize the rest */ \
+        (p)->header.flags = 0; \
+    } while(0)
+#define RX_PACKET_IOV_INIT(p) \
+    do { \
+       (p)->wirevec[0].iov_base = (char *)((p)->wirehead); \
+       (p)->wirevec[0].iov_len = RX_HEADER_SIZE; \
+       (p)->wirevec[1].iov_base = (char *)((p)->localdata); \
+       (p)->wirevec[1].iov_len = RX_FIRSTBUFFERSIZE; \
+    } while(0)
+#define RX_PACKET_IOV_FULLINIT(p) \
+    do { \
+       (p)->wirevec[0].iov_base = (char *)((p)->wirehead); \
+       (p)->wirevec[0].iov_len = RX_HEADER_SIZE; \
+       (p)->wirevec[1].iov_base = (char *)((p)->localdata); \
+       (p)->wirevec[1].iov_len = RX_FIRSTBUFFERSIZE; \
+       (p)->niovecs = 2; \
+       (p)->length = RX_FIRSTBUFFERSIZE; \
+    } while(0)
+
 #ifdef RX_ENABLE_LOCKS
 EXT afs_kmutex_t rx_freePktQ_lock;
-#endif
+#endif /* RX_ENABLE_LOCKS */
+
+#if defined(AFS_PTHREAD_ENV)
+#define RX_ENABLE_TSFPQ
+EXT int rx_TSFPQGlobSize INIT(3); /* number of packets to transfer between global and local queues in one op */
+EXT int rx_TSFPQLocalMax INIT(15); /* max number of packets on local FPQ before returning a glob to the global pool */
+EXT int rx_TSFPQMaxProcs INIT(0); /* max number of threads expected */
+EXT void rxi_MorePacketsTSFPQ(int apackets, int flush_global, int num_keep_local); /* more flexible packet alloc function */
+EXT void rxi_AdjustLocalPacketsTSFPQ(int num_keep_local, int allow_overcommit); /* adjust thread-local queue length, for places where we know how many packets we will need a priori */
+EXT void rxi_FlushLocalPacketsTSFPQ(void); /* flush all thread-local packets to global queue */
+#define RX_TS_FPQ_FLUSH_GLOBAL 1
+#define RX_TS_FPQ_PULL_GLOBAL 1
+#define RX_TS_FPQ_ALLOW_OVERCOMMIT 1
+/* compute the localmax and globsize values from rx_TSFPQMaxProcs and rx_nPackets.
+   arbitrarily set local max so that all threads consume 90% of packets, if all local queues are full.
+   arbitrarily set transfer glob size to 20% of max local packet queue length.
+   also set minimum values of 15 and 3. */
+#define RX_TS_FPQ_COMPUTE_LIMITS \
+    do { \
+        register int newmax, newglob; \
+        newmax = (rx_nPackets * 9) / (10 * rx_TSFPQMaxProcs); \
+        newmax = (newmax >= 15) ? newmax : 15; \
+        newglob = newmax / 5; \
+        newglob = (newglob >= 3) ? newglob : 3; \
+        rx_TSFPQLocalMax = newmax; \
+        rx_TSFPQGlobSize = newglob; \
+    } while(0)
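+/* Worked example (assumed values): with rx_nPackets == 200 and
+ * rx_TSFPQMaxProcs == 4, newmax == (200*9)/(10*4) == 45 and
+ * newglob == 45/5 == 9, so each thread may cache up to 45 free packets
+ * locally and transfers move in globs of 9. */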
+/* move packets from local (thread-specific) to global free packet queue.
+   rx_freePktQ_lock must be held. the default is to move the difference between
+   the current length and the allowed max, plus one extra glob. */
+#define RX_TS_FPQ_LTOG(rx_ts_info_p) \
+    do { \
+        register int i; \
+        register struct rx_packet * p; \
+        register int tsize = (rx_ts_info_p)->_FPQ.len - rx_TSFPQLocalMax + rx_TSFPQGlobSize; \
+        for (i=0,p=queue_Last(&((rx_ts_info_p)->_FPQ), rx_packet); \
+             i < tsize; i++,p=queue_Prev(p, rx_packet)); \
+        queue_SplitAfterPrepend(&((rx_ts_info_p)->_FPQ),&rx_freePacketQueue,p); \
+        (rx_ts_info_p)->_FPQ.len -= tsize; \
+        rx_nFreePackets += tsize; \
+        (rx_ts_info_p)->_FPQ.ltog_ops++; \
+        (rx_ts_info_p)->_FPQ.ltog_xfer += tsize; \
+        if ((rx_ts_info_p)->_FPQ.delta) { \
+            (rx_ts_info_p)->_FPQ.alloc_ops++; \
+            (rx_ts_info_p)->_FPQ.alloc_xfer += (rx_ts_info_p)->_FPQ.delta; \
+            MUTEX_ENTER(&rx_stats_mutex); \
+            rx_nPackets += (rx_ts_info_p)->_FPQ.delta; \
+            RX_TS_FPQ_COMPUTE_LIMITS; \
+            MUTEX_EXIT(&rx_stats_mutex); \
+           (rx_ts_info_p)->_FPQ.delta = 0; \
+        } \
+    } while(0)
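+/* Worked example (assumed values): with _FPQ.len == 20,
+ * rx_TSFPQLocalMax == 15, and rx_TSFPQGlobSize == 3, tsize == 20-15+3
+ * == 8, so 8 packets move to the global queue, leaving the local queue
+ * at 12 (one glob below the local max). */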
+/* same as above, except user has direct control over number to transfer */
+#define RX_TS_FPQ_LTOG2(rx_ts_info_p,num_transfer) \
+    do { \
+        register int i; \
+        register struct rx_packet * p; \
+        for (i=0,p=queue_Last(&((rx_ts_info_p)->_FPQ), rx_packet); \
+            i < (num_transfer); i++,p=queue_Prev(p, rx_packet)); \
+        queue_SplitAfterPrepend(&((rx_ts_info_p)->_FPQ),&rx_freePacketQueue,p); \
+        (rx_ts_info_p)->_FPQ.len -= (num_transfer); \
+        rx_nFreePackets += (num_transfer); \
+        (rx_ts_info_p)->_FPQ.ltog_ops++; \
+        (rx_ts_info_p)->_FPQ.ltog_xfer += (num_transfer); \
+        if ((rx_ts_info_p)->_FPQ.delta) { \
+            (rx_ts_info_p)->_FPQ.alloc_ops++; \
+            (rx_ts_info_p)->_FPQ.alloc_xfer += (rx_ts_info_p)->_FPQ.delta; \
+            MUTEX_ENTER(&rx_stats_mutex); \
+            rx_nPackets += (rx_ts_info_p)->_FPQ.delta; \
+            RX_TS_FPQ_COMPUTE_LIMITS; \
+            MUTEX_EXIT(&rx_stats_mutex); \
+            (rx_ts_info_p)->_FPQ.delta = 0; \
+        } \
+    } while(0)
+/* move packets from global to local (thread-specific) free packet queue.
+   rx_freePktQ_lock must be held. */
+#define RX_TS_FPQ_GTOL(rx_ts_info_p) \
+    do { \
+        register int i, tsize; \
+        register struct rx_packet * p; \
+        tsize = (rx_TSFPQGlobSize <= rx_nFreePackets) ? \
+                 rx_TSFPQGlobSize : rx_nFreePackets; \
+        for (i=0,p=queue_First(&rx_freePacketQueue, rx_packet); \
+             i < tsize; i++,p=queue_Next(p, rx_packet)); \
+        queue_SplitBeforeAppend(&rx_freePacketQueue,&((rx_ts_info_p)->_FPQ),p); \
+        (rx_ts_info_p)->_FPQ.len += i; \
+        rx_nFreePackets -= i; \
+        (rx_ts_info_p)->_FPQ.gtol_ops++; \
+        (rx_ts_info_p)->_FPQ.gtol_xfer += i; \
+    } while(0)
+/* same as above, except user has direct control over number to transfer */
+#define RX_TS_FPQ_GTOL2(rx_ts_info_p,num_transfer) \
+    do { \
+        register int i; \
+        register struct rx_packet * p; \
+        for (i=0,p=queue_First(&rx_freePacketQueue, rx_packet); \
+             i < (num_transfer); i++,p=queue_Next(p, rx_packet)); \
+        queue_SplitBeforeAppend(&rx_freePacketQueue,&((rx_ts_info_p)->_FPQ),p); \
+        (rx_ts_info_p)->_FPQ.len += i; \
+        rx_nFreePackets -= i; \
+        (rx_ts_info_p)->_FPQ.gtol_ops++; \
+        (rx_ts_info_p)->_FPQ.gtol_xfer += i; \
+    } while(0)
+/* checkout a packet from the thread-specific free packet queue */
+#define RX_TS_FPQ_CHECKOUT(rx_ts_info_p,p) \
+    do { \
+        (p) = queue_First(&((rx_ts_info_p)->_FPQ), rx_packet); \
+        queue_Remove(p); \
+        RX_FPQ_MARK_USED(p); \
+        (rx_ts_info_p)->_FPQ.len--; \
+        (rx_ts_info_p)->_FPQ.checkout_ops++; \
+        (rx_ts_info_p)->_FPQ.checkout_xfer++; \
+    } while(0)
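+/* Hypothetical allocation fast path (sketch only; assumes the caller
+ * holds no locks and handles an empty global queue): pull from the
+ * local queue, refilling from the global queue when it runs dry:
+ *
+ *     RX_TS_INFO_GET(rx_ts_info);
+ *     if (queue_IsEmpty(&rx_ts_info->_FPQ)) {
+ *         MUTEX_ENTER(&rx_freePktQ_lock);
+ *         RX_TS_FPQ_GTOL(rx_ts_info);
+ *         MUTEX_EXIT(&rx_freePktQ_lock);
+ *     }
+ *     RX_TS_FPQ_CHECKOUT(rx_ts_info, p);
+ */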
+/* checkout multiple packets from the thread-specific free packet queue */
+#define RX_TS_FPQ_CHECKOUT2(rx_ts_info_p,num_transfer,q) \
+    do { \
+        register int i; \
+        register struct rx_packet *p; \
+        for (i=0, p=queue_First(&((rx_ts_info_p)->_FPQ), rx_packet); \
+             i < (num_transfer); \
+             i++, p=queue_Next(p, rx_packet)) { \
+            RX_FPQ_MARK_USED(p); \
+        } \
+        queue_SplitBeforeAppend(&((rx_ts_info_p)->_FPQ),(q),p); \
+        (rx_ts_info_p)->_FPQ.len -= (num_transfer); \
+        (rx_ts_info_p)->_FPQ.checkout_ops++; \
+        (rx_ts_info_p)->_FPQ.checkout_xfer += (num_transfer); \
+    } while(0)
+/* check a packet into the thread-specific free packet queue */
+#define RX_TS_FPQ_CHECKIN(rx_ts_info_p,p) \
+    do { \
+        queue_Prepend(&((rx_ts_info_p)->_FPQ), (p)); \
+        RX_FPQ_MARK_FREE(p); \
+        (rx_ts_info_p)->_FPQ.len++; \
+        (rx_ts_info_p)->_FPQ.checkin_ops++; \
+        (rx_ts_info_p)->_FPQ.checkin_xfer++; \
+    } while(0)
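+/* Hypothetical free fast path (sketch only): return the packet to the
+ * local queue, spilling a glob back to the global queue once the local
+ * max is exceeded:
+ *
+ *     RX_TS_FPQ_CHECKIN(rx_ts_info, p);
+ *     if (rx_ts_info->_FPQ.len > rx_TSFPQLocalMax) {
+ *         MUTEX_ENTER(&rx_freePktQ_lock);
+ *         RX_TS_FPQ_LTOG(rx_ts_info);
+ *         MUTEX_EXIT(&rx_freePktQ_lock);
+ *     }
+ */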
+/* check multiple packets into the thread-specific free packet queue */
+/* num_transfer must equal the length of (q); it is not a means of checking
+ * in part of (q).  passing num_transfer simply saves instructions, since
+ * the caller already knows the length of (q) for other reasons */
+#define RX_TS_FPQ_CHECKIN2(rx_ts_info_p,num_transfer,q) \
+    do { \
+        register struct rx_packet *p, *np; \
+        for (queue_Scan((q), p, np, rx_packet)) { \
+            RX_FPQ_MARK_FREE(p); \
+        } \
+        queue_SplicePrepend(&((rx_ts_info_p)->_FPQ),(q)); \
+        (rx_ts_info_p)->_FPQ.len += (num_transfer); \
+        (rx_ts_info_p)->_FPQ.checkin_ops++; \
+        (rx_ts_info_p)->_FPQ.checkin_xfer += (num_transfer); \
+    } while(0)
+#endif /* AFS_PTHREAD_ENV */
 
 /* Number of free packets */
 EXT int rx_nFreePackets INIT(0);
@@ -196,9 +432,9 @@ EXT u_short rx_port;
 #if !defined(KERNEL) && !defined(AFS_PTHREAD_ENV)
 /* 32-bit select Mask for rx_Listener. */
 EXT fd_set rx_selectMask;
-EXT int rx_maxSocketNumber;    /* Maximum socket number in the select mask. */
+EXT osi_socket rx_maxSocketNumber;     /* Maximum socket number in the select mask. */
 /* Minimum socket number in the select mask. */
-EXT int rx_minSocketNumber INIT(0x7fffffff);
+EXT osi_socket rx_minSocketNumber INIT(0x7fffffff);
 #endif
 
 /* This is actually the minimum number of packets that must remain free,
@@ -249,16 +485,17 @@ EXT struct rx_stats rx_stats;
 EXT struct rx_peer **rx_peerHashTable;
 EXT struct rx_connection **rx_connHashTable;
 EXT struct rx_connection *rx_connCleanup_list INIT(0);
-EXT afs_uint32 rx_hashTableSize INIT(256);     /* Power of 2 */
-EXT afs_uint32 rx_hashTableMask INIT(255);     /* One less than rx_hashTableSize */
+EXT afs_uint32 rx_hashTableSize INIT(257);     /* Prime number */
 #ifdef RX_ENABLE_LOCKS
 EXT afs_kmutex_t rx_peerHashTable_lock;
 EXT afs_kmutex_t rx_connHashTable_lock;
 #endif /* RX_ENABLE_LOCKS */
 
-#define CONN_HASH(host, port, cid, epoch, type) ((((cid)>>RX_CIDSHIFT)&rx_hashTableMask))
+#define CONN_HASH(host, port, cid, epoch, type) ((((cid)>>RX_CIDSHIFT)%rx_hashTableSize))
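+/* e.g. (illustrative values): with rx_hashTableSize == 257, a shifted
+ * cid of 1024 hashes to bucket 1024 % 257 == 253 */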
 
-#define PEER_HASH(host, port)  ((host ^ port) & rx_hashTableMask)
+#if 0
+#define PEER_HASH(host, port)  ((host ^ port) % rx_hashTableSize)
+#endif
 
 /* Forward definitions of internal procedures */
 #define        rxi_ChallengeOff(conn)  rxevent_Cancel((conn)->challengeEvent, (struct rx_call*)0, 0);
@@ -279,7 +516,16 @@ EXT FILE *rx_debugFile;            /* Set by the user to a stdio file for debugging output
 EXT FILE *rxevent_debugFile;   /* Set to an stdio descriptor for event logging to that file */
 
 #define rx_Log rx_debugFile
+#ifdef AFS_NT40_ENV
+EXT int rxdebug_active;
+#if !defined(_WIN64)
+#define dpf(args) if (rxdebug_active) rxi_DebugPrint args; else
+#else
+#define dpf(args)
+#endif
+#else
 #define dpf(args) if (rx_debugFile) rxi_DebugPrint args; else
+#endif
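+/* Typical call site (illustrative): dpf(("rxi_Send: seq %d\n", seq));
+ * the doubled parentheses pass a printf-style argument list through
+ * the macro's single parameter. */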
 #define rx_Log_event rxevent_debugFile
 
 EXT char *rx_packetTypes[RX_N_PACKET_TYPES] INIT(RX_PACKET_TYPES);     /* Strings defined in rx.h */
@@ -331,7 +577,7 @@ EXT int rxi_pthread_hinum INIT(0);
 EXT afs_kmutex_t rx_stats_mutex;       /* used to activate stats gathering */
 #endif
 
-EXT int rx_enable_stats INIT(0);
+EXT2 int rx_enable_stats INIT(0);
 
 /*
  * Set this flag to enable the listener thread to trade places with an idle
@@ -339,3 +585,5 @@ EXT int rx_enable_stats INIT(0);
  * the request path.
  */
 EXT int rx_enable_hot_thread INIT(0);
+
+#endif /* AFS_RX_GLOBALS_H */