Tidy up the rx directory
[openafs.git] / src / rx / rx_pthread.c
index 0a14c0f..d0cd331 100644 (file)
@@ -18,8 +18,6 @@
 #include <afsconfig.h>
 #include <afs/param.h>
 
-RCSID
-    ("$Header$");
 
 #include <sys/types.h>
 #include <errno.h>
@@ -65,12 +63,12 @@ static void *rx_ListenerProc(void *);
  * This thread is also responsible for keeping time.
  */
 static pthread_t event_handler_thread;
-pthread_cond_t rx_event_handler_cond;
-pthread_mutex_t event_handler_mutex;
-pthread_cond_t rx_listener_cond;
-pthread_mutex_t listener_mutex;
+afs_kcondvar_t rx_event_handler_cond;
+afs_kmutex_t event_handler_mutex;
+afs_kcondvar_t rx_listener_cond;
+afs_kmutex_t listener_mutex;
 static int listeners_started = 0;
-pthread_mutex_t rx_clock_mutex;
+afs_kmutex_t rx_clock_mutex;
 struct clock rxi_clockNow;
 
 /*
@@ -97,18 +95,18 @@ rxi_InitializeThreadSupport(void)
 static void *
 server_entry(void *argp)
 {
-    void (*server_proc) () = (void (*)())argp;
-    server_proc();
+    void (*server_proc) (void *) = (void (*)(void *))argp;
+    server_proc(NULL);
     dpf(("rx_pthread.c: server_entry: Server proc returned unexpectedly\n"));
     exit(1);
-    return (void *)0;
+    return NULL;
 }
 
 /*
  * Start an Rx server process.
  */
 void
