2 * Copyright 2000, International Business Machines Corporation and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
11 * An implementation of the rx socket listener for pthreads (not using select).
12 * This assumes that multiple read system calls may be extant at any given
13 * time. Also implements the pthread-specific event handling for rx.
15 * rx_pthread.c is used for the thread safe RX package.
18 #include <afsconfig.h>
19 #include <afs/param.h>
24 #include <sys/types.h>
28 # include <sys/socket.h>
29 # include <sys/file.h>
31 # include <netinet/in.h>
33 # include <sys/ioctl.h>
34 # include <sys/time.h>
38 #include <rx/rx_globals.h>
40 #include <rx/rx_pthread.h>
41 #include <rx/rx_clock.h>
44 * Number of times the event handling thread was signalled because a new
45 * event was scheduled earlier than the lastest event.
47 * Protected by event_handler_mutex
49 static long rx_pthread_n_event_wakeups;
51 /* Set rx_pthread_event_rescheduled if event_handler should just try
52 * again instead of sleeping.
54 * Protected by event_handler_mutex
56 static int rx_pthread_event_rescheduled = 0;
58 static void *rx_ListenerProc(void *);
61 * We supply an event handling thread for Rx's event processing.
62 * The condition variable is used to wakeup the thread whenever a new
63 * event is scheduled earlier than the previous earliest event.
64 * This thread is also responsible for keeping time.
66 static pthread_t event_handler_thread;
67 pthread_cond_t rx_event_handler_cond;
68 pthread_mutex_t event_handler_mutex;
69 pthread_cond_t rx_listener_cond;
70 pthread_mutex_t listener_mutex;
71 static int listeners_started = 0;
72 pthread_mutex_t rx_clock_mutex;
73 struct clock rxi_clockNow;
76 * Delay the current thread the specified number of seconds.
85 * Called from rx_Init()
88 rxi_InitializeThreadSupport(void)
90 listeners_started = 0;
91 clock_GetTime(&rxi_clockNow);
95 server_entry(void *argp)
97 void (*server_proc) () = (void (*)())argp;
99 printf("rx_pthread.c: server_entry: Server proc returned unexpectedly\n");
105 * Start an Rx server process.
108 rxi_StartServerProc(void (*proc) (void), int stacksize)
111 pthread_attr_t tattr;
114 if (pthread_attr_init(&tattr) != 0) {
115 printf("Unable to Create Rx server thread (pthread_attr_init)\n");
119 if (pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) != 0) {
121 ("Unable to Create Rx server thread (pthread_attr_setdetachstate)\n");
126 * NOTE: We are ignoring the stack size parameter, for now.
129 if (pthread_create(&thread, &tattr, server_entry, (void *)proc) != 0) {
130 printf("Unable to Create Rx server thread\n");
133 AFS_SIGSET_RESTORE();
137 * The event handling process.
140 event_handler(void *argp)
142 struct clock rx_pthread_last_event_wait_time = { 0, 0 };
143 unsigned long rx_pthread_n_event_expired = 0;
144 unsigned long rx_pthread_n_event_waits = 0;
145 long rx_pthread_n_event_woken = 0;
146 struct timespec rx_pthread_next_event_time = { 0, 0 };
148 assert(pthread_mutex_lock(&event_handler_mutex) == 0);
154 assert(pthread_mutex_unlock(&event_handler_mutex) == 0);
156 next.sec = 30; /* Time to sleep if there are no events scheduled */
159 rxevent_RaiseEvents(&next);
161 assert(pthread_mutex_lock(&event_handler_mutex) == 0);
162 if (rx_pthread_event_rescheduled) {
163 rx_pthread_event_rescheduled = 0;
167 clock_Add(&cv, &next);
168 rx_pthread_next_event_time.tv_sec = cv.sec;
169 rx_pthread_next_event_time.tv_nsec = cv.usec * 1000;
170 rx_pthread_n_event_waits++;
171 if (pthread_cond_timedwait
172 (&rx_event_handler_cond, &event_handler_mutex,
173 &rx_pthread_next_event_time) == -1) {
175 assert(errno == EAGAIN);
177 rx_pthread_n_event_expired++;
179 rx_pthread_n_event_woken++;
181 rx_pthread_event_rescheduled = 0;
187 * This routine will get called by the event package whenever a new,
188 * earlier than others, event is posted. */
190 rxi_ReScheduleEvents(void)
192 assert(pthread_mutex_lock(&event_handler_mutex) == 0);
193 pthread_cond_signal(&rx_event_handler_cond);
194 rx_pthread_event_rescheduled = 1;
195 assert(pthread_mutex_unlock(&event_handler_mutex) == 0);
199 /* Loop to listen on a socket. Return setting *newcallp if this
200 * thread should become a server thread. */
202 rxi_ListenerProc(int sock, int *tnop, struct rx_call **newcallp)
206 register struct rx_packet *p = (struct rx_packet *)0;
208 assert(pthread_mutex_lock(&listener_mutex) == 0);
209 while (!listeners_started) {
210 assert(pthread_cond_wait(&rx_listener_cond, &listener_mutex) == 0);
212 assert(pthread_mutex_unlock(&listener_mutex) == 0);
216 * Grab a new packet only if necessary (otherwise re-use the old one)
219 rxi_RestoreDataBufs(p);
221 if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_RECEIVE))) {
222 /* Could this happen with multiple socket listeners? */
223 printf("rxi_Listener: no packets!"); /* Shouldn't happen */
228 if (rxi_ReadPacket(sock, p, &host, &port)) {
230 p = rxi_ReceivePacket(p, sock, host, port, tnop, newcallp);
231 if (newcallp && *newcallp) {
241 /* This is the listener process request loop. The listener process loop
242 * becomes a server thread when rxi_ListenerProc returns, and stays
243 * server thread until rxi_ServerProc returns. */
245 rx_ListenerProc(void *argp)
248 int sock = (int)argp;
249 struct rx_call *newcall;
254 rxi_ListenerProc(sock, &threadID, &newcall);
255 /* assert(threadID != -1); */
256 /* assert(newcall != NULL); */
257 sock = OSI_NULLSOCKET;
258 assert(pthread_setspecific(rx_thread_id_key, (void *)threadID) == 0);
259 rxi_ServerProc(threadID, newcall, &sock);
260 /* assert(sock != OSI_NULLSOCKET); */
265 /* This is the server process request loop. The server process loop
266 * becomes a listener thread when rxi_ServerProc returns, and stays
267 * listener thread until rxi_ListenerProc returns. */
273 struct rx_call *newcall = NULL;
275 rxi_MorePackets(rx_maxReceiveWindow + 2); /* alloc more packets */
276 MUTEX_ENTER(&rx_stats_mutex);
277 rxi_dataQuota += rx_initSendWindow; /* Reserve some pkts for hard times */
278 /* threadID is used for making decisions in GetCall. Get it by bumping
279 * number of threads handling incoming calls */
280 /* Unique thread ID: used for scheduling purposes *and* as index into
281 * the host hold table (fileserver).
282 * The previously used rxi_availProcs is unsuitable as it
283 * will already go up and down as packets arrive while the server
284 * threads are still initialising! The recently introduced
285 * rxi_pthread_hinum does not necessarily lead to a server
286 * thread with id 0, which is not allowed to hop through the
287 * incoming call queue.
288 * So either introduce yet another counter or flag the FCFS
289 * thread... chose the latter.
291 threadID = ++rxi_pthread_hinum;
292 if (rxi_fcfs_thread_num == 0 && rxi_fcfs_thread_num != threadID)
293 rxi_fcfs_thread_num = threadID;
295 MUTEX_EXIT(&rx_stats_mutex);
298 sock = OSI_NULLSOCKET;
299 assert(pthread_setspecific(rx_thread_id_key, (void *)threadID) == 0);
300 rxi_ServerProc(threadID, newcall, &sock);
301 /* assert(sock != OSI_NULLSOCKET); */
303 rxi_ListenerProc(sock, &threadID, &newcall);
304 /* assert(threadID != -1); */
305 /* assert(newcall != NULL); */
311 * Historically used to start the listener process. We now have multiple
312 * listener processes (one for each socket); these are started by GetUdpSocket.
314 * The event handling process *is* started here (the old listener used
315 * to also handle events). The listener threads can't actually start
316 * listening until rxi_StartListener is called because most of R may not
317 * be initialized when rxi_Listen is called.
320 rxi_StartListener(void)
322 pthread_attr_t tattr;
325 if (pthread_attr_init(&tattr) != 0) {
327 ("Unable to create Rx event handling thread (pthread_attr_init)\n");
331 if (pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) != 0) {
333 ("Unable to create Rx event handling thread (pthread_attr_setdetachstate)\n");
338 if (pthread_create(&event_handler_thread, &tattr, event_handler, NULL) !=
340 printf("Unable to create Rx event handling thread\n");
343 MUTEX_ENTER(&rx_stats_mutex);
345 MUTEX_EXIT(&rx_stats_mutex);
346 AFS_SIGSET_RESTORE();
348 assert(pthread_mutex_lock(&listener_mutex) == 0);
349 assert(pthread_cond_broadcast(&rx_listener_cond) == 0);
350 listeners_started = 1;
351 assert(pthread_mutex_unlock(&listener_mutex) == 0);
356 * Listen on the specified socket.
359 rxi_Listen(osi_socket sock)
362 pthread_attr_t tattr;
365 if (pthread_attr_init(&tattr) != 0) {
367 ("Unable to create socket listener thread (pthread_attr_init)\n");
371 if (pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) != 0) {
373 ("Unable to create socket listener thread (pthread_attr_setdetachstate)\n");
378 if (pthread_create(&thread, &tattr, rx_ListenerProc, (void *)sock) != 0) {
379 printf("Unable to create socket listener thread\n");
382 MUTEX_ENTER(&rx_stats_mutex);
384 MUTEX_EXIT(&rx_stats_mutex);
385 AFS_SIGSET_RESTORE();
395 rxi_Recvmsg(int socket, struct msghdr *msg_p, int flags)
398 ret = recvmsg(socket, msg_p, flags);
406 rxi_Sendmsg(osi_socket socket, struct msghdr *msg_p, int flags)
409 ret = sendmsg(socket, msg_p, flags);
410 #ifdef AFS_LINUX22_ENV
411 /* linux unfortunately returns ECONNREFUSED if the target port
412 * is no longer in use */
413 /* and EAGAIN if a UDP checksum is incorrect */
414 if (ret == -1 && errno != ECONNREFUSED && errno != EAGAIN) {
418 printf("rxi_sendmsg failed, error %d\n", errno);