rx: Make statistics interface use Atomics
[openafs.git] / src / rx / rx_lwp.c
index 9aa4ffc..c87ce23 100644 (file)
@@ -1,7 +1,7 @@
 /*
  * Copyright 2000, International Business Machines Corporation and others.
  * All Rights Reserved.
- * 
+ *
  * This software has been released under the terms of the IBM Public
  * License.  For details, see the LICENSE file in the top-level source
  * directory or online at http://www.openafs.org/dl/license10.html
  * Its value should be as large as the maximum file descriptor limit we
  * are likely to run into on any platform.  Right now, that is 65536
  * which is the default hard fd limit on Solaris 9 */
+#ifndef _WIN32
 #define FD_SETSIZE 65536
+#endif
 
 #include <afsconfig.h>
 #include <afs/param.h>
 
-RCSID
-    ("$Header$");
 
 # include <sys/types.h>                /* fd_set on older platforms */
 # include <errno.h>
@@ -42,15 +42,15 @@ RCSID
 # include <sys/ioctl.h>
 # include <sys/time.h>
 #endif
+# include <assert.h>
 # include "rx.h"
+# include "rx_atomic.h"
 # include "rx_globals.h"
+# include "rx_stats.h"
 # include <lwp.h>
 
 #define MAXTHREADNAMELENGTH 64
 
-extern int (*registerProgram) ();
-extern int (*swapNameProgram) ();
-
 int debugSelectFailure;                /* # of times select failed */
 
 /*
@@ -73,7 +73,7 @@ rxi_Wakeup(void *addr)
 }
 
 PROCESS rx_listenerPid = 0;    /* LWP process id of socket listener process */
-static void rx_ListenerProc(void *dummy);
+static void* rx_ListenerProc(void *dummy);
 
 /*
  * Delay the current thread the specified number of seconds.
@@ -117,14 +117,14 @@ rxi_InitializeThreadSupport(void)
 }
 
 void
-rxi_StartServerProc(void (*proc) (void), int stacksize)
+rxi_StartServerProc(void *(*proc) (void *), int stacksize)
 {
     PROCESS scratchPid;
     static int number = 0;
     char name[32];
 
     sprintf(name, "srv_%d", ++number);
-    LWP_CreateProcess(proc, stacksize, RX_PROCESS_PRIORITY, (void *)0,
+    LWP_CreateProcess(proc, stacksize, RX_PROCESS_PRIORITY, NULL,
                      "rx_ServerProc", &scratchPid);
     if (registerProgram)
        (*registerProgram) (scratchPid, name);
@@ -136,7 +136,7 @@ rxi_StartListener(void)
     /* Priority of listener should be high, so it can keep conns alive */
 #define        RX_LIST_STACK   24000
     LWP_CreateProcess(rx_ListenerProc, RX_LIST_STACK, LWP_MAX_PRIORITY,
-                     (void *)0, "rx_Listener", &rx_listenerPid);
+                     NULL, "rx_Listener", &rx_listenerPid);
     if (registerProgram)
        (*registerProgram) (rx_listenerPid, "listener");
 }
@@ -163,8 +163,8 @@ rxi_ListenerProc(fd_set * rfds, int *tnop, struct rx_call **newcallp)
 {
     afs_uint32 host;
     u_short port;
-    register struct rx_packet *p = (struct rx_packet *)0;
-    int socket;
+    struct rx_packet *p = (struct rx_packet *)0;
+    osi_socket socket;
     struct clock cv;
     afs_int32 nextPollTime;    /* time to next poll FD before sleeping */
     int lastPollWorked, doingPoll;     /* true iff last poll was useful */
@@ -182,13 +182,16 @@ rxi_ListenerProc(fd_set * rfds, int *tnop, struct rx_call **newcallp)
     code = LWP_CurrentProcess(&pid);
     if (code) {
        fprintf(stderr, "rxi_Listener: Can't get my pid.\n");
-       exit(1);
+       assert(0);
     }
     rx_listenerPid = pid;
     if (swapNameProgram)
        (*swapNameProgram) (pid, "listener", &name[0]);
 
     for (;;) {
+        /* See if a check for additional packets was issued */
+        rx_CheckPackets();
+
        /* Grab a new packet only if necessary (otherwise re-use the old one) */
        if (p) {
            rxi_RestoreDataBufs(p);
@@ -212,7 +215,8 @@ rxi_ListenerProc(fd_set * rfds, int *tnop, struct rx_call **newcallp)
            tv.tv_usec = cv.usec;
            tvp = &tv;
        }
-       rx_stats.selects++;
+       if (rx_stats_active)
+           rx_atomic_inc(&rx_stats.selects);
 
        *rfds = rx_selectMask;
 
@@ -222,10 +226,10 @@ rxi_ListenerProc(fd_set * rfds, int *tnop, struct rx_call **newcallp)
            nextPollTime = clock_Sec() + 4;     /* try again in 4 seconds no matter what */
            tv.tv_sec = tv.tv_usec = 0; /* make sure we poll */
            tvp = &tv;
-           code = select(rx_maxSocketNumber + 1, rfds, 0, 0, tvp);
+           code = select((int)(rx_maxSocketNumber + 1), rfds, 0, 0, tvp);
        } else {
            doingPoll = 0;
-           code = IOMGR_Select(rx_maxSocketNumber + 1, rfds, 0, 0, tvp);
+           code = IOMGR_Select((int)(rx_maxSocketNumber + 1), rfds, 0, 0, tvp);
        }
        lastPollWorked = 0;     /* default is that it didn't find anything */
 
@@ -281,7 +285,7 @@ rxi_ListenerProc(fd_set * rfds, int *tnop, struct rx_call **newcallp)
                            rxi_FreePacket(p);
                        }
                        if (swapNameProgram) {
-                           (*swapNameProgram) (rx_listenerPid, &name, 0);
+                           (*swapNameProgram) (rx_listenerPid, name, 0);
                            rx_listenerPid = 0;
                        }
                        return;
