#include <afsconfig.h>
#include <afs/param.h>
-RCSID
- ("$Header$");
#include <sys/types.h>
#include <errno.h>
* This thread is also responsible for keeping time.
*/
static pthread_t event_handler_thread;
-pthread_cond_t rx_event_handler_cond;
-pthread_mutex_t event_handler_mutex;
-pthread_cond_t rx_listener_cond;
-pthread_mutex_t listener_mutex;
+afs_kcondvar_t rx_event_handler_cond;
+afs_kmutex_t event_handler_mutex;
+afs_kcondvar_t rx_listener_cond;
+afs_kmutex_t listener_mutex;
static int listeners_started = 0;
-pthread_mutex_t rx_clock_mutex;
+afs_kmutex_t rx_clock_mutex;
struct clock rxi_clockNow;
/*
static void *
server_entry(void *argp)
{
- void (*server_proc) () = (void (*)())argp;
- server_proc();
+ void (*server_proc) (void *) = (void (*)(void *))argp;
+ server_proc(NULL);
dpf(("rx_pthread.c: server_entry: Server proc returned unexpectedly\n"));
exit(1);
- return (void *)0;
+ return NULL;
}
/*
* Start an Rx server process.
*/
void
-rxi_StartServerProc(void (*proc) (void), int stacksize)
+rxi_StartServerProc(void *(*proc) (void *), int stacksize)
{
pthread_t thread;
pthread_attr_t tattr;
struct timespec rx_pthread_next_event_time = { 0, 0 };
int error;
- assert(pthread_mutex_lock(&event_handler_mutex) == 0);
+ MUTEX_ENTER(&event_handler_mutex);
for (;;) {
struct clock cv;
struct clock next;
- assert(pthread_mutex_unlock(&event_handler_mutex) == 0);
+ MUTEX_EXIT(&event_handler_mutex);
next.sec = 30; /* Time to sleep if there are no events scheduled */
next.usec = 0;
clock_GetTime(&cv);
rxevent_RaiseEvents(&next);
- assert(pthread_mutex_lock(&event_handler_mutex) == 0);
+ MUTEX_ENTER(&event_handler_mutex);
if (rx_pthread_event_rescheduled) {
rx_pthread_event_rescheduled = 0;
continue;
rx_pthread_next_event_time.tv_sec = cv.sec;
rx_pthread_next_event_time.tv_nsec = cv.usec * 1000;
rx_pthread_n_event_waits++;
- error = pthread_cond_timedwait
- (&rx_event_handler_cond, &event_handler_mutex,
- &rx_pthread_next_event_time);
+ error = CV_TIMEDWAIT(&rx_event_handler_cond, &event_handler_mutex, &rx_pthread_next_event_time);
if (error == 0) {
rx_pthread_n_event_woken++;
}
#endif
rx_pthread_event_rescheduled = 0;
}
+ return NULL;
}
void
rxi_ReScheduleEvents(void)
{
- assert(pthread_mutex_lock(&event_handler_mutex) == 0);
- pthread_cond_signal(&rx_event_handler_cond);
+ MUTEX_ENTER(&event_handler_mutex);
+ CV_SIGNAL(&rx_event_handler_cond);
rx_pthread_event_rescheduled = 1;
- assert(pthread_mutex_unlock(&event_handler_mutex) == 0);
+ MUTEX_EXIT(&event_handler_mutex);
}
{
unsigned int host;
u_short port;
- register struct rx_packet *p = (struct rx_packet *)0;
+ struct rx_packet *p = (struct rx_packet *)0;
- assert(pthread_mutex_lock(&listener_mutex) == 0);
+ MUTEX_ENTER(&listener_mutex);
while (!listeners_started) {
- assert(pthread_cond_wait(&rx_listener_cond, &listener_mutex) == 0);
+ CV_WAIT(&rx_listener_cond, &listener_mutex);
}
- assert(pthread_mutex_unlock(&listener_mutex) == 0);
+ MUTEX_EXIT(&listener_mutex);
for (;;) {
/*
/* assert(sock != OSI_NULLSOCKET); */
}
/* not reached */
+ return NULL;
}
/* This is the server process request loop. The server process loop
* becomes a listener thread when rxi_ServerProc returns, and stays
* listener thread until rxi_ListenerProc returns. */
-void
-rx_ServerProc(void)
+void *
+rx_ServerProc(void * dummy)
{
osi_socket sock;
int threadID;
struct rx_call *newcall = NULL;
rxi_MorePackets(rx_maxReceiveWindow + 2); /* alloc more packets */
- MUTEX_ENTER(&rx_stats_mutex);
+ MUTEX_ENTER(&rx_quota_mutex);
rxi_dataQuota += rx_initSendWindow; /* Reserve some pkts for hard times */
/* threadID is used for making decisions in GetCall. Get it by bumping
* number of threads handling incoming calls */
* So either introduce yet another counter or flag the FCFS
* thread... chose the latter.
*/
+ MUTEX_ENTER(&rx_pthread_mutex);
threadID = ++rxi_pthread_hinum;
if (rxi_fcfs_thread_num == 0 && rxi_fcfs_thread_num != threadID)
rxi_fcfs_thread_num = threadID;
+ MUTEX_EXIT(&rx_pthread_mutex);
++rxi_availProcs;
- MUTEX_EXIT(&rx_stats_mutex);
+ MUTEX_EXIT(&rx_quota_mutex);
while (1) {
sock = OSI_NULLSOCKET;
/* assert(newcall != NULL); */
}
/* not reached */
+ return NULL;
}
/*
dpf(("Unable to create Rx event handling thread\n"));
exit(1);
}
- MUTEX_ENTER(&rx_stats_mutex);
+ MUTEX_ENTER(&rx_pthread_mutex);
++rxi_pthread_hinum;
- MUTEX_EXIT(&rx_stats_mutex);
+ MUTEX_EXIT(&rx_pthread_mutex);
AFS_SIGSET_RESTORE();
- assert(pthread_mutex_lock(&listener_mutex) == 0);
- assert(pthread_cond_broadcast(&rx_listener_cond) == 0);
+ MUTEX_ENTER(&listener_mutex);
+ CV_BROADCAST(&rx_listener_cond);
listeners_started = 1;
- assert(pthread_mutex_unlock(&listener_mutex) == 0);
+ MUTEX_EXIT(&listener_mutex);
}
dpf(("Unable to create socket listener thread\n"));
exit(1);
}
- MUTEX_ENTER(&rx_stats_mutex);
+ MUTEX_ENTER(&rx_pthread_mutex);
++rxi_pthread_hinum;
- MUTEX_EXIT(&rx_stats_mutex);
+ MUTEX_EXIT(&rx_pthread_mutex);
AFS_SIGSET_RESTORE();
return 0;
}
rxi_Recvmsg(osi_socket socket, struct msghdr *msg_p, int flags)
{
int ret;
+#if defined(HAVE_LINUX_ERRQUEUE_H) && defined(ADAPT_PMTU)
+ while((rxi_HandleSocketError(socket)) > 0)
+ ;
+#endif
ret = recvmsg(socket, msg_p, flags);
return ret;
}
return 0;
}
-struct rx_ts_info_t * rx_ts_info_init() {
- register struct rx_ts_info_t * rx_ts_info;
+struct rx_ts_info_t * rx_ts_info_init(void) {
+ struct rx_ts_info_t * rx_ts_info;
rx_ts_info = (rx_ts_info_t *) malloc(sizeof(rx_ts_info_t));
assert(rx_ts_info != NULL && pthread_setspecific(rx_ts_info_key, rx_ts_info) == 0);
memset(rx_ts_info, 0, sizeof(rx_ts_info_t));
#ifdef RX_ENABLE_TSFPQ
queue_Init(&rx_ts_info->_FPQ);
- MUTEX_ENTER(&rx_stats_mutex);
+ MUTEX_ENTER(&rx_packets_mutex);
rx_TSFPQMaxProcs++;
RX_TS_FPQ_COMPUTE_LIMITS;
- MUTEX_EXIT(&rx_stats_mutex);
+ MUTEX_EXIT(&rx_packets_mutex);
#endif /* RX_ENABLE_TSFPQ */
return rx_ts_info;
}