2 * Copyright 2000, International Business Machines Corporation and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
11 * An implementation of the rx socket listener for pthreads (not using select).
12 * This assumes that multiple read system calls may be extant at any given
13 * time. Also implements the pthread-specific event handling for rx.
15 * rx_pthread.c is used for the thread safe RX package.
18 #include <afsconfig.h>
19 #include <afs/param.h>
23 #include <sys/types.h>
27 # include <sys/socket.h>
28 # include <sys/file.h>
30 # include <netinet/in.h>
32 # include <sys/ioctl.h>
33 # include <sys/time.h>
37 #include <rx_globals.h>
39 #include <rx/rx_pthread.h>
42 * Number of times the event handling thread was signalled because a new
43 * event was scheduled earlier than the lastest event.
45 * Protected by event_handler_mutex
47 static long rx_pthread_n_event_wakeups;
49 /* Set rx_pthread_event_rescheduled if event_handler should just try
50 * again instead of sleeping.
52 * Protected by event_handler_mutex
54 static int rx_pthread_event_rescheduled = 0;
56 static void *rx_ListenerProc(void *);
59 * We supply an event handling thread for Rx's event processing.
60 * The condition variable is used to wakeup the thread whenever a new
61 * event is scheduled earlier than the previous earliest event.
62 * This thread is also responsible for keeping time.
64 static pthread_t event_handler_thread;
65 pthread_cond_t rx_event_handler_cond;
66 pthread_mutex_t event_handler_mutex;
67 pthread_cond_t rx_listener_cond;
68 pthread_mutex_t listener_mutex;
69 static int listeners_started = 0;
70 pthread_mutex_t rx_clock_mutex;
71 struct clock rxi_clockNow;
74 * Delay the current thread the specified number of seconds.
76 void rxi_Delay(int sec)
82 * Called from rx_Init()
84 void rxi_InitializeThreadSupport(void)
86 listeners_started = 0;
87 gettimeofday((struct timeval *)&rxi_clockNow, NULL);
90 static void *server_entry(void * argp)
92 void (*server_proc)() = (void (*)()) argp;
94 printf("rx_pthread.c: server_entry: Server proc returned unexpectedly\n");
100 * Start an Rx server process.
102 void rxi_StartServerProc(void (*proc)(void), int stacksize)
105 pthread_attr_t tattr;
108 if (pthread_attr_init
110 printf("Unable to Create Rx server thread (pthread_attr_init)\n");
114 if (pthread_attr_setdetachstate
116 PTHREAD_CREATE_DETACHED) != 0) {
117 printf("Unable to Create Rx server thread (pthread_attr_setdetachstate)\n");
122 * NOTE: We are ignoring the stack size parameter, for now.
129 (void *) proc) != 0) {
130 printf("Unable to Create Rx server thread\n");
133 AFS_SIGSET_RESTORE();
137 * The event handling process.
139 static void *event_handler(void *argp)
141 struct clock rx_pthread_last_event_wait_time = {0,0};
142 unsigned long rx_pthread_n_event_expired = 0;
143 unsigned long rx_pthread_n_event_waits = 0;
144 long rx_pthread_n_event_woken = 0;
145 struct timespec rx_pthread_next_event_time = {0,0};
147 assert(pthread_mutex_lock(&event_handler_mutex)==0);
153 assert(pthread_mutex_unlock(&event_handler_mutex)==0);
155 next.sec = 30; /* Time to sleep if there are no events scheduled */
157 gettimeofday((struct timeval *)&cv, NULL);
158 rxevent_RaiseEvents(&next);
160 assert(pthread_mutex_lock(&event_handler_mutex)==0);
161 if (rx_pthread_event_rescheduled) {
162 rx_pthread_event_rescheduled = 0;
166 clock_Add(&cv, &next);
167 rx_pthread_next_event_time.tv_sec = cv.sec;
168 rx_pthread_next_event_time.tv_nsec = cv.usec * 1000;
169 rx_pthread_n_event_waits++;
170 if (pthread_cond_timedwait
171 (&rx_event_handler_cond,
172 &event_handler_mutex,
173 &rx_pthread_next_event_time) == -1) {
175 assert(errno == EAGAIN);
177 rx_pthread_n_event_expired++;
179 rx_pthread_n_event_woken++;
181 rx_pthread_event_rescheduled = 0;
187 * This routine will get called by the event package whenever a new,
188 * earlier than others, event is posted. */
189 void rxi_ReScheduleEvents(void)
191 assert(pthread_mutex_lock(&event_handler_mutex)==0);
192 pthread_cond_signal(&rx_event_handler_cond);
193 rx_pthread_event_rescheduled = 1;
194 assert(pthread_mutex_unlock(&event_handler_mutex)==0);
198 /* Loop to listen on a socket. Return setting *newcallp if this
199 * thread should become a server thread. */
200 static void rxi_ListenerProc(int sock, int *tnop, struct rx_call **newcallp)
204 register struct rx_packet *p = (struct rx_packet *)0;
206 assert(pthread_mutex_lock(&listener_mutex)==0);
207 while (!listeners_started) {
208 assert(pthread_cond_wait(&rx_listener_cond, &listener_mutex)==0);
210 assert(pthread_mutex_unlock(&listener_mutex)==0);
214 * Grab a new packet only if necessary (otherwise re-use the old one)
217 rxi_RestoreDataBufs(p);
220 if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_RECEIVE))) {
221 /* Could this happen with multiple socket listeners? */
222 printf("rxi_Listener: no packets!"); /* Shouldn't happen */
227 if (rxi_ReadPacket(sock, p, &host, &port)) {
229 p = rxi_ReceivePacket(p, sock, host, port, tnop, newcallp);
230 if (newcallp && *newcallp) {
240 /* This is the listener process request loop. The listener process loop
241 * becomes a server thread when rxi_ListenerProc returns, and stays
242 * server thread until rxi_ServerProc returns. */
243 static void *rx_ListenerProc(void *argp)
246 int sock = (int) argp;
247 struct rx_call *newcall;
252 rxi_ListenerProc(sock, &threadID, &newcall);
253 /* assert(threadID != -1); */
254 /* assert(newcall != NULL); */
255 sock = OSI_NULLSOCKET;
256 assert(pthread_setspecific(rx_thread_id_key, (void *)threadID) == 0);
257 rxi_ServerProc(threadID, newcall, &sock);
258 /* assert(sock != OSI_NULLSOCKET); */
263 /* This is the server process request loop. The server process loop
264 * becomes a listener thread when rxi_ServerProc returns, and stays
265 * listener thread until rxi_ListenerProc returns. */
266 void rx_ServerProc(void)
270 struct rx_call *newcall = NULL;
272 rxi_MorePackets(rx_maxReceiveWindow+2); /* alloc more packets */
273 MUTEX_ENTER(&rx_stats_mutex);
274 rxi_dataQuota += rx_initSendWindow; /* Reserve some pkts for hard times */
275 /* threadID is used for making decisions in GetCall. Get it by bumping
276 * number of threads handling incoming calls */
277 /* Unique thread ID: used for scheduling purposes *and* as index into
278 the host hold table (fileserver).
279 The previously used rxi_availProcs is unsuitable as it
280 will already go up and down as packets arrive while the server
281 threads are still initialising! The recently introduced
282 rxi_pthread_hinum does not necessarily lead to a server
283 thread with id 0, which is not allowed to hop through the
285 So either introduce yet another counter or flag the FCFS
286 thread... chose the latter.
288 threadID = ++rxi_pthread_hinum;
289 if (rxi_fcfs_thread_num==0 && rxi_fcfs_thread_num!=threadID)
290 rxi_fcfs_thread_num=threadID;
292 MUTEX_EXIT(&rx_stats_mutex);
295 sock = OSI_NULLSOCKET;
296 assert(pthread_setspecific(rx_thread_id_key, (void *)threadID) == 0);
297 rxi_ServerProc(threadID, newcall, &sock);
298 /* assert(sock != OSI_NULLSOCKET); */
300 rxi_ListenerProc(sock, &threadID, &newcall);
301 /* assert(threadID != -1); */
302 /* assert(newcall != NULL); */
308 * Historically used to start the listener process. We now have multiple
309 * listener processes (one for each socket); these are started by GetUdpSocket.
311 * The event handling process *is* started here (the old listener used
312 * to also handle events). The listener threads can't actually start
313 * listening until rxi_StartListener is called because most of R may not
314 * be initialized when rxi_Listen is called.
316 void rxi_StartListener(void)
318 pthread_attr_t tattr;
321 if (pthread_attr_init
323 printf("Unable to create Rx event handling thread (pthread_attr_init)\n");
327 if (pthread_attr_setdetachstate
329 PTHREAD_CREATE_DETACHED) != 0) {
330 printf("Unable to create Rx event handling thread (pthread_attr_setdetachstate)\n");
336 (&event_handler_thread,
340 printf("Unable to create Rx event handling thread\n");
343 MUTEX_ENTER(&rx_stats_mutex);
345 MUTEX_EXIT(&rx_stats_mutex);
346 AFS_SIGSET_RESTORE();
348 assert(pthread_mutex_lock(&listener_mutex)==0);
349 assert(pthread_cond_broadcast(&rx_listener_cond)==0);
350 listeners_started = 1;
351 assert(pthread_mutex_unlock(&listener_mutex)==0);
356 * Listen on the specified socket.
358 int rxi_Listen(osi_socket sock)
361 pthread_attr_t tattr;
364 if (pthread_attr_init
366 printf("Unable to create socket listener thread (pthread_attr_init)\n");
370 if (pthread_attr_setdetachstate
372 PTHREAD_CREATE_DETACHED) != 0) {
373 printf("Unable to create socket listener thread (pthread_attr_setdetachstate)\n");
382 (void *) sock) != 0) {
383 printf("Unable to create socket listener thread\n");
386 MUTEX_ENTER(&rx_stats_mutex);
388 MUTEX_EXIT(&rx_stats_mutex);
389 AFS_SIGSET_RESTORE();
398 int rxi_Recvmsg(int socket, struct msghdr *msg_p, int flags)
401 ret = recvmsg(socket, msg_p, flags);
408 int rxi_Sendmsg(osi_socket socket, struct msghdr *msg_p, int flags)
411 ret = sendmsg(socket, msg_p, flags);
412 #ifdef AFS_LINUX22_ENV
413 /* linux unfortunately returns ECONNREFUSED if the target port
414 * is no longer in use */
415 /* and EAGAIN if a UDP checksum is incorrect */
416 if (ret == -1 && errno != ECONNREFUSED && errno != EAGAIN) {
420 printf("rxi_sendmsg failed, error %d\n", errno);