/*
* Copyright 2000, International Business Machines Corporation and others.
* All Rights Reserved.
- *
+ *
* This software has been released under the terms of the IBM Public
* License. For details, see the LICENSE file in the top-level source
* directory or online at http://www.openafs.org/dl/license10.html
#include <afsconfig.h>
#include <afs/param.h>
-RCSID
- ("$Header$");
-
-#include <sys/types.h>
-#include <errno.h>
-#include <signal.h>
-#ifndef AFS_NT40_ENV
-# include <sys/socket.h>
-# include <sys/file.h>
-# include <netdb.h>
-# include <netinet/in.h>
-# include <net/if.h>
-# include <sys/ioctl.h>
-# include <sys/time.h>
-#endif
-#include <sys/stat.h>
-#include <rx/rx.h>
-#include <rx/rx_globals.h>
+#include <roken.h>
+#include <afs/opr.h>
+
#include <assert.h>
-#include <rx/rx_pthread.h>
-#include <rx/rx_clock.h>
-/*
- * Number of times the event handling thread was signalled because a new
- * event was scheduled earlier than the lastest event.
- *
- * Protected by event_handler_mutex
- */
-static long rx_pthread_n_event_wakeups;
+#ifdef AFS_PTHREAD_ENV
+
+#include "rx.h"
+#include "rx_globals.h"
+#include "rx_pthread.h"
+#include "rx_clock.h"
+#include "rx_atomic.h"
+#include "rx_internal.h"
+#include "rx_pthread.h"
+#ifdef AFS_NT40_ENV
+#include "rx_xmit_nt.h"
+#endif
+
+static void rxi_SetThreadNum(int threadID);
/* Set rx_pthread_event_rescheduled if event_handler should just try
* again instead of sleeping.
* This thread is also responsible for keeping time.
*/
static pthread_t event_handler_thread;
-pthread_cond_t rx_event_handler_cond;
-pthread_mutex_t event_handler_mutex;
-pthread_cond_t rx_listener_cond;
-pthread_mutex_t listener_mutex;
+afs_kcondvar_t rx_event_handler_cond;
+afs_kmutex_t event_handler_mutex;
+afs_kcondvar_t rx_listener_cond;
+afs_kmutex_t listener_mutex;
static int listeners_started = 0;
-pthread_mutex_t rx_clock_mutex;
+afs_kmutex_t rx_clock_mutex;
struct clock rxi_clockNow;
+static rx_atomic_t threadHiNum;
+
+int
+rx_NewThreadId(void) {
+ return rx_atomic_inc_and_read(&threadHiNum);
+}
+
/*
* Delay the current thread the specified number of seconds.
*/
void
rxi_InitializeThreadSupport(void)
{
- listeners_started = 0;
+ /* listeners_started must only be reset if
+ * the listener thread terminates */
+ /* listeners_started = 0; */
clock_GetTime(&rxi_clockNow);
}
static void *
server_entry(void *argp)
{
- void (*server_proc) () = (void (*)())argp;
- server_proc();
- printf("rx_pthread.c: server_entry: Server proc returned unexpectedly\n");
- exit(1);
- return (void *)0;
+ void (*server_proc) (void *) = (void (*)(void *))argp;
+ server_proc(NULL);
+ dpf(("rx_pthread.c: server_entry: Server proc returned unexpectedly\n"));
+ return (void *) -1; /* reused as return value, see pthread(3) */
}
/*
* Start an Rx server process.
*/
void
-rxi_StartServerProc(void (*proc) (void), int stacksize)
+rxi_StartServerProc(void *(*proc) (void *), int stacksize)
{
pthread_t thread;
pthread_attr_t tattr;
AFS_SIGSET_DECL;
if (pthread_attr_init(&tattr) != 0) {
- printf("Unable to Create Rx server thread (pthread_attr_init)\n");
- exit(1);
+ osi_Panic("Unable to Create Rx server thread (pthread_attr_init)\n");
}
if (pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) != 0) {
- printf
- ("Unable to Create Rx server thread (pthread_attr_setdetachstate)\n");
- exit(1);
+ osi_Panic("Unable to Create Rx server thread (pthread_attr_setdetachstate)\n");
}
/*
*/
AFS_SIGSET_CLEAR();
if (pthread_create(&thread, &tattr, server_entry, (void *)proc) != 0) {
- printf("Unable to Create Rx server thread\n");
- exit(1);
+ osi_Panic("Unable to Create Rx server thread\n");
}
AFS_SIGSET_RESTORE();
}
static void *
event_handler(void *argp)
{
- struct clock rx_pthread_last_event_wait_time = { 0, 0 };
unsigned long rx_pthread_n_event_expired = 0;
unsigned long rx_pthread_n_event_waits = 0;
long rx_pthread_n_event_woken = 0;
+ unsigned long rx_pthread_n_event_error = 0;
struct timespec rx_pthread_next_event_time = { 0, 0 };
+ int error;
- assert(pthread_mutex_lock(&event_handler_mutex) == 0);
+ MUTEX_ENTER(&event_handler_mutex);
for (;;) {
struct clock cv;
struct clock next;
- assert(pthread_mutex_unlock(&event_handler_mutex) == 0);
+ MUTEX_EXIT(&event_handler_mutex);
next.sec = 30; /* Time to sleep if there are no events scheduled */
next.usec = 0;
clock_GetTime(&cv);
rxevent_RaiseEvents(&next);
- assert(pthread_mutex_lock(&event_handler_mutex) == 0);
+ MUTEX_ENTER(&event_handler_mutex);
if (rx_pthread_event_rescheduled) {
rx_pthread_event_rescheduled = 0;
continue;
rx_pthread_next_event_time.tv_sec = cv.sec;
rx_pthread_next_event_time.tv_nsec = cv.usec * 1000;
rx_pthread_n_event_waits++;
- if (pthread_cond_timedwait
- (&rx_event_handler_cond, &event_handler_mutex,
- &rx_pthread_next_event_time) == -1) {
-#ifdef notdef
- assert(errno == EAGAIN);
-#endif
+ error = CV_TIMEDWAIT(&rx_event_handler_cond, &event_handler_mutex, &rx_pthread_next_event_time);
+ if (error == 0) {
+ rx_pthread_n_event_woken++;
+ }
+#ifdef AFS_NT40_ENV
+ else if (error == ETIMEDOUT) {
rx_pthread_n_event_expired++;
} else {
- rx_pthread_n_event_woken++;
- }
+ rx_pthread_n_event_error++;
+ }
+#else
+ else if (errno == ETIMEDOUT) {
+ rx_pthread_n_event_expired++;
+ } else {
+ rx_pthread_n_event_error++;
+ }
+#endif
rx_pthread_event_rescheduled = 0;
}
+ AFS_UNREACHED(return(NULL));
}
void
rxi_ReScheduleEvents(void)
{
- assert(pthread_mutex_lock(&event_handler_mutex) == 0);
- pthread_cond_signal(&rx_event_handler_cond);
+ MUTEX_ENTER(&event_handler_mutex);
+ CV_SIGNAL(&rx_event_handler_cond);
rx_pthread_event_rescheduled = 1;
- assert(pthread_mutex_unlock(&event_handler_mutex) == 0);
+ MUTEX_EXIT(&event_handler_mutex);
}
/* Loop to listen on a socket. Return setting *newcallp if this
* thread should become a server thread. */
static void
-rxi_ListenerProc(int sock, int *tnop, struct rx_call **newcallp)
+rxi_ListenerProc(osi_socket sock, int *tnop, struct rx_call **newcallp)
{
unsigned int host;
u_short port;
- register struct rx_packet *p = (struct rx_packet *)0;
+ struct rx_packet *p = (struct rx_packet *)0;
+
+ if (!(rx_enable_hot_thread && newcallp)) {
+ /* Don't do this for hot threads, since we might stop being the
+ * listener. */
+ opr_threadname_set("rx_Listener");
+ }
- assert(pthread_mutex_lock(&listener_mutex) == 0);
+ MUTEX_ENTER(&listener_mutex);
while (!listeners_started) {
- assert(pthread_cond_wait(&rx_listener_cond, &listener_mutex) == 0);
+ CV_WAIT(&rx_listener_cond, &listener_mutex);
}
- assert(pthread_mutex_unlock(&listener_mutex) == 0);
+ MUTEX_EXIT(&listener_mutex);
for (;;) {
+ /* See if a check for additional packets was issued */
+ rx_CheckPackets();
+
/*
* Grab a new packet only if necessary (otherwise re-use the old one)
*/
} else {
if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_RECEIVE))) {
/* Could this happen with multiple socket listeners? */
- printf("rxi_Listener: no packets!"); /* Shouldn't happen */
- exit(1);
+ osi_Panic("rxi_Listener: no packets!"); /* Shouldn't happen */
}
}
rx_ListenerProc(void *argp)
{
int threadID;
- int sock = (int)argp;
+ osi_socket sock = (osi_socket)(intptr_t)argp;
struct rx_call *newcall;
while (1) {
newcall = NULL;
threadID = -1;
rxi_ListenerProc(sock, &threadID, &newcall);
- /* assert(threadID != -1); */
- /* assert(newcall != NULL); */
+ /* osi_Assert(threadID != -1); */
+ /* osi_Assert(newcall != NULL); */
sock = OSI_NULLSOCKET;
- assert(pthread_setspecific(rx_thread_id_key, (void *)threadID) == 0);
+ rxi_SetThreadNum(threadID);
rxi_ServerProc(threadID, newcall, &sock);
- /* assert(sock != OSI_NULLSOCKET); */
+ /* osi_Assert(sock != OSI_NULLSOCKET); */
}
- /* not reached */
+ AFS_UNREACHED(return(NULL));
}
/* This is the server process request loop. The server process loop
* becomes a listener thread when rxi_ServerProc returns, and stays
* listener thread until rxi_ListenerProc returns. */
-void
-rx_ServerProc(void)
+void *
+rx_ServerProc(void * dummy)
{
- int sock;
+ osi_socket sock;
int threadID;
struct rx_call *newcall = NULL;
rxi_MorePackets(rx_maxReceiveWindow + 2); /* alloc more packets */
- MUTEX_ENTER(&rx_stats_mutex);
+ MUTEX_ENTER(&rx_quota_mutex);
rxi_dataQuota += rx_initSendWindow; /* Reserve some pkts for hard times */
/* threadID is used for making decisions in GetCall. Get it by bumping
* number of threads handling incoming calls */
/* Unique thread ID: used for scheduling purposes *and* as index into
- * the host hold table (fileserver).
+ * the host hold table (fileserver).
* The previously used rxi_availProcs is unsuitable as it
* will already go up and down as packets arrive while the server
* threads are still initialising! The recently introduced
* So either introduce yet another counter or flag the FCFS
* thread... chose the latter.
*/
- threadID = ++rxi_pthread_hinum;
+ MUTEX_ENTER(&rx_pthread_mutex);
+ threadID = rx_NewThreadId();
if (rxi_fcfs_thread_num == 0 && rxi_fcfs_thread_num != threadID)
rxi_fcfs_thread_num = threadID;
+ MUTEX_EXIT(&rx_pthread_mutex);
++rxi_availProcs;
- MUTEX_EXIT(&rx_stats_mutex);
+ MUTEX_EXIT(&rx_quota_mutex);
while (1) {
sock = OSI_NULLSOCKET;
- assert(pthread_setspecific(rx_thread_id_key, (void *)threadID) == 0);
+ rxi_SetThreadNum(threadID);
rxi_ServerProc(threadID, newcall, &sock);
- /* assert(sock != OSI_NULLSOCKET); */
+ /* osi_Assert(sock != OSI_NULLSOCKET); */
newcall = NULL;
rxi_ListenerProc(sock, &threadID, &newcall);
- /* assert(threadID != -1); */
- /* assert(newcall != NULL); */
+ /* osi_Assert(threadID != -1); */
+ /* osi_Assert(newcall != NULL); */
}
- /* not reached */
+ AFS_UNREACHED(return(NULL));
}
/*
* listener processes (one for each socket); these are started by GetUdpSocket.
*
* The event handling process *is* started here (the old listener used
- * to also handle events). The listener threads can't actually start
+ * to also handle events). The listener threads can't actually start
* listening until rxi_StartListener is called because most of R may not
* be initialized when rxi_Listen is called.
*/
pthread_attr_t tattr;
AFS_SIGSET_DECL;
+ if (listeners_started)
+ return;
+
if (pthread_attr_init(&tattr) != 0) {
- printf
- ("Unable to create Rx event handling thread (pthread_attr_init)\n");
- exit(1);
+ osi_Panic("Unable to create Rx event handling thread (pthread_attr_init)\n");
}
if (pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) != 0) {
- printf
- ("Unable to create Rx event handling thread (pthread_attr_setdetachstate)\n");
- exit(1);
+ osi_Panic("Unable to create Rx event handling thread (pthread_attr_setdetachstate)\n");
}
AFS_SIGSET_CLEAR();
if (pthread_create(&event_handler_thread, &tattr, event_handler, NULL) !=
0) {
- printf("Unable to create Rx event handling thread\n");
- exit(1);
+ osi_Panic("Unable to create Rx event handling thread\n");
}
- MUTEX_ENTER(&rx_stats_mutex);
- ++rxi_pthread_hinum;
- MUTEX_EXIT(&rx_stats_mutex);
+ rx_NewThreadId();
AFS_SIGSET_RESTORE();
- assert(pthread_mutex_lock(&listener_mutex) == 0);
- assert(pthread_cond_broadcast(&rx_listener_cond) == 0);
+ MUTEX_ENTER(&listener_mutex);
+ CV_BROADCAST(&rx_listener_cond);
listeners_started = 1;
- assert(pthread_mutex_unlock(&listener_mutex) == 0);
+ MUTEX_EXIT(&listener_mutex);
}
AFS_SIGSET_DECL;
if (pthread_attr_init(&tattr) != 0) {
- printf
- ("Unable to create socket listener thread (pthread_attr_init)\n");
- exit(1);
+ osi_Panic("Unable to create socket listener thread (pthread_attr_init)\n");
}
if (pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) != 0) {
- printf
- ("Unable to create socket listener thread (pthread_attr_setdetachstate)\n");
- exit(1);
+ osi_Panic("Unable to create socket listener thread (pthread_attr_setdetachstate)\n");
}
AFS_SIGSET_CLEAR();
- if (pthread_create(&thread, &tattr, rx_ListenerProc, (void *)sock) != 0) {
- printf("Unable to create socket listener thread\n");
- exit(1);
+ if (pthread_create(&thread, &tattr, rx_ListenerProc, (void *)(intptr_t)sock) != 0) {
+ osi_Panic("Unable to create socket listener thread\n");
}
- MUTEX_ENTER(&rx_stats_mutex);
- ++rxi_pthread_hinum;
- MUTEX_EXIT(&rx_stats_mutex);
+ rx_NewThreadId();
AFS_SIGSET_RESTORE();
return 0;
}
*
*/
int
-rxi_Recvmsg(int socket, struct msghdr *msg_p, int flags)
+rxi_Recvmsg(osi_socket socket, struct msghdr *msg_p, int flags)
{
int ret;
ret = recvmsg(socket, msg_p, flags);
+
+#ifdef AFS_RXERRQ_ENV
+ if (ret < 0) {
+ while (rxi_HandleSocketError(socket) > 0)
+ ;
+ }
+#endif
+
return ret;
}
{
int ret;
ret = sendmsg(socket, msg_p, flags);
-#ifdef AFS_LINUX22_ENV
+
+#ifdef AFS_RXERRQ_ENV
+ if (ret < 0) {
+ while (rxi_HandleSocketError(socket) > 0)
+ ;
+ return ret;
+ }
+#else
+# ifdef AFS_LINUX22_ENV
/* linux unfortunately returns ECONNREFUSED if the target port
* is no longer in use */
/* and EAGAIN if a UDP checksum is incorrect */
if (ret == -1 && errno != ECONNREFUSED && errno != EAGAIN) {
-#else
+# else
if (ret == -1) {
-#endif
- printf("rxi_sendmsg failed, error %d\n", errno);
+# endif
+ dpf(("rxi_sendmsg failed, error %d\n", errno));
fflush(stdout);
+# ifndef AFS_NT40_ENV
+ if (errno > 0)
+ return -errno;
+# else
+ if (WSAGetLastError() > 0)
+ return -WSAGetLastError();
+# endif
+ return -1;
}
+#endif /* !AFS_RXERRQ_ENV */
return 0;
}
-struct rx_ts_info_t * rx_ts_info_init() {
- register struct rx_ts_info_t * rx_ts_info;
- rx_ts_info = (rx_ts_info_t *) malloc(sizeof(rx_ts_info_t));
- assert(rx_ts_info != NULL && pthread_setspecific(rx_ts_info_key, rx_ts_info) == 0);
- memset(rx_ts_info, 0, sizeof(rx_ts_info_t));
+struct rx_ts_info_t * rx_ts_info_init(void) {
+ struct rx_ts_info_t * rx_ts_info;
+ rx_ts_info = calloc(1, sizeof(rx_ts_info_t));
+ osi_Assert(rx_ts_info != NULL && pthread_setspecific(rx_ts_info_key, rx_ts_info) == 0);
#ifdef RX_ENABLE_TSFPQ
- queue_Init(&rx_ts_info->_FPQ);
+ opr_queue_Init(&rx_ts_info->_FPQ.queue);
- MUTEX_ENTER(&rx_stats_mutex);
+ MUTEX_ENTER(&rx_packets_mutex);
rx_TSFPQMaxProcs++;
RX_TS_FPQ_COMPUTE_LIMITS;
- MUTEX_EXIT(&rx_stats_mutex);
+ MUTEX_EXIT(&rx_packets_mutex);
#endif /* RX_ENABLE_TSFPQ */
return rx_ts_info;
}
+
+int
+rx_GetThreadNum(void) {
+ return (intptr_t)pthread_getspecific(rx_thread_id_key);
+}
+
+static void
+rxi_SetThreadNum(int threadID) {
+ osi_Assert(pthread_setspecific(rx_thread_id_key,
+ (void *)(intptr_t)threadID) == 0);
+}
+
+int
+rx_SetThreadNum(void) {
+ int threadId;
+
+ threadId = rx_NewThreadId();
+ rxi_SetThreadNum(threadId);
+ return threadId;
+}
+
+#endif