rx: Process error queue after noticing errors
[openafs.git] / src / rx / rx_lwp.c
index 39d6fda..fa4ee6d 100644 (file)
@@ -1,57 +1,54 @@
 /*
-****************************************************************************
-*        Copyright IBM Corporation 1988, 1989 - All Rights Reserved        *
-*                                                                          *
-* Permission to use, copy, modify, and distribute this software and its    *
-* documentation for any purpose and without fee is hereby granted,         *
-* provided that the above copyright notice appear in all copies and        *
-* that both that copyright notice and this permission notice appear in     *
-* supporting documentation, and that the name of IBM not be used in        *
-* advertising or publicity pertaining to distribution of the software      *
-* without specific, written prior permission.                              *
-*                                                                          *
-* IBM DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING ALL *
-* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN NO EVENT SHALL IBM *
-* BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY      *
-* DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER  *
-* IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING   *
-* OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.    *
-****************************************************************************
-*/
+ * Copyright 2000, International Business Machines Corporation and others.
+ * All Rights Reserved.
+ *
+ * This software has been released under the terms of the IBM Public
+ * License.  For details, see the LICENSE file in the top-level source
+ * directory or online at http://www.openafs.org/dl/license10.html
+ */
 
 /* rx_user.c contains routines specific to the user space UNIX implementation of rx */
 
-# include <afs/param.h>
-# include <sys/types.h>                /* fd_set on older platforms */
-# include <errno.h>
-# include <signal.h>
-#ifdef AFS_NT40_ENV
-# include <winsock2.h>
-#else
-# include <unistd.h>           /* select() prototype */
-# include <sys/time.h>         /* struct timeval, select() prototype */
-# ifndef FD_SET
-#  include <sys/select.h>      /* fd_set on newer platforms */
-# endif
-# include <sys/socket.h>
+/* This controls the size of an fd_set; it must be defined early before
+ * the system headers define that type and the macros that operate on it.
+ * Its value should be as large as the maximum file descriptor limit we
+ * are likely to run into on any platform.  Right now, that is 65536
+ * which is the default hard fd limit on Solaris 9 */
+#ifndef _WIN32
+#define FD_SETSIZE 65536
+#endif
+
+#include <afsconfig.h>
+#include <afs/param.h>
+
+#include <roken.h>
+
+#ifdef HAVE_SYS_FILE_H
 # include <sys/file.h>
-# include <netdb.h>
-# include <sys/stat.h>
-# include <netinet/in.h>
-# include <net/if.h>
-# include <sys/ioctl.h>
-# include <sys/time.h>
 #endif
-# include "rx.h"
-# include "rx_globals.h"
-# include <lwp.h>
 
+#ifndef AFS_PTHREAD_ENV
+
+#include <lwp.h>
+
+#include "rx.h"
+#include "rx_atomic.h"
+#include "rx_globals.h"
+#include "rx_internal.h"
+#include "rx_stats.h"
+#ifdef AFS_NT40_ENV
+#include "rx_xmit_nt.h"
+#endif
+
+#define MAXTHREADNAMELENGTH 64
+
+int debugSelectFailure;                /* # of times select failed */
 
-int debugSelectFailure;        /* # of times select failed */
 /*
  * Sleep on the unique wait channel provided.
  */
-void rxi_Sleep(void *addr)
+void
+rxi_Sleep(void *addr)
 {
     LWP_WaitProcess(addr);
 }
@@ -60,32 +57,48 @@ void rxi_Sleep(void *addr)
  * Wakeup any threads on the channel provided.
  * They may be woken up spuriously, and must check any conditions.
  */
-void rxi_Wakeup(void *addr)
+void
+rxi_Wakeup(void *addr)
 {
     LWP_NoYieldSignal(addr);
 }
 
-PROCESS rx_listenerPid;        /* LWP process id of socket listener process */
-void rx_ListenerProc();
+PROCESS rx_listenerPid = 0;    /* LWP process id of socket listener process */
+static void* rx_ListenerProc(void *dummy);
 
 /*
  * Delay the current thread the specified number of seconds.
  */
