rx: For AFS_RXERRQ_ENV, retry sendmsg on error
[openafs.git] / src / rx / rx_pthread.c
index 4efa7f4..2553988 100644 (file)
@@ -1,7 +1,7 @@
 /*
  * Copyright 2000, International Business Machines Corporation and others.
  * All Rights Reserved.
- * 
+ *
  * This software has been released under the terms of the IBM Public
  * License.  For details, see the LICENSE file in the top-level source
  * directory or online at http://www.openafs.org/dl/license10.html
 #include <afsconfig.h>
 #include <afs/param.h>
 
+#include <roken.h>
+#include <afs/opr.h>
 
-#include <sys/types.h>
-#include <errno.h>
-#include <signal.h>
-#include <string.h>
-#ifdef HAVE_STDINT_H
-# include <stdint.h>
-#endif
-#ifndef AFS_NT40_ENV
-# include <sys/socket.h>
-# include <sys/file.h>
-# include <netdb.h>
-# include <netinet/in.h>
-# include <net/if.h>
-# include <sys/ioctl.h>
-# include <sys/time.h>
-# include <unistd.h>
-#endif
-#include <sys/stat.h>
-#include <rx/rx.h>
-#include <rx/rx_globals.h>
 #include <assert.h>
-#include <rx/rx_pthread.h>
-#include <rx/rx_clock.h>
+
+#ifdef AFS_PTHREAD_ENV
+
+#include "rx.h"
+#include "rx_globals.h"
+#include "rx_pthread.h"
+#include "rx_clock.h"
+#include "rx_atomic.h"
+#include "rx_internal.h"
+#include "rx_pthread.h"
+#ifdef AFS_NT40_ENV
+#include "rx_xmit_nt.h"
+#endif
+
+static void rxi_SetThreadNum(int threadID);
 
 /* Set rx_pthread_event_rescheduled if event_handler should just try
  * again instead of sleeping.
@@ -67,6 +62,13 @@ static int listeners_started = 0;
 afs_kmutex_t rx_clock_mutex;
 struct clock rxi_clockNow;
 
+static rx_atomic_t threadHiNum;
+
+int
+rx_NewThreadId(void) {
+    return rx_atomic_inc_and_read(&threadHiNum);
+}
+
 /*
  * Delay the current thread the specified number of seconds.
  */
@@ -108,14 +110,11 @@ rxi_StartServerProc(void *(*proc) (void *), int stacksize)
     AFS_SIGSET_DECL;
 
     if (pthread_attr_init(&tattr) != 0) {
-       dpf(("Unable to Create Rx server thread (pthread_attr_init)\n"));
-       assert(0);
+       osi_Panic("Unable to Create Rx server thread (pthread_attr_init)\n");
     }
 
     if (pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) != 0) {
-       dpf
-           (("Unable to Create Rx server thread (pthread_attr_setdetachstate)\n"));
-       assert(0);
+       osi_Panic("Unable to Create Rx server thread (pthread_attr_setdetachstate)\n");
     }
 
     /*
@@ -123,8 +122,7 @@ rxi_StartServerProc(void *(*proc) (void *), int stacksize)
      */
     AFS_SIGSET_CLEAR();
     if (pthread_create(&thread, &tattr, server_entry, (void *)proc) != 0) {
-       dpf(("Unable to Create Rx server thread\n"));
-       assert(0);
+       osi_Panic("Unable to Create Rx server thread\n");
     }
     AFS_SIGSET_RESTORE();
 }
@@ -168,8 +166,8 @@ event_handler(void *argp)
        error = CV_TIMEDWAIT(&rx_event_handler_cond, &event_handler_mutex, &rx_pthread_next_event_time);
         if (error == 0) {
            rx_pthread_n_event_woken++;
-        } 
-#ifdef AFS_NT40_ENV        
+        }
+#ifdef AFS_NT40_ENV
         else if (error == ETIMEDOUT) {
            rx_pthread_n_event_expired++;
        } else {
@@ -184,7 +182,7 @@ event_handler(void *argp)
 #endif
        rx_pthread_event_rescheduled = 0;
     }
-    return NULL;
+    AFS_UNREACHED(return(NULL));
 }
 
 
@@ -210,6 +208,12 @@ rxi_ListenerProc(osi_socket sock, int *tnop, struct rx_call **newcallp)
     u_short port;
     struct rx_packet *p = (struct rx_packet *)0;
 
+    if (!(rx_enable_hot_thread && newcallp)) {
+       /* Don't do this for hot threads, since we might stop being the
+        * listener. */
+       opr_threadname_set("rx_Listener");
+    }
+
     MUTEX_ENTER(&listener_mutex);
     while (!listeners_started) {
        CV_WAIT(&rx_listener_cond, &listener_mutex);
@@ -228,8 +232,7 @@ rxi_ListenerProc(osi_socket sock, int *tnop, struct rx_call **newcallp)
        } else {
            if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_RECEIVE))) {
                /* Could this happen with multiple socket listeners? */
-               dpf(("rxi_Listener: no packets!"));     /* Shouldn't happen */
-               assert(0);
+               osi_Panic("rxi_Listener: no packets!"); /* Shouldn't happen */
            }
        }
 