-rxi_StartServerProc(void (*proc) (void), int stacksize)
+rxi_StartServerProc(void *(*proc) (void *), int stacksize)
 {
     pthread_t thread;
     pthread_attr_t tattr;
@@ -150,20 +148,20 @@ event_handler(void *argp)
     struct timespec rx_pthread_next_event_time = { 0, 0 };
     int error;
 
-    assert(pthread_mutex_lock(&event_handler_mutex) == 0);
+    MUTEX_ENTER(&event_handler_mutex);
 
     for (;;) {
        struct clock cv;
        struct clock next;
 
-       assert(pthread_mutex_unlock(&event_handler_mutex) == 0);
+       MUTEX_EXIT(&event_handler_mutex);
 
        next.sec = 30;          /* Time to sleep if there are no events scheduled */
        next.usec = 0;
        clock_GetTime(&cv);
        rxevent_RaiseEvents(&next);
 
-       assert(pthread_mutex_lock(&event_handler_mutex) == 0);
+       MUTEX_ENTER(&event_handler_mutex);
        if (rx_pthread_event_rescheduled) {
            rx_pthread_event_rescheduled = 0;
            continue;
@@ -173,9 +171,7 @@ event_handler(void *argp)
        rx_pthread_next_event_time.tv_sec = cv.sec;
        rx_pthread_next_event_time.tv_nsec = cv.usec * 1000;
        rx_pthread_n_event_waits++;
-       error = pthread_cond_timedwait
-           (&rx_event_handler_cond, &event_handler_mutex,
-            &rx_pthread_next_event_time);
+       error = CV_TIMEDWAIT(&rx_event_handler_cond, &event_handler_mutex, &rx_pthread_next_event_time);
         if (error == 0) {
            rx_pthread_n_event_woken++;
         } 
@@ -194,6 +190,7 @@ event_handler(void *argp)
 #endif
        rx_pthread_event_rescheduled = 0;
     }
+    return NULL;
 }
 
 
@@ -203,27 +200,27 @@ event_handler(void *argp)
 void
 rxi_ReScheduleEvents(void)
 {
-    assert(pthread_mutex_lock(&event_handler_mutex) == 0);
-    pthread_cond_signal(&rx_event_handler_cond);
+    MUTEX_ENTER(&event_handler_mutex);
+    CV_SIGNAL(&rx_event_handler_cond);
     rx_pthread_event_rescheduled = 1;
-    assert(pthread_mutex_unlock(&event_handler_mutex) == 0);
+    MUTEX_EXIT(&event_handler_mutex);
 }
 
 
 /* Loop to listen on a socket. Return setting *newcallp if this
  * thread should become a server thread.  */
 static void
-rxi_ListenerProc(int sock, int *tnop, struct rx_call **newcallp)
+rxi_ListenerProc(osi_socket sock, int *tnop, struct rx_call **newcallp)
 {
     unsigned int host;
     u_short port;
-    register struct rx_packet *p = (struct rx_packet *)0;
+    struct rx_packet *p = (struct rx_packet *)0;
 
-    assert(pthread_mutex_lock(&listener_mutex) == 0);
+    MUTEX_ENTER(&listener_mutex);
     while (!listeners_started) {
-       assert(pthread_cond_wait(&rx_listener_cond, &listener_mutex) == 0);
+       CV_WAIT(&rx_listener_cond, &listener_mutex);
     }
-    assert(pthread_mutex_unlock(&listener_mutex) == 0);
+    MUTEX_EXIT(&listener_mutex);
 
     for (;;) {
        /*
@@ -259,7 +256,7 @@ static void *
 rx_ListenerProc(void *argp)
 {
     int threadID;
-    int sock = (int)argp;
+    osi_socket sock = (osi_socket)argp;
     struct rx_call *newcall;
 
     while (1) {
@@ -274,20 +271,21 @@ rx_ListenerProc(void *argp)
        /* assert(sock != OSI_NULLSOCKET); */
     }
     /* not reached */
+    return NULL;
 }
 
 /* This is the server process request loop. The server process loop
  * becomes a listener thread when rxi_ServerProc returns, and stays
  * listener thread until rxi_ListenerProc returns. */
-void
-rx_ServerProc(void)
+void *
+rx_ServerProc(void * dummy)
 {
-    int sock;
+    osi_socket sock;
     int threadID;
     struct rx_call *newcall = NULL;
 
     rxi_MorePackets(rx_maxReceiveWindow + 2);  /* alloc more packets */
-    MUTEX_ENTER(&rx_stats_mutex);
+    MUTEX_ENTER(&rx_quota_mutex);
     rxi_dataQuota += rx_initSendWindow;        /* Reserve some pkts for hard times */
     /* threadID is used for making decisions in GetCall.  Get it by bumping
      * number of threads handling incoming calls */
@@ -302,11 +300,13 @@ rx_ServerProc(void)
      * So either introduce yet another counter or flag the FCFS
      * thread... chose the latter.
      */
+    MUTEX_ENTER(&rx_pthread_mutex);
     threadID = ++rxi_pthread_hinum;
     if (rxi_fcfs_thread_num == 0 && rxi_fcfs_thread_num != threadID)
        rxi_fcfs_thread_num = threadID;
+    MUTEX_EXIT(&rx_pthread_mutex);
     ++rxi_availProcs;
-    MUTEX_EXIT(&rx_stats_mutex);
+    MUTEX_EXIT(&rx_quota_mutex);
 
     while (1) {
        sock = OSI_NULLSOCKET;
@@ -319,6 +319,7 @@ rx_ServerProc(void)
        /* assert(newcall != NULL); */
     }
     /* not reached */
+    return NULL;
 }
 
 /*
@@ -357,15 +358,15 @@ rxi_StartListener(void)
        dpf(("Unable to create Rx event handling thread\n"));
        exit(1);
     }
-    MUTEX_ENTER(&rx_stats_mutex);
+    MUTEX_ENTER(&rx_pthread_mutex);
     ++rxi_pthread_hinum;
-    MUTEX_EXIT(&rx_stats_mutex);
+    MUTEX_EXIT(&rx_pthread_mutex);
     AFS_SIGSET_RESTORE();
 
-    assert(pthread_mutex_lock(&listener_mutex) == 0);
-    assert(pthread_cond_broadcast(&rx_listener_cond) == 0);
+    MUTEX_ENTER(&listener_mutex);
+    CV_BROADCAST(&rx_listener_cond);
     listeners_started = 1;
-    assert(pthread_mutex_unlock(&listener_mutex) == 0);
+    MUTEX_EXIT(&listener_mutex);
 
 }
 
@@ -396,9 +397,9 @@ rxi_Listen(osi_socket sock)
        dpf(("Unable to create socket listener thread\n"));
        exit(1);
     }
-    MUTEX_ENTER(&rx_stats_mutex);
+    MUTEX_ENTER(&rx_pthread_mutex);
     ++rxi_pthread_hinum;
-    MUTEX_EXIT(&rx_stats_mutex);
+    MUTEX_EXIT(&rx_pthread_mutex);
     AFS_SIGSET_RESTORE();
     return 0;
 }
@@ -412,6 +413,10 @@ int
 rxi_Recvmsg(osi_socket socket, struct msghdr *msg_p, int flags)
 {
     int ret;
+#if defined(HAVE_LINUX_ERRQUEUE_H) && defined(ADAPT_PMTU)
+    while((rxi_HandleSocketError(socket)) > 0)
+      ;
+#endif
     ret = recvmsg(socket, msg_p, flags);
     return ret;
 }
@@ -439,18 +444,18 @@ rxi_Sendmsg(osi_socket socket, struct msghdr *msg_p, int flags)
     return 0;
 }
 
-struct rx_ts_info_t * rx_ts_info_init() {
-    register struct rx_ts_info_t * rx_ts_info;
+struct rx_ts_info_t * rx_ts_info_init(void) {
+    struct rx_ts_info_t * rx_ts_info;
     rx_ts_info = (rx_ts_info_t *) malloc(sizeof(rx_ts_info_t));
     assert(rx_ts_info != NULL && pthread_setspecific(rx_ts_info_key, rx_ts_info) == 0);
     memset(rx_ts_info, 0, sizeof(rx_ts_info_t));
 #ifdef RX_ENABLE_TSFPQ
     queue_Init(&rx_ts_info->_FPQ);
 
-    MUTEX_ENTER(&rx_stats_mutex);
+    MUTEX_ENTER(&rx_packets_mutex);
     rx_TSFPQMaxProcs++;
     RX_TS_FPQ_COMPUTE_LIMITS;
-    MUTEX_EXIT(&rx_stats_mutex);
+    MUTEX_EXIT(&rx_packets_mutex);
 #endif /* RX_ENABLE_TSFPQ */
     return rx_ts_info;
 }