-void rxi_Delay(int sec)
+void
+rxi_Delay(int sec)
 {
     IOMGR_Sleep(sec);
 }
 
+static int quitListening = 0;
+
+/* This routine will kill the listener thread, if it exists. */
+void
+rxi_StopListener(void)
+{
+    quitListening = 1;
+    rxi_ReScheduleEvents();
+}
+
 /* This routine will get called by the event package whenever a new,
    earlier than others, event is posted.  If the Listener process
    is blocked in selects, this will unblock it.  It also can be called
    to force a new trip through the rxi_Listener select loop when the set
    of file descriptors it should be listening to changes... */
-void rxi_ReScheduleEvents() {
-    if (rx_listenerPid) IOMGR_Cancel(rx_listenerPid);
+void
+rxi_ReScheduleEvents(void)
+{
+    if (rx_listenerPid)
+       IOMGR_Cancel(rx_listenerPid);
 }
 
-void rxi_InitializeThreadSupport(void)
+void
+rxi_InitializeThreadSupport(void)
 {
     PROCESS junk;
 
@@ -94,19 +107,29 @@ void rxi_InitializeThreadSupport(void)
     FD_ZERO(&rx_selectMask);
 }
 
-void rxi_StartServerProc(proc, stacksize)
-    long (*proc)();
+void
+rxi_StartServerProc(void *(*proc) (void *), int stacksize)
 {
     PROCESS scratchPid;
-    LWP_CreateProcess(proc, stacksize, RX_PROCESS_PRIORITY,
-                     0, "rx_ServerProc", &scratchPid);
+    static int number = 0;
+    char name[32];
+
+    sprintf(name, "srv_%d", ++number);
+    LWP_CreateProcess(proc, stacksize, RX_PROCESS_PRIORITY, NULL,
+                     "rx_ServerProc", &scratchPid);
+    if (registerProgram)
+       (*registerProgram) (scratchPid, name);
 }
 
-void rxi_StartListener() {
+void
+rxi_StartListener(void)
+{
     /* Priority of listener should be high, so it can keep conns alive */
 #define        RX_LIST_STACK   24000
-    LWP_CreateProcess(rx_ListenerProc, RX_LIST_STACK, LWP_MAX_PRIORITY, 0,
-                     "rx_Listener", &rx_listenerPid);
+    LWP_CreateProcess(rx_ListenerProc, RX_LIST_STACK, LWP_MAX_PRIORITY,
+                     NULL, "rx_Listener", &rx_listenerPid);
+    if (registerProgram)
+       (*registerProgram) (rx_listenerPid, "listener");
 }
 
 /* The main loop which listens to the net for datagrams, and handles timeouts
@@ -126,50 +149,64 @@ void rxi_StartListener() {
    don't do a polling select again until several seconds later (via nextPollTime mechanism).
    */
 
-void rxi_ListenerProc(rfds, tnop, newcallp)
-    fd_set *rfds;
-    int *tnop;
-    struct rx_call **newcallp;
+static void
+rxi_ListenerProc(fd_set * rfds, int *tnop, struct rx_call **newcallp)
 {
     afs_uint32 host;
     u_short port;
-    register struct rx_packet *p = (struct rx_packet *)0;
-    int socket;
+    struct rx_packet *p = (struct rx_packet *)0;
+    osi_socket socket;
     struct clock cv;
-    afs_int32 nextPollTime;            /* time to next poll FD before sleeping */
+    afs_int32 nextPollTime;    /* time to next poll FD before sleeping */
     int lastPollWorked, doingPoll;     /* true iff last poll was useful */
     struct timeval tv, *tvp;
-    int i, code;
+    int code;
+#ifdef AFS_NT40_ENV
+    int i;
+#endif
+    PROCESS pid;
+    char name[MAXTHREADNAMELENGTH] = "srv_0";
 
     clock_NewTime();
     lastPollWorked = 0;
     nextPollTime = 0;
+    code = LWP_CurrentProcess(&pid);
+    if (code) {
+       osi_Panic("rxi_Listener: Can't get my pid.\n");
+    }
+    rx_listenerPid = pid;
+    if (swapNameProgram)
+       (*swapNameProgram) (pid, "listener", &name[0]);
 
     for (;;) {
+        /* See if a check for additional packets was issued */
+        rx_CheckPackets();
+
        /* Grab a new packet only if necessary (otherwise re-use the old one) */
        if (p) {
            rxi_RestoreDataBufs(p);
-       }
-       else {
+       } else {
            if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_RECEIVE)))
-               osi_Panic("rxi_ListenerProc: no packets!"); /* Shouldn't happen */
+               osi_Panic("rxi_ListenerProc: no packets!");     /* Shouldn't happen */
        }
        /* Wait for the next event time or a packet to arrive. */
        /* event_RaiseEvents schedules any events whose time has come and
-          then atomically computes the time to the next event, guaranteeing
-          that this is positive.  If there is no next event, it returns 0 */
+        * then atomically computes the time to the next event, guaranteeing
+        * that this is positive.  If there is no next event, it returns 0 */
        clock_NewTime();
-       if (!rxevent_RaiseEvents(&cv)) tvp = (struct timeval *) 0;
+       if (!rxevent_RaiseEvents(&cv))
+           tvp = NULL;
        else {
            /* It's important to copy cv to tv, because the 4.3 documentation
-              for select threatens that *tv may be updated after a select, in
-              future editions of the system, to indicate how much of the time
-              period has elapsed.  So we shouldn't rely on tv not being altered. */
-           tv.tv_sec = cv.sec; /* Time to next event */
+            * for select threatens that *tv may be updated after a select, in
+            * future editions of the system, to indicate how much of the time
+            * period has elapsed.  So we shouldn't rely on tv not being altered. */
+           tv.tv_sec = cv.sec; /* Time to next event */
            tv.tv_usec = cv.usec;
            tvp = &tv;
-       }       
-       rx_stats.selects++;
+       }
+       if (rx_stats_active)
+           rx_atomic_inc(&rx_stats.selects);
 
        *rfds = rx_selectMask;
 