@@ -260,15 +263,14 @@ rx_ListenerProc(void *argp)
        newcall = NULL;
        threadID = -1;
        rxi_ListenerProc(sock, &threadID, &newcall);
-       /* assert(threadID != -1); */
-       /* assert(newcall != NULL); */
+       /* osi_Assert(threadID != -1); */
+       /* osi_Assert(newcall != NULL); */
        sock = OSI_NULLSOCKET;
-       assert(pthread_setspecific(rx_thread_id_key, (void *)(intptr_t)threadID) == 0);
+       rxi_SetThreadNum(threadID);
        rxi_ServerProc(threadID, newcall, &sock);
-       /* assert(sock != OSI_NULLSOCKET); */
+       /* osi_Assert(sock != OSI_NULLSOCKET); */
     }
-    /* not reached */
-    return NULL;
+    AFS_UNREACHED(return(NULL));
 }
 
 /* This is the server process request loop. The server process loop
@@ -287,7 +289,7 @@ rx_ServerProc(void * dummy)
     /* threadID is used for making decisions in GetCall.  Get it by bumping
      * number of threads handling incoming calls */
     /* Unique thread ID: used for scheduling purposes *and* as index into
-     * the host hold table (fileserver). 
+     * the host hold table (fileserver).
      * The previously used rxi_availProcs is unsuitable as it
      * will already go up and down as packets arrive while the server
      * threads are still initialising! The recently introduced
@@ -298,7 +300,7 @@ rx_ServerProc(void * dummy)
      * thread... chose the latter.
      */
     MUTEX_ENTER(&rx_pthread_mutex);
-    threadID = ++rxi_pthread_hinum;
+    threadID = rx_NewThreadId();
     if (rxi_fcfs_thread_num == 0 && rxi_fcfs_thread_num != threadID)
        rxi_fcfs_thread_num = threadID;
     MUTEX_EXIT(&rx_pthread_mutex);
@@ -307,16 +309,15 @@ rx_ServerProc(void * dummy)
 
     while (1) {
        sock = OSI_NULLSOCKET;
-       assert(pthread_setspecific(rx_thread_id_key, (void *)(intptr_t)threadID) == 0);
+       rxi_SetThreadNum(threadID);
        rxi_ServerProc(threadID, newcall, &sock);
-       /* assert(sock != OSI_NULLSOCKET); */
+       /* osi_Assert(sock != OSI_NULLSOCKET); */
        newcall = NULL;
        rxi_ListenerProc(sock, &threadID, &newcall);
-       /* assert(threadID != -1); */
-       /* assert(newcall != NULL); */
+       /* osi_Assert(threadID != -1); */
+       /* osi_Assert(newcall != NULL); */
     }
-    /* not reached */
-    return NULL;
+    AFS_UNREACHED(return(NULL));
 }
 
 /*
@@ -324,7 +325,7 @@ rx_ServerProc(void * dummy)
  * listener processes (one for each socket); these are started by GetUdpSocket.
  *
  * The event handling process *is* started here (the old listener used
- * to also handle events). The listener threads can't actually start 
+ * to also handle events). The listener threads can't actually start
  * listening until rxi_StartListener is called because most of R may not
  * be initialized when rxi_Listen is called.
  */
@@ -338,26 +339,19 @@ rxi_StartListener(void)
                return;
 
     if (pthread_attr_init(&tattr) != 0) {
-       dpf
-           (("Unable to create Rx event handling thread (pthread_attr_init)\n"));
-       assert(0);
+       osi_Panic("Unable to create Rx event handling thread (pthread_attr_init)\n");
     }
 
     if (pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) != 0) {
-       dpf
-           (("Unable to create Rx event handling thread (pthread_attr_setdetachstate)\n"));
-       assert(0);
+       osi_Panic("Unable to create Rx event handling thread (pthread_attr_setdetachstate)\n");
     }
 
     AFS_SIGSET_CLEAR();
     if (pthread_create(&event_handler_thread, &tattr, event_handler, NULL) !=
        0) {
-       dpf(("Unable to create Rx event handling thread\n"));
-       assert(0);
+       osi_Panic("Unable to create Rx event handling thread\n");
     }
-    MUTEX_ENTER(&rx_pthread_mutex);
-    ++rxi_pthread_hinum;
-    MUTEX_EXIT(&rx_pthread_mutex);
+    rx_NewThreadId();
     AFS_SIGSET_RESTORE();
 
     MUTEX_ENTER(&listener_mutex);
