2 * Copyright 2000, International Business Machines Corporation and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
11 * An implementation of the rx socket listener for pthreads (not using select).
12 * This assumes that multiple read system calls may be extant at any given
13 * time. Also implements the pthread-specific event handling for rx.
15 * rx_pthread.c is used for the thread safe RX package.
18 #include <afsconfig.h>
19 #include <afs/param.h>
23 #include <sys/types.h>
27 # include <sys/socket.h>
28 # include <sys/file.h>
30 # include <netinet/in.h>
32 # include <sys/ioctl.h>
33 # include <sys/time.h>
37 #include <rx_globals.h>
39 #include <rx/rx_pthread.h>
42 * Number of times the event handling thread was signalled because a new
43 * event was scheduled earlier than the previous earliest event.
45 * Protected by event_handler_mutex
47 static long rx_pthread_n_event_wakeups;
49 /* Set rx_pthread_event_rescheduled if event_handler should just try
50 * again instead of sleeping.
52 * Protected by event_handler_mutex
54 static int rx_pthread_event_rescheduled = 0;
56 static void rx_ListenerProc(void *);
59 * We supply an event handling thread for Rx's event processing.
60 * The condition variable is used to wake up the thread whenever a new
61 * event is scheduled earlier than the previous earliest event.
62 * This thread is also responsible for keeping time.
64 static pthread_t event_handler_thread;
65 pthread_cond_t rx_event_handler_cond;
66 pthread_mutex_t event_handler_mutex;
67 pthread_cond_t rx_listener_cond;
68 pthread_mutex_t listener_mutex;
69 static int listeners_started = 0;
70 pthread_mutex_t rx_clock_mutex;
71 struct clock rxi_clockNow;
74 * Delay the current thread the specified number of seconds.
83 * Called from rx_Init()
85 void rxi_InitializeThreadSupport() {
87 listeners_started = 0;
88 gettimeofday((struct timeval *)&rxi_clockNow, NULL);
91 static void *server_entry(void * argp)
93 void (*server_proc)() = (void (*)()) argp;
95 printf("rx_pthread.c: server_entry: Server proc returned unexpectedly\n");
101 * Start an Rx server process.
103 void rxi_StartServerProc(proc, stacksize)
108 pthread_attr_t tattr;
111 if (pthread_attr_init
113 printf("Unable to Create Rx server thread (pthread_attr_init)\n");
117 if (pthread_attr_setdetachstate
119 PTHREAD_CREATE_DETACHED) != 0) {
120 printf("Unable to Create Rx server thread (pthread_attr_setdetachstate)\n");
125 * NOTE: We are ignoring the stack size parameter, for now.
132 (void *) proc) != 0) {
133 printf("Unable to Create Rx server thread\n");
136 AFS_SIGSET_RESTORE();
140 * The event handling process.
142 static void *event_handler(void *argp)
144 struct clock rx_pthread_last_event_wait_time = {0,0};
145 unsigned long rx_pthread_n_event_expired = 0;
146 unsigned long rx_pthread_n_event_waits = 0;
147 long rx_pthread_n_event_woken = 0;
148 struct timespec rx_pthread_next_event_time = {0,0};
150 assert(pthread_mutex_lock(&event_handler_mutex)==0);
156 assert(pthread_mutex_unlock(&event_handler_mutex)==0);
158 next.sec = 30; /* Time to sleep if there are no events scheduled */
160 gettimeofday((struct timeval *)&cv, NULL);
161 rxevent_RaiseEvents(&next);
163 assert(pthread_mutex_lock(&event_handler_mutex)==0);
164 if (rx_pthread_event_rescheduled) {
165 rx_pthread_event_rescheduled = 0;
169 clock_Add(&cv, &next);
170 rx_pthread_next_event_time.tv_sec = cv.sec;
171 rx_pthread_next_event_time.tv_nsec = cv.usec * 1000;
172 rx_pthread_n_event_waits++;
173 if (pthread_cond_timedwait
174 (&rx_event_handler_cond,
175 &event_handler_mutex,
176 &rx_pthread_next_event_time) == -1) {
178 assert(errno == EAGAIN);
180 rx_pthread_n_event_expired++;
182 rx_pthread_n_event_woken++;
184 rx_pthread_event_rescheduled = 0;
190 * This routine is called by the event package whenever a new event,
191 * earlier than all others, is posted. */
192 void rxi_ReScheduleEvents() {
193 assert(pthread_mutex_lock(&event_handler_mutex)==0);
194 pthread_cond_signal(&rx_event_handler_cond);
195 rx_pthread_event_rescheduled = 1;
196 assert(pthread_mutex_unlock(&event_handler_mutex)==0);
200 /* Loop to listen on a socket. Return setting *newcallp if this
201 * thread should become a server thread. */
202 static void rxi_ListenerProc(sock, tnop, newcallp)
205 struct rx_call **newcallp;
209 register struct rx_packet *p = (struct rx_packet *)0;
211 assert(pthread_mutex_lock(&listener_mutex)==0);
212 while (!listeners_started) {
213 assert(pthread_cond_wait(&rx_listener_cond, &listener_mutex)==0);
215 assert(pthread_mutex_unlock(&listener_mutex)==0);
219 * Grab a new packet only if necessary (otherwise re-use the old one)
222 rxi_RestoreDataBufs(p);
225 if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_RECEIVE))) {
226 /* Could this happen with multiple socket listeners? */
227 printf("rxi_Listener: no packets!"); /* Shouldn't happen */
232 if (rxi_ReadPacket(sock, p, &host, &port)) {
234 p = rxi_ReceivePacket(p, sock, host, port, tnop, newcallp);
235 if (newcallp && *newcallp) {
245 /* This is the listener process request loop. The listener process loop
246 * becomes a server thread when rxi_ListenerProc returns, and stays
247 * a server thread until rxi_ServerProc returns. */
248 static void rx_ListenerProc(void *argp)
251 int sock = (int) argp;
252 struct rx_call *newcall;
257 rxi_ListenerProc(sock, &threadID, &newcall);
258 /* assert(threadID != -1); */
259 /* assert(newcall != NULL); */
260 sock = OSI_NULLSOCKET;
261 assert(pthread_setspecific(rx_thread_id_key, (void *)threadID) == 0);
262 rxi_ServerProc(threadID, newcall, &sock);
263 /* assert(sock != OSI_NULLSOCKET); */
268 /* This is the server process request loop. The server process loop
269 * becomes a listener thread when rxi_ServerProc returns, and stays
270 * a listener thread until rxi_ListenerProc returns. */
275 struct rx_call *newcall = NULL;
277 rxi_MorePackets(rx_maxReceiveWindow+2); /* alloc more packets */
278 MUTEX_ENTER(&rx_stats_mutex);
279 rxi_dataQuota += rx_initSendWindow; /* Reserve some pkts for hard times */
280 /* threadID is used for making decisions in GetCall. Get it by bumping
281 * number of threads handling incoming calls */
282 threadID = rxi_availProcs++;
283 MUTEX_EXIT(&rx_stats_mutex);
286 sock = OSI_NULLSOCKET;
287 assert(pthread_setspecific(rx_thread_id_key, (void *)threadID) == 0);
288 rxi_ServerProc(threadID, newcall, &sock);
289 /* assert(sock != OSI_NULLSOCKET); */
291 rxi_ListenerProc(sock, &threadID, &newcall);
292 /* assert(threadID != -1); */
293 /* assert(newcall != NULL); */
299 * Historically used to start the listener process. We now have multiple
300 * listener processes (one for each socket); these are started by GetUdpSocket.
302 * The event handling process *is* started here (the old listener used
303 * to also handle events). The listener threads can't actually start
304 * listening until rxi_StartListener is called because most of Rx may not
305 * be initialized when rxi_Listen is called.
307 void rxi_StartListener() {
308 pthread_attr_t tattr;
311 if (pthread_attr_init
313 printf("Unable to create Rx event handling thread (pthread_attr_init)\n");
317 if (pthread_attr_setdetachstate
319 PTHREAD_CREATE_DETACHED) != 0) {
320 printf("Unable to create Rx event handling thread (pthread_attr_setdetachstate)\n");
326 (&event_handler_thread,
330 printf("Unable to create Rx event handling thread\n");
333 AFS_SIGSET_RESTORE();
335 assert(pthread_mutex_lock(&listener_mutex)==0);
336 assert(pthread_cond_broadcast(&rx_listener_cond)==0);
337 listeners_started = 1;
338 assert(pthread_mutex_unlock(&listener_mutex)==0);
343 * Listen on the specified socket.
349 pthread_attr_t tattr;
352 if (pthread_attr_init
354 printf("Unable to create socket listener thread (pthread_attr_init)\n");
358 if (pthread_attr_setdetachstate
360 PTHREAD_CREATE_DETACHED) != 0) {
361 printf("Unable to create socket listener thread (pthread_attr_setdetachstate)\n");
370 (void *) sock) != 0) {
371 printf("Unable to create socket listener thread\n");
374 AFS_SIGSET_RESTORE();
385 struct msghdr *msg_p,
389 ret = recvmsg(socket, msg_p, flags);
396 rxi_Sendmsg(socket, msg_p, flags)
398 struct msghdr *msg_p;
402 ret = sendmsg(socket, msg_p, flags);
403 #ifdef AFS_LINUX22_ENV
404 /* linux unfortunately returns ECONNREFUSED if the target port
405 * is no longer in use */
406 /* and EAGAIN if a UDP checksum is incorrect */
407 if (ret == -1 && errno != ECONNREFUSED && errno != EAGAIN) {
411 printf("rxi_sendmsg failed, error %d\n", errno);