rx: For AFS_RXERRQ_ENV, retry sendmsg on error
[openafs.git] / src / rx / rx_pthread.c
index 17f8881..2553988 100644 (file)
@@ -1,7 +1,7 @@
 /*
  * Copyright 2000, International Business Machines Corporation and others.
  * All Rights Reserved.
- * 
+ *
  * This software has been released under the terms of the IBM Public
  * License.  For details, see the LICENSE file in the top-level source
  * directory or online at http://www.openafs.org/dl/license10.html
 #include <afsconfig.h>
 #include <afs/param.h>
 
-RCSID("$Header$");
-
-#include <sys/types.h>
-#include <errno.h>
-#include <signal.h>
-#ifndef AFS_NT40_ENV
-# include <sys/socket.h>
-# include <sys/file.h>
-# include <netdb.h>
-# include <netinet/in.h>
-# include <net/if.h>
-# include <sys/ioctl.h>
-# include <sys/time.h>
-#endif
-#include <sys/stat.h>
-#include <rx.h>
-#include <rx_globals.h>
+#include <roken.h>
+#include <afs/opr.h>
+
 #include <assert.h>
-#include <rx/rx_pthread.h>
 
-/*
- * Number of times the event handling thread was signalled because a new
- * event was scheduled earlier than the lastest event.
- *
- * Protected by event_handler_mutex
- */
-static long rx_pthread_n_event_wakeups;
+#ifdef AFS_PTHREAD_ENV
+
+#include "rx.h"
+#include "rx_globals.h"
+#include "rx_pthread.h"
+#include "rx_clock.h"
+#include "rx_atomic.h"
+#include "rx_internal.h"
+#include "rx_pthread.h"
+#ifdef AFS_NT40_ENV
+#include "rx_xmit_nt.h"
+#endif
+
+static void rxi_SetThreadNum(int threadID);
 
 /* Set rx_pthread_event_rescheduled if event_handler should just try
  * again instead of sleeping.
@@ -62,19 +54,26 @@ static void *rx_ListenerProc(void *);
  * This thread is also responsible for keeping time.
  */
 static pthread_t event_handler_thread;
-pthread_cond_t rx_event_handler_cond;
-pthread_mutex_t event_handler_mutex;
-pthread_cond_t rx_listener_cond;
-pthread_mutex_t listener_mutex;
+afs_kcondvar_t rx_event_handler_cond;
+afs_kmutex_t event_handler_mutex;
+afs_kcondvar_t rx_listener_cond;
+afs_kmutex_t listener_mutex;
 static int listeners_started = 0;
-pthread_mutex_t rx_clock_mutex;
+afs_kmutex_t rx_clock_mutex;
 struct clock rxi_clockNow;
 
+static rx_atomic_t threadHiNum;
+
+int
+rx_NewThreadId(void) {
+    return rx_atomic_inc_and_read(&threadHiNum);
+}
+
 /*
  * Delay the current thread the specified number of seconds.
  */
-void rxi_Delay(sec)
-    int sec;
+void
+rxi_Delay(int sec)
 {
     sleep(sec);
 }
@@ -82,56 +81,48 @@ void rxi_Delay(sec)
 /*
  * Called from rx_Init()
  */