@@ -378,25 +372,18 @@ rxi_Listen(osi_socket sock)
     AFS_SIGSET_DECL;
 
     if (pthread_attr_init(&tattr) != 0) {
-       dpf
-           (("Unable to create socket listener thread (pthread_attr_init)\n"));
-       assert(0);
+       osi_Panic("Unable to create socket listener thread (pthread_attr_init)\n");
     }
 
     if (pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) != 0) {
-       dpf
-           (("Unable to create socket listener thread (pthread_attr_setdetachstate)\n"));
-       assert(0);
+       osi_Panic("Unable to create socket listener thread (pthread_attr_setdetachstate)\n");
     }
 
     AFS_SIGSET_CLEAR();
     if (pthread_create(&thread, &tattr, rx_ListenerProc, (void *)(intptr_t)sock) != 0) {
-       dpf(("Unable to create socket listener thread\n"));
-       assert(0);
+       osi_Panic("Unable to create socket listener thread\n");
     }
-    MUTEX_ENTER(&rx_pthread_mutex);
-    ++rxi_pthread_hinum;
-    MUTEX_EXIT(&rx_pthread_mutex);
+    rx_NewThreadId();
     AFS_SIGSET_RESTORE();
     return 0;
 }
@@ -410,11 +397,12 @@ int
 rxi_Recvmsg(osi_socket socket, struct msghdr *msg_p, int flags)
 {
     int ret;
-#if defined(HAVE_LINUX_ERRQUEUE_H) && defined(ADAPT_PMTU)
-    while((rxi_HandleSocketError(socket)) > 0)
-      ;
-#endif
     ret = recvmsg(socket, msg_p, flags);
+
+    if (ret < 0) {
+       rxi_HandleSocketErrors(socket);
+    }
+
     return ret;
 }
 
@@ -424,37 +412,42 @@ rxi_Recvmsg(osi_socket socket, struct msghdr *msg_p, int flags)
 int
 rxi_Sendmsg(osi_socket socket, struct msghdr *msg_p, int flags)
 {
-    int ret;
-    ret = sendmsg(socket, msg_p, flags);
-#ifdef AFS_LINUX22_ENV
+    int err;
+    if (sendmsg(socket, msg_p, flags) >= 0) {
+       return 0;
+    }
+
+#ifdef AFS_NT40_ENV
+    err = WSAGetLastError();
+#else
+    err = errno;
+#endif
+
+#ifndef AFS_RXERRQ_ENV
+# ifdef AFS_LINUX22_ENV
     /* linux unfortunately returns ECONNREFUSED if the target port
      * is no longer in use */
     /* and EAGAIN if a UDP checksum is incorrect */
-    if (ret == -1 && errno != ECONNREFUSED && errno != EAGAIN) {
-#else
-    if (ret == -1) {
-#endif
-       dpf(("rxi_sendmsg failed, error %d\n", errno));
-       fflush(stdout);
-#ifndef AFS_NT40_ENV
-        if (errno > 0)
-          return -errno;
-#else
-            if (WSAGetLastError() > 0)
-              return -WSAGetLastError();
-#endif
-       return -1;
+    if (err == ECONNREFUSED || err == EAGAIN) {
+       return 0;
     }
-    return 0;
+# endif
+    dpf(("rxi_sendmsg failed, error %d\n", errno));
+    fflush(stdout);
+#endif /* !AFS_RXERRQ_ENV */
+
+    if (err > 0) {
+       return -err;
+    }
+    return -1;
 }
 
 struct rx_ts_info_t * rx_ts_info_init(void) {
     struct rx_ts_info_t * rx_ts_info;
-    rx_ts_info = (rx_ts_info_t *) malloc(sizeof(rx_ts_info_t));
-    assert(rx_ts_info != NULL && pthread_setspecific(rx_ts_info_key, rx_ts_info) == 0);
-    memset(rx_ts_info, 0, sizeof(rx_ts_info_t));
+    rx_ts_info = calloc(1, sizeof(rx_ts_info_t));
+    osi_Assert(rx_ts_info != NULL && pthread_setspecific(rx_ts_info_key, rx_ts_info) == 0);
 #ifdef RX_ENABLE_TSFPQ
-    queue_Init(&rx_ts_info->_FPQ);
+    opr_queue_Init(&rx_ts_info->_FPQ.queue);
 
     MUTEX_ENTER(&rx_packets_mutex);
     rx_TSFPQMaxProcs++;
@@ -463,3 +456,25 @@ struct rx_ts_info_t * rx_ts_info_init(void) {
 #endif /* RX_ENABLE_TSFPQ */
     return rx_ts_info;
 }
+
+int
+rx_GetThreadNum(void) {
+    return (intptr_t)pthread_getspecific(rx_thread_id_key);
+}
+
+static void
+rxi_SetThreadNum(int threadID) {
+    osi_Assert(pthread_setspecific(rx_thread_id_key,
+                                  (void *)(intptr_t)threadID) == 0);
+}
+
+int
+rx_SetThreadNum(void) {
+    int threadId;
+
+    threadId = rx_NewThreadId();
+    rxi_SetThreadNum(threadId);
+    return threadId;
+}
+
+#endif