@@ -301,7 +305,7 @@ rxi_ListenerProc(fd_set * rfds, int *tnop, struct rx_call **newcallp)
                            rxi_FreePacket(p);
                        }
                        if (swapNameProgram) {
-                           (*swapNameProgram) (rx_listenerPid, &name, 0);
+                           (*swapNameProgram) (rx_listenerPid, name, 0);
                            rx_listenerPid = 0;
                        }
                        return;
@@ -318,11 +322,11 @@ rxi_ListenerProc(fd_set * rfds, int *tnop, struct rx_call **newcallp)
 /* This is the listener process request loop. The listener process loop
  * becomes a server thread when rxi_ListenerProc returns, and stays
  * server thread until rxi_ServerProc returns. */
-static void
+static void *
 rx_ListenerProc(void *dummy)
 {
     int threadID;
-    int sock;
+    osi_socket sock;
     struct rx_call *newcall;
     fd_set *rfds;
 
@@ -341,15 +345,16 @@ rx_ListenerProc(void *dummy)
        /* assert(sock != OSI_NULLSOCKET); */
     }
     /* not reached */
+    return NULL;
 }
 
 /* This is the server process request loop. The server process loop
  * becomes a listener thread when rxi_ServerProc returns, and stays
  * listener thread until rxi_ListenerProc returns. */
-void
-rx_ServerProc(void)
+void *
+rx_ServerProc(void * unused)
 {
-    int sock;
+    osi_socket sock;
     int threadID;
     struct rx_call *newcall = NULL;
     fd_set *rfds;
@@ -374,6 +379,7 @@ rx_ServerProc(void)
        /* assert(newcall != NULL); */
     }
     /* not reached */
+    return NULL;
 }
 
 /*
@@ -389,19 +395,11 @@ rxi_Listen(osi_socket sock)
      * Put the socket into non-blocking mode so that rx_Listener
      * can do a polling read before entering select
      */
-#ifndef AFS_DJGPP_ENV
     if (fcntl(sock, F_SETFL, FNDELAY) == -1) {
        perror("fcntl");
        (osi_Msg "rxi_Listen: unable to set non-blocking mode on socket\n");
        return -1;
     }
-#else
-    if (__djgpp_set_socket_blocking_mode(sock, 1) < 0) {
-       perror("__djgpp_set_socket_blocking_mode");
-       (osi_Msg "rxi_Listen: unable to set non-blocking mode on socket\n");
-       return -1;
-    }
-#endif /* AFS_DJGPP_ENV */
 
     if (sock > FD_SETSIZE - 1) {
        (osi_Msg "rxi_Listen: socket descriptor > (FD_SETSIZE-1) = %d\n",
@@ -422,9 +420,13 @@ rxi_Listen(osi_socket sock)
  * Recvmsg
  */
 int
-rxi_Recvmsg(int socket, struct msghdr *msg_p, int flags)
+rxi_Recvmsg(osi_socket socket, struct msghdr *msg_p, int flags)
 {
-    return recvmsg((int)socket, msg_p, flags);
+#if defined(HAVE_LINUX_ERRQUEUE_H) && defined(ADAPT_PMTU)
+    while((rxi_HandleSocketError(socket)) > 0)
+       ;
+#endif
+    return recvmsg(socket, msg_p, flags);
 }
 
 /*
@@ -437,18 +439,23 @@ rxi_Sendmsg(osi_socket socket, struct msghdr *msg_p, int flags)
     fd_set *sfds = (fd_set *) 0;
     while (sendmsg(socket, msg_p, flags) == -1) {
        int err;
-       rx_stats.sendSelects++;
+       if (rx_stats_active)
+           rx_atomic_inc(&rx_stats.sendSelects);
 
        if (!sfds) {
            if (!(sfds = IOMGR_AllocFDSet())) {
                (osi_Msg "rx failed to alloc fd_set: ");
                perror("rx_sendmsg");
-               return 3;
+               return -1;
            }
            FD_SET(socket, sfds);
        }
+#if defined(HAVE_LINUX_ERRQUEUE_H) && defined(ADAPT_PMTU)
+       while((rxi_HandleSocketError(socket)) > 0)
+         ;
+#endif
 #ifdef AFS_NT40_ENV
-       if (errno)
+       if (WSAGetLastError())
 #elif defined(AFS_LINUX22_ENV)
        /* linux unfortunately returns ECONNREFUSED if the target port
         * is no longer in use */
@@ -461,7 +468,14 @@ rxi_Sendmsg(osi_socket socket, struct msghdr *msg_p, int flags)
        {
            (osi_Msg "rx failed to send packet: ");
            perror("rx_sendmsg");
-           return 3;
+#ifndef AFS_NT40_ENV
+            if (errno > 0)
+              return -errno;
+#else
+            if (WSAGetLastError() > 0)
+              return -WSAGetLastError();
+#endif
+           return -1;
        }
        while ((err = select(socket + 1, 0, sfds, 0, 0)) != 1) {
            if (err >= 0 || errno != EINTR)