-void rxi_InitializeThreadSupport() {
-
-    listeners_started = 0;
-    gettimeofday((struct timeval *)&rxi_clockNow, NULL);
+void
+rxi_InitializeThreadSupport(void)
+{
+       /* listeners_started must only be reset if
+        * the listener thread terminates */
+       /* listeners_started = 0; */
+    clock_GetTime(&rxi_clockNow);
 }
 
-static void *server_entry(void * argp)
+static void *
+server_entry(void *argp)
 {
-    void (*server_proc)() = (void (*)()) argp;
-    server_proc();
-    printf("rx_pthread.c: server_entry: Server proc returned unexpectedly\n");
-    exit(1);
-    return (void *) 0;
+    void (*server_proc) (void *) = (void (*)(void *))argp;
+    server_proc(NULL);
+    dpf(("rx_pthread.c: server_entry: Server proc returned unexpectedly\n"));
+    return (void *) -1; /* reused as return value, see pthread(3) */
 }
 
 /*
  * Start an Rx server process.
  */
-void rxi_StartServerProc(proc, stacksize)
-    void (*proc)();
-    int stacksize;
+void
+rxi_StartServerProc(void *(*proc) (void *), int stacksize)
 {
     pthread_t thread;
     pthread_attr_t tattr;
     AFS_SIGSET_DECL;
 
-    if (pthread_attr_init
-       (&tattr) != 0) {
-       printf("Unable to Create Rx server thread (pthread_attr_init)\n");
-       exit(1);
+    if (pthread_attr_init(&tattr) != 0) {
+       osi_Panic("Unable to Create Rx server thread (pthread_attr_init)\n");
     }
 
-    if (pthread_attr_setdetachstate
-       (&tattr,
-        PTHREAD_CREATE_DETACHED) != 0) {
-       printf("Unable to Create Rx server thread (pthread_attr_setdetachstate)\n");
-       exit(1);
+    if (pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) != 0) {
+       osi_Panic("Unable to Create Rx server thread (pthread_attr_setdetachstate)\n");
     }
 
     /*
      * NOTE: We are ignoring the stack size parameter, for now.
      */
     AFS_SIGSET_CLEAR();
-    if (pthread_create
-       (&thread,
-        &tattr,
-        server_entry,
-        (void *) proc) != 0) {
-       printf("Unable to Create Rx server thread\n");
-       exit(1);
+    if (pthread_create(&thread, &tattr, server_entry, (void *)proc) != 0) {
+       osi_Panic("Unable to Create Rx server thread\n");
     }
     AFS_SIGSET_RESTORE();
 }
@@ -139,28 +130,30 @@ void rxi_StartServerProc(proc, stacksize)
 /*
  * The event handling process.
  */
-static void *event_handler(void *argp)
+static void *
+event_handler(void *argp)
 {
-    struct clock rx_pthread_last_event_wait_time = {0,0};
     unsigned long rx_pthread_n_event_expired = 0;
     unsigned long rx_pthread_n_event_waits = 0;
     long rx_pthread_n_event_woken = 0;
-    struct timespec rx_pthread_next_event_time = {0,0};
+    unsigned long rx_pthread_n_event_error = 0;
+    struct timespec rx_pthread_next_event_time = { 0, 0 };
+    int error;
 
-    assert(pthread_mutex_lock(&event_handler_mutex)==0);
+    MUTEX_ENTER(&event_handler_mutex);
 
     for (;;) {
        struct clock cv;
        struct clock next;
 
-       assert(pthread_mutex_unlock(&event_handler_mutex)==0);
-    
-       next.sec = 30; /* Time to sleep if there are no events scheduled */
+       MUTEX_EXIT(&event_handler_mutex);
+
+       next.sec = 30;          /* Time to sleep if there are no events scheduled */
        next.usec = 0;
-       gettimeofday((struct timeval *)&cv, NULL);
+       clock_GetTime(&cv);
        rxevent_RaiseEvents(&next);
 
-       assert(pthread_mutex_lock(&event_handler_mutex)==0);
+       MUTEX_ENTER(&event_handler_mutex);
        if (rx_pthread_event_rescheduled) {
            rx_pthread_event_rescheduled = 0;
            continue;
@@ -170,62 +163,76 @@ static void *event_handler(void *argp)
        rx_pthread_next_event_time.tv_sec = cv.sec;
        rx_pthread_next_event_time.tv_nsec = cv.usec * 1000;
        rx_pthread_n_event_waits++;
-       if (pthread_cond_timedwait
-           (&rx_event_handler_cond,
-            &event_handler_mutex,
-            &rx_pthread_next_event_time) == -1) {
-#ifdef notdef
-           assert(errno == EAGAIN); 
-#endif
-           rx_pthread_n_event_expired++;
-       } else {        
+       error = CV_TIMEDWAIT(&rx_event_handler_cond, &event_handler_mutex, &rx_pthread_next_event_time);
+        if (error == 0) {
            rx_pthread_n_event_woken++;
-       }
+        }
+#ifdef AFS_NT40_ENV
+        else if (error == ETIMEDOUT) {
+           rx_pthread_n_event_expired++;
+       } else {
+            rx_pthread_n_event_error++;
+        }
+#else
+        else if (errno == ETIMEDOUT) {
+            rx_pthread_n_event_expired++;
+        } else {
+            rx_pthread_n_event_error++;
+        }
+#endif
        rx_pthread_event_rescheduled = 0;
     }
+    AFS_UNREACHED(return(NULL));
 }
 
 
 /*
  * This routine will get called by the event package whenever a new,
  * earlier than others, event is posted. */
-void rxi_ReScheduleEvents() {
-    assert(pthread_mutex_lock(&event_handler_mutex)==0);
-    pthread_cond_signal(&rx_event_handler_cond);
+void
+rxi_ReScheduleEvents(void)
+{
+    MUTEX_ENTER(&event_handler_mutex);
+    CV_SIGNAL(&rx_event_handler_cond);
     rx_pthread_event_rescheduled = 1;
-    assert(pthread_mutex_unlock(&event_handler_mutex)==0);
+    MUTEX_EXIT(&event_handler_mutex);
 }
 
 
 /* Loop to listen on a socket. Return setting *newcallp if this
  * thread should become a server thread.  */
-static void rxi_ListenerProc(sock, tnop, newcallp)
-int sock;
-int *tnop;
-struct rx_call **newcallp;
+static void
+rxi_ListenerProc(osi_socket sock, int *tnop, struct rx_call **newcallp)
 {
     unsigned int host;
     u_short port;
-    register struct rx_packet *p = (struct rx_packet *)0;
+    struct rx_packet *p = (struct rx_packet *)0;
 
-    assert(pthread_mutex_lock(&listener_mutex)==0);
+    if (!(rx_enable_hot_thread && newcallp)) {
+       /* Don't do this for hot threads, since we might stop being the
+        * listener. */
+       opr_threadname_set("rx_Listener");
+    }
+
+    MUTEX_ENTER(&listener_mutex);
     while (!listeners_started) {
-       assert(pthread_cond_wait(&rx_listener_cond, &listener_mutex)==0);
+       CV_WAIT(&rx_listener_cond, &listener_mutex);
     }
-    assert(pthread_mutex_unlock(&listener_mutex)==0);
+    MUTEX_EXIT(&listener_mutex);
 
     for (;;) {
+        /* See if a check for additional packets was issued */
+        rx_CheckPackets();
+
        /*
         * Grab a new packet only if necessary (otherwise re-use the old one)
         */
        if (p) {
            rxi_RestoreDataBufs(p);
-       }
-       else {
+       } else {
            if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_RECEIVE))) {
                /* Could this happen with multiple socket listeners? */
-               printf("rxi_Listener: no packets!"); /* Shouldn't happen */
-               exit(1);
+               osi_Panic("rxi_Listener: no packets!"); /* Shouldn't happen */
            }
        }
 
@@ -245,54 +252,72 @@ struct rx_call **newcallp;
 /* This is the listener process request loop. The listener process loop
  * becomes a server thread when rxi_ListenerProc returns, and stays
  * server thread until rxi_ServerProc returns. */
-static void *rx_ListenerProc(void *argp)
+static void *
+rx_ListenerProc(void *argp)
 {
     int threadID;
-    int sock = (int) argp;
+    osi_socket sock = (osi_socket)(intptr_t)argp;
     struct rx_call *newcall;
 
-    while(1) {
+    while (1) {
        newcall = NULL;
        threadID = -1;
        rxi_ListenerProc(sock, &threadID, &newcall);
-       /* assert(threadID != -1); */
-       /* assert(newcall != NULL); */
+       /* osi_Assert(threadID != -1); */
+       /* osi_Assert(newcall != NULL); */
        sock = OSI_NULLSOCKET;
-       assert(pthread_setspecific(rx_thread_id_key, (void *)threadID) == 0);
+       rxi_SetThreadNum(threadID);
        rxi_ServerProc(threadID, newcall, &sock);
-       /* assert(sock != OSI_NULLSOCKET); */
+       /* osi_Assert(sock != OSI_NULLSOCKET); */
     }
-    /* not reached */
+    AFS_UNREACHED(return(NULL));
 }
 
 /* This is the server process request loop. The server process loop
  * becomes a listener thread when rxi_ServerProc returns, and stays
  * listener thread until rxi_ListenerProc returns. */
-void rx_ServerProc()
+void *
+rx_ServerProc(void * dummy)
 {
-    int sock;
+    osi_socket sock;
     int threadID;
     struct rx_call *newcall = NULL;
 
-    rxi_MorePackets(rx_maxReceiveWindow+2); /* alloc more packets */
-    MUTEX_ENTER(&rx_stats_mutex);
+    rxi_MorePackets(rx_maxReceiveWindow + 2);  /* alloc more packets */
+    MUTEX_ENTER(&rx_quota_mutex);
     rxi_dataQuota += rx_initSendWindow;        /* Reserve some pkts for hard times */
     /* threadID is used for making decisions in GetCall.  Get it by bumping
      * number of threads handling incoming calls */
-    threadID = rxi_availProcs++;
-    MUTEX_EXIT(&rx_stats_mutex);
-
-    while(1) {
+    /* Unique thread ID: used for scheduling purposes *and* as index into
+     * the host hold table (fileserver).
+     * The previously used rxi_availProcs is unsuitable as it
+     * will already go up and down as packets arrive while the server
+     * threads are still initialising! The recently introduced
+     * rxi_pthread_hinum does not necessarily lead to a server
+     * thread with id 0, which is not allowed to hop through the
+     * incoming call queue.
+     * So either introduce yet another counter or flag the FCFS
+     * thread... chose the latter.
+     */
+    MUTEX_ENTER(&rx_pthread_mutex);
+    threadID = rx_NewThreadId();
+    if (rxi_fcfs_thread_num == 0 && rxi_fcfs_thread_num != threadID)
+       rxi_fcfs_thread_num = threadID;
+    MUTEX_EXIT(&rx_pthread_mutex);
+    ++rxi_availProcs;
+    MUTEX_EXIT(&rx_quota_mutex);
+
+    while (1) {
        sock = OSI_NULLSOCKET;
-       assert(pthread_setspecific(rx_thread_id_key, (void *)threadID) == 0);
+       rxi_SetThreadNum(threadID);
        rxi_ServerProc(threadID, newcall, &sock);
-       /* assert(sock != OSI_NULLSOCKET); */
+       /* osi_Assert(sock != OSI_NULLSOCKET); */
        newcall = NULL;
        rxi_ListenerProc(sock, &threadID, &newcall);
-       /* assert(threadID != -1); */
-       /* assert(newcall != NULL); */
+       /* osi_Assert(threadID != -1); */
+       /* osi_Assert(newcall != NULL); */
     }
-    /* not reached */
+    AFS_UNREACHED(return(NULL));
 }
 
 /*
@@ -300,77 +325,65 @@ void rx_ServerProc()
  * listener processes (one for each socket); these are started by GetUdpSocket.
  *
  * The event handling process *is* started here (the old listener used
- * to also handle events). The listener threads can't actually start 
+ * to also handle events). The listener threads can't actually start
  * listening until rxi_StartListener is called because most of R may not
  * be initialized when rxi_Listen is called.
  */
-void rxi_StartListener() {
+void
+rxi_StartListener(void)
+{
     pthread_attr_t tattr;
     AFS_SIGSET_DECL;
 
-    if (pthread_attr_init
-       (&tattr) != 0) {
-       printf("Unable to create Rx event handling thread (pthread_attr_init)\n");
-       exit(1);
+       if (listeners_started)
+               return;
+
+    if (pthread_attr_init(&tattr) != 0) {
+       osi_Panic("Unable to create Rx event handling thread (pthread_attr_init)\n");
     }
 
-    if (pthread_attr_setdetachstate
-       (&tattr,
-        PTHREAD_CREATE_DETACHED) != 0) {
-       printf("Unable to create Rx event handling thread (pthread_attr_setdetachstate)\n");
-       exit(1);
+    if (pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) != 0) {
+       osi_Panic("Unable to create Rx event handling thread (pthread_attr_setdetachstate)\n");
     }
 
     AFS_SIGSET_CLEAR();
-    if (pthread_create
-       (&event_handler_thread,
-        &tattr,
-        event_handler,
-        NULL) != 0) {
-       printf("Unable to create Rx event handling thread\n");
-       exit(1);
+    if (pthread_create(&event_handler_thread, &tattr, event_handler, NULL) !=
+       0) {
+       osi_Panic("Unable to create Rx event handling thread\n");
     }
+    rx_NewThreadId();
     AFS_SIGSET_RESTORE();
 
-    assert(pthread_mutex_lock(&listener_mutex)==0);
-    assert(pthread_cond_broadcast(&rx_listener_cond)==0);
+    MUTEX_ENTER(&listener_mutex);
+    CV_BROADCAST(&rx_listener_cond);
     listeners_started = 1;
-    assert(pthread_mutex_unlock(&listener_mutex)==0);
+    MUTEX_EXIT(&listener_mutex);
 
 }
 
 /*
  * Listen on the specified socket.
  */
-rxi_Listen(sock)
-    osi_socket sock;
+int
+rxi_Listen(osi_socket sock)
 {
     pthread_t thread;
     pthread_attr_t tattr;
     AFS_SIGSET_DECL;
 
-    if (pthread_attr_init
-       (&tattr) != 0) {
-       printf("Unable to create socket listener thread (pthread_attr_init)\n");
-       exit(1);
+    if (pthread_attr_init(&tattr) != 0) {
+       osi_Panic("Unable to create socket listener thread (pthread_attr_init)\n");
     }
 
-    if (pthread_attr_setdetachstate
-       (&tattr,
-        PTHREAD_CREATE_DETACHED) != 0) {
-       printf("Unable to create socket listener thread (pthread_attr_setdetachstate)\n");
-       exit(1);
+    if (pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) != 0) {
+       osi_Panic("Unable to create socket listener thread (pthread_attr_setdetachstate)\n");
     }
 
     AFS_SIGSET_CLEAR();
-    if (pthread_create
-       (&thread,
-        &tattr,
-        rx_ListenerProc,
-        (void *) sock) != 0) {
-       printf("Unable to create socket listener thread\n");
-       exit(1);
+    if (pthread_create(&thread, &tattr, rx_ListenerProc, (void *)(intptr_t)sock) != 0) {
+       osi_Panic("Unable to create socket listener thread\n");
     }
+    rx_NewThreadId();
     AFS_SIGSET_RESTORE();
     return 0;
 }
@@ -380,36 +393,88 @@ rxi_Listen(sock)
  * Recvmsg.
  *
  */
-int rxi_Recvmsg
-    (int socket,
-     struct msghdr *msg_p,
-     int flags)
+int
+rxi_Recvmsg(osi_socket socket, struct msghdr *msg_p, int flags)
 {
     int ret;
     ret = recvmsg(socket, msg_p, flags);
+
+    if (ret < 0) {
+       rxi_HandleSocketErrors(socket);
+    }
+
     return ret;
 }
 
 /*
  * Sendmsg.
  */
-rxi_Sendmsg(socket, msg_p, flags)
-    osi_socket socket;
-    struct msghdr *msg_p;
-    int flags;
+int
+rxi_Sendmsg(osi_socket socket, struct msghdr *msg_p, int flags)
 {
-    int ret;
-    ret = sendmsg(socket, msg_p, flags);
-#ifdef AFS_LINUX22_ENV
+    int err;
+    if (sendmsg(socket, msg_p, flags) >= 0) {
+       return 0;
+    }
+
+#ifdef AFS_NT40_ENV
+    err = WSAGetLastError();
+#else
+    err = errno;
+#endif
+
+#ifndef AFS_RXERRQ_ENV
+# ifdef AFS_LINUX22_ENV
     /* linux unfortunately returns ECONNREFUSED if the target port
      * is no longer in use */
     /* and EAGAIN if a UDP checksum is incorrect */
-    if (ret == -1 && errno != ECONNREFUSED && errno != EAGAIN) {
-#else
-    if (ret == -1) {
-#endif
-       printf("rxi_sendmsg failed, error %d\n", errno);
-       fflush(stdout);
+    if (err == ECONNREFUSED || err == EAGAIN) {
+       return 0;
     }
-    return 0;
+# endif
+    dpf(("rxi_sendmsg failed, error %d\n", errno));
+    fflush(stdout);
+#endif /* !AFS_RXERRQ_ENV */
+
+    if (err > 0) {
+       return -err;
+    }
+    return -1;
+}
+
+struct rx_ts_info_t * rx_ts_info_init(void) {
+    struct rx_ts_info_t * rx_ts_info;
+    rx_ts_info = calloc(1, sizeof(rx_ts_info_t));
+    osi_Assert(rx_ts_info != NULL && pthread_setspecific(rx_ts_info_key, rx_ts_info) == 0);
+#ifdef RX_ENABLE_TSFPQ
+    opr_queue_Init(&rx_ts_info->_FPQ.queue);
+
+    MUTEX_ENTER(&rx_packets_mutex);
+    rx_TSFPQMaxProcs++;
+    RX_TS_FPQ_COMPUTE_LIMITS;
+    MUTEX_EXIT(&rx_packets_mutex);
+#endif /* RX_ENABLE_TSFPQ */
+    return rx_ts_info;
+}
+
+int
+rx_GetThreadNum(void) {
+    return (intptr_t)pthread_getspecific(rx_thread_id_key);
 }
+
+static void
+rxi_SetThreadNum(int threadID) {
+    osi_Assert(pthread_setspecific(rx_thread_id_key,
+                                  (void *)(intptr_t)threadID) == 0);
+}
+
+int
+rx_SetThreadNum(void) {
+    int threadId;
+
+    threadId = rx_NewThreadId();
+    rxi_SetThreadNum(threadId);
+    return threadId;
+}
+
+#endif