@@ -177,83 +214,96 @@ void rxi_ListenerProc(rfds, tnop, newcallp)
            /* we're catching up, or haven't tried to for a few seconds */
            doingPoll = 1;
            nextPollTime = clock_Sec() + 4;     /* try again in 4 seconds no matter what */
-           tv.tv_sec = tv.tv_usec = 0;         /* make sure we poll */
+           tv.tv_sec = tv.tv_usec = 0; /* make sure we poll */
            tvp = &tv;
-           code = select(rx_maxSocketNumber+1, rfds, 0, 0, tvp);
-       }
-       else {
+           code = select((int)(rx_maxSocketNumber + 1), rfds, 0, 0, tvp);
+       } else {
            doingPoll = 0;
-           code = IOMGR_Select(rx_maxSocketNumber+1, rfds, 0, 0, tvp);
+           code = IOMGR_Select((int)(rx_maxSocketNumber + 1), rfds, 0, 0, tvp);
        }
        lastPollWorked = 0;     /* default is that it didn't find anything */
 
-       switch(code) {
-           case 0: 
-                /* Timer interrupt:
-                * If it was a timer interrupt then we can assume that
-                 * the time has advanced by roughly the value of the
-                 * previous timeout, and that there is now at least
-                 * one pending event.
-                 */
-               clock_NewTime();
-               break;
-           case -1: 
-                /* select or IOMGR_Select returned failure */
-               debugSelectFailure++;   /* update debugging counter */
-               clock_NewTime();
-               break;
-           case -2:
-                /* IOMGR_Cancel:
-                * IOMGR_Cancel is invoked whenever a new event is
-                 * posted that is earlier than any existing events.
-                 * So we re-evaluate the time, and then go back to
-                 * reschedule events
-                 */
-               clock_NewTime();
-               break;
-
-           default: 
-               /* Packets have arrived, presumably:
-                * If it wasn't a timer interrupt, then no event should have
-                * timed out yet (well some event may have, but only just...), so
-                * we don't bother looking to see if any have timed out, but just
-                * go directly to reading the data packets
-                 */
-               clock_NewTime();
-               if (doingPoll) lastPollWorked = 1;
+       if (quitListening) {
+           quitListening = 0;
+           LWP_DestroyProcess(pid);
+       }
+
+       switch (code) {
+       case 0:
+           /* Timer interrupt:
+            * If it was a timer interrupt then we can assume that
+            * the time has advanced by roughly the value of the
+            * previous timeout, and that there is now at least
+            * one pending event.
+            */
+           clock_NewTime();
+           break;
+       case -1:
+           /* select or IOMGR_Select returned failure */
+           debugSelectFailure++;       /* update debugging counter */
+           clock_NewTime();
+           break;
+       case -2:
+           /* IOMGR_Cancel:
+            * IOMGR_Cancel is invoked whenever a new event is
+            * posted that is earlier than any existing events.
+            * So we re-evaluate the time, and then go back to
+            * reschedule events
+            */
+           clock_NewTime();
+           break;
+
+       default:
+           /* Packets have arrived, presumably:
+            * If it wasn't a timer interrupt, then no event should have
+            * timed out yet (well some event may have, but only just...), so
+            * we don't bother looking to see if any have timed out, but just
+            * go directly to reading the data packets
+            */
+           clock_NewTime();
+           if (doingPoll)
+               lastPollWorked = 1;
 #ifdef AFS_NT40_ENV
-               for (i=0; p && i<rfds->fd_count; i++) {
-                   socket = rfds->fd_array[i];
-                   if (rxi_ReadPacket(socket, p, &host, &port)) {
-                       *newcallp = NULL;
-                       p = rxi_ReceivePacket(p, socket, host, port,
-                                             tnop, newcallp);
-                       if (newcallp && *newcallp) {
-                           if (p) {
-                               rxi_FreePacket(p);
-                           }
-                           return;
+           for (i = 0; p && i < rfds->fd_count; i++) {
+               socket = rfds->fd_array[i];
+               if (rxi_ReadPacket(socket, p, &host, &port)) {
+                   *newcallp = NULL;
+                   p = rxi_ReceivePacket(p, socket, host, port, tnop,
+                                         newcallp);
+                   if (newcallp && *newcallp) {
+                       if (p) {
+                           rxi_FreePacket(p);
                        }
+                       if (swapNameProgram) {
+                           (*swapNameProgram) (rx_listenerPid, name, 0);
+                           rx_listenerPid = 0;
+                       }
+                       return;
                    }
                }
+           }
 #else
-               for (socket = rx_minSocketNumber;
-                    p && socket <= rx_maxSocketNumber; socket++)  {
-                    if (!FD_ISSET(socket, rfds))
-                        continue;
-                    if (rxi_ReadPacket(socket, p, &host, &port)) {
-                        p = rxi_ReceivePacket(p, socket, host, port,
-                                              tnop, newcallp);
-                       if (newcallp && *newcallp) {
-                           if (p) {
-                               rxi_FreePacket(p);
-                           }
-                           return;
+           for (socket = rx_minSocketNumber;
+                p && socket <= rx_maxSocketNumber; socket++) {
+               if (!FD_ISSET(socket, rfds))
+                   continue;
+               if (rxi_ReadPacket(socket, p, &host, &port)) {
+                   p = rxi_ReceivePacket(p, socket, host, port, tnop,
+                                         newcallp);
+                   if (newcallp && *newcallp) {
+                       if (p) {
+                           rxi_FreePacket(p);
+                       }
+                       if (swapNameProgram) {
+                           (*swapNameProgram) (rx_listenerPid, name, 0);
+                           rx_listenerPid = 0;
                        }
+                       return;
                    }
                }
+           }
 #endif
-               break;
+           break;
        }
     }
     /* NOTREACHED */
@@ -262,10 +312,11 @@ void rxi_ListenerProc(rfds, tnop, newcallp)
 /* This is the listener process request loop. The listener process loop
  * becomes a server thread when rxi_ListenerProc returns, and stays
  * server thread until rxi_ServerProc returns. */
-static void rx_ListenerProc()
+static void *
+rx_ListenerProc(void *dummy)
 {
     int threadID;
-    int sock;
+    osi_socket sock;
     struct rx_call *newcall;
     fd_set *rfds;
 
@@ -273,25 +324,27 @@ static void rx_ListenerProc()
        osi_Panic("rx_ListenerProc: no fd_sets!\n");
     }
 
-    while(1) {
+    while (1) {
        newcall = NULL;
        threadID = -1;
        rxi_ListenerProc(rfds, &threadID, &newcall);
-       /* assert(threadID != -1); */
-       /* assert(newcall != NULL); */
+       /* osi_Assert(threadID != -1); */
+       /* osi_Assert(newcall != NULL); */
        sock = OSI_NULLSOCKET;
        rxi_ServerProc(threadID, newcall, &sock);
-       /* assert(sock != OSI_NULLSOCKET); */
+       /* osi_Assert(sock != OSI_NULLSOCKET); */
     }
     /* not reached */
+    return NULL;
 }
 
 /* This is the server process request loop. The server process loop
  * becomes a listener thread when rxi_ServerProc returns, and stays
  * listener thread until rxi_ListenerProc returns. */
-void rx_ServerProc()
+void *
+rx_ServerProc(void * unused)
 {
-    int sock;
+    osi_socket sock;
     int threadID;
     struct rx_call *newcall = NULL;
     fd_set *rfds;
@@ -300,22 +353,23 @@ void rx_ServerProc()
        osi_Panic("rxi_ListenerProc: no fd_sets!\n");
     }
 
-    rxi_MorePackets(rx_maxReceiveWindow+2); /* alloc more packets */
+    rxi_MorePackets(rx_maxReceiveWindow + 2);  /* alloc more packets */
     rxi_dataQuota += rx_initSendWindow;        /* Reserve some pkts for hard times */
     /* threadID is used for making decisions in GetCall.  Get it by bumping
      * number of threads handling incoming calls */
     threadID = rxi_availProcs++;
 
-    while(1) {
+    while (1) {
        sock = OSI_NULLSOCKET;
        rxi_ServerProc(threadID, newcall, &sock);
-       /* assert(sock != OSI_NULLSOCKET); */
+       /* osi_Assert(sock != OSI_NULLSOCKET); */
        newcall = NULL;
        rxi_ListenerProc(rfds, &threadID, &newcall);
-       /* assert(threadID != -1); */
-       /* assert(newcall != NULL); */
+       /* osi_Assert(threadID != -1); */
+       /* osi_Assert(newcall != NULL); */
     }
     /* not reached */
+    return NULL;
 }
 
 /*
@@ -323,8 +377,8 @@ void rx_ServerProc()
  * Called from a single thread at startup.
  * Returns 0 on success; -1 on failure.
  */
-int rxi_Listen(sock)
-    osi_socket sock;
+int
+rxi_Listen(osi_socket sock)
 {
 #ifndef AFS_NT40_ENV
     /*
@@ -337,66 +391,105 @@ int rxi_Listen(sock)
        return -1;
     }
 
-    if (sock > FD_SETSIZE-1) {
+    if (sock > FD_SETSIZE - 1) {
        (osi_Msg "rxi_Listen: socket descriptor > (FD_SETSIZE-1) = %d\n",
-        FD_SETSIZE-1);
+        FD_SETSIZE - 1);
        return -1;
     }
 #endif
 
     FD_SET(sock, &rx_selectMask);
-    if (sock > rx_maxSocketNumber) rx_maxSocketNumber = sock;
-    if (sock < rx_minSocketNumber) rx_minSocketNumber = sock;
+    if (sock > rx_maxSocketNumber)
+       rx_maxSocketNumber = sock;
+    if (sock < rx_minSocketNumber)
+       rx_minSocketNumber = sock;
     return 0;
 }
 
 /*
  * Recvmsg
  */
-int rxi_Recvmsg(osi_socket socket, struct msghdr *msg_p, int flags)
+int
+rxi_Recvmsg(osi_socket socket, struct msghdr *msg_p, int flags)
 {
-    return recvmsg((int) socket, msg_p, flags);
+    int code;
+    code = recvmsg(socket, msg_p, flags);
+
+#ifdef AFS_RXERRQ_ENV
+    if (code < 0) {
+       while((rxi_HandleSocketError(socket)) > 0)
+           ;
+    }
+#endif
+
+    return code;
 }
 
 /*
  * Simulate a blocking sendmsg on the non-blocking socket.
  * It's non blocking because it was set that way for recvmsg.
  */
-int rxi_Sendmsg(osi_socket socket, struct msghdr *msg_p, int flags)
+int
+rxi_Sendmsg(osi_socket socket, struct msghdr *msg_p, int flags)
 {
-    fd_set *sfds = (fd_set*)0;
+    fd_set *sfds = (fd_set *) 0;
     while (sendmsg(socket, msg_p, flags) == -1) {
        int err;
-       rx_stats.sendSelects++;
+
+#ifdef AFS_NT40_ENV
+       err = WSAGetLastError();
+#else
+       err = errno;
+#endif
+
+       if (rx_stats_active)
+           rx_atomic_inc(&rx_stats.sendSelects);
 
        if (!sfds) {
-           if ( !(sfds = IOMGR_AllocFDSet())) {
+           if (!(sfds = IOMGR_AllocFDSet())) {
                (osi_Msg "rx failed to alloc fd_set: ");
                perror("rx_sendmsg");
-               return 3;
+               return -1;
            }
            FD_SET(socket, sfds);
        }
-           
+#ifdef AFS_RXERRQ_ENV
+       while((rxi_HandleSocketError(socket)) > 0)
+         ;
+#endif
 #ifdef AFS_NT40_ENV
-       if (errno)
+       if (err)
 #elif defined(AFS_LINUX22_ENV)
-         /* linux unfortunately returns ECONNREFUSED if the target port
-          * is no longer in use */
-       if (errno != EWOULDBLOCK && errno != ENOBUFS && errno != ECONNREFUSED)
+       /* linux unfortunately returns ECONNREFUSED if the target port
+        * is no longer in use */
+       /* and EAGAIN if a UDP checksum is incorrect */
+       if (err != EWOULDBLOCK && err != ENOBUFS && err != ECONNREFUSED
+           && err != EAGAIN)
 #else
-       if (errno != EWOULDBLOCK && errno != ENOBUFS)
+       if (err != EWOULDBLOCK && err != ENOBUFS)
 #endif
        {
            (osi_Msg "rx failed to send packet: ");
            perror("rx_sendmsg");
-           return 3;
+            if (err > 0)
+              return -err;
+           return -1;
        }
-       while ((err = select(socket+1, 0, sfds, 0, 0)) != 1) {
-           if (err >= 0 || errno != EINTR) 
-             osi_Panic("rxi_sendmsg: select error %d.%d", err, errno);
+       while ((err = select(
+#ifdef AFS_NT40_ENV
+                             0,
+#else
+                             socket + 1,
+#endif
+                             0, sfds, 0, 0)) != 1) {
+           if (err >= 0 || errno != EINTR)
+               osi_Panic("rxi_sendmsg: select error %d.%d", err, errno);
+           FD_ZERO(sfds);
+           FD_SET(socket, sfds);
        }
     }
     if (sfds)
        IOMGR_FreeFDSet(sfds);
+    return 0;
 }
+#endif