2 * Copyright 2000, International Business Machines Corporation and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
11 * An implementation of the rx socket listener for pthreads (not using select).
12 * This assumes that multiple read system calls may be extant at any given
13 * time. Also implements the pthread-specific event handling for rx.
15 * rx_pthread.c is used for the thread safe RX package.
18 #include <afsconfig.h>
19 #include <afs/param.h>
22 #include <sys/types.h>
27 # include <sys/socket.h>
28 # include <sys/file.h>
30 # include <netinet/in.h>
32 # include <sys/ioctl.h>
33 # include <sys/time.h>
37 #include <rx/rx_globals.h>
39 #include <rx/rx_pthread.h>
40 #include <rx/rx_clock.h>
43 * Number of times the event handling thread was signalled because a new
44 * event was scheduled earlier than the lastest event.
46 * Protected by event_handler_mutex
48 static long rx_pthread_n_event_wakeups;
50 /* Set rx_pthread_event_rescheduled if event_handler should just try
51 * again instead of sleeping.
53 * Protected by event_handler_mutex
55 static int rx_pthread_event_rescheduled = 0;
57 static void *rx_ListenerProc(void *);
60 * We supply an event handling thread for Rx's event processing.
61 * The condition variable is used to wakeup the thread whenever a new
62 * event is scheduled earlier than the previous earliest event.
63 * This thread is also responsible for keeping time.
65 static pthread_t event_handler_thread;
66 afs_kcondvar_t rx_event_handler_cond;
67 afs_kmutex_t event_handler_mutex;
68 afs_kcondvar_t rx_listener_cond;
69 afs_kmutex_t listener_mutex;
70 static int listeners_started = 0;
71 afs_kmutex_t rx_clock_mutex;
72 struct clock rxi_clockNow;
75 * Delay the current thread the specified number of seconds.
84 * Called from rx_Init()
87 rxi_InitializeThreadSupport(void)
89 /* listeners_started must only be reset if
90 * the listener thread terminates */
91 /* listeners_started = 0; */
92 clock_GetTime(&rxi_clockNow);
96 server_entry(void *argp)
98 void (*server_proc) (void *) = (void (*)(void *))argp;
100 dpf(("rx_pthread.c: server_entry: Server proc returned unexpectedly\n"));
106 * Start an Rx server process.
109 rxi_StartServerProc(void *(*proc) (void *), int stacksize)
112 pthread_attr_t tattr;
115 if (pthread_attr_init(&tattr) != 0) {
116 dpf(("Unable to Create Rx server thread (pthread_attr_init)\n"));
120 if (pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) != 0) {
122 (("Unable to Create Rx server thread (pthread_attr_setdetachstate)\n"));
127 * NOTE: We are ignoring the stack size parameter, for now.
130 if (pthread_create(&thread, &tattr, server_entry, (void *)proc) != 0) {
131 dpf(("Unable to Create Rx server thread\n"));
134 AFS_SIGSET_RESTORE();
138 * The event handling process.
141 event_handler(void *argp)
143 struct clock rx_pthread_last_event_wait_time = { 0, 0 };
144 unsigned long rx_pthread_n_event_expired = 0;
145 unsigned long rx_pthread_n_event_waits = 0;
146 long rx_pthread_n_event_woken = 0;
147 unsigned long rx_pthread_n_event_error = 0;
148 struct timespec rx_pthread_next_event_time = { 0, 0 };
151 MUTEX_ENTER(&event_handler_mutex);
157 MUTEX_EXIT(&event_handler_mutex);
159 next.sec = 30; /* Time to sleep if there are no events scheduled */
162 rxevent_RaiseEvents(&next);
164 MUTEX_ENTER(&event_handler_mutex);
165 if (rx_pthread_event_rescheduled) {
166 rx_pthread_event_rescheduled = 0;
170 clock_Add(&cv, &next);
171 rx_pthread_next_event_time.tv_sec = cv.sec;
172 rx_pthread_next_event_time.tv_nsec = cv.usec * 1000;
173 rx_pthread_n_event_waits++;
174 error = CV_TIMEDWAIT(&rx_event_handler_cond, &event_handler_mutex, &rx_pthread_next_event_time);
176 rx_pthread_n_event_woken++;
179 else if (error == ETIMEDOUT) {
180 rx_pthread_n_event_expired++;
182 rx_pthread_n_event_error++;
185 else if (errno == ETIMEDOUT) {
186 rx_pthread_n_event_expired++;
188 rx_pthread_n_event_error++;
191 rx_pthread_event_rescheduled = 0;
198 * This routine will get called by the event package whenever a new,
199 * earlier than others, event is posted. */
201 rxi_ReScheduleEvents(void)
203 MUTEX_ENTER(&event_handler_mutex);
204 CV_SIGNAL(&rx_event_handler_cond);
205 rx_pthread_event_rescheduled = 1;
206 MUTEX_EXIT(&event_handler_mutex);
210 /* Loop to listen on a socket. Return setting *newcallp if this
211 * thread should become a server thread. */
213 rxi_ListenerProc(osi_socket sock, int *tnop, struct rx_call **newcallp)
217 struct rx_packet *p = (struct rx_packet *)0;
219 MUTEX_ENTER(&listener_mutex);
220 while (!listeners_started) {
221 CV_WAIT(&rx_listener_cond, &listener_mutex);
223 MUTEX_EXIT(&listener_mutex);
227 * Grab a new packet only if necessary (otherwise re-use the old one)
230 rxi_RestoreDataBufs(p);
232 if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_RECEIVE))) {
233 /* Could this happen with multiple socket listeners? */
234 dpf(("rxi_Listener: no packets!")); /* Shouldn't happen */
239 if (rxi_ReadPacket(sock, p, &host, &port)) {
241 p = rxi_ReceivePacket(p, sock, host, port, tnop, newcallp);
242 if (newcallp && *newcallp) {
252 /* This is the listener process request loop. The listener process loop
253 * becomes a server thread when rxi_ListenerProc returns, and stays
254 * server thread until rxi_ServerProc returns. */
256 rx_ListenerProc(void *argp)
259 osi_socket sock = (osi_socket)argp;
260 struct rx_call *newcall;
265 rxi_ListenerProc(sock, &threadID, &newcall);
266 /* assert(threadID != -1); */
267 /* assert(newcall != NULL); */
268 sock = OSI_NULLSOCKET;
269 assert(pthread_setspecific(rx_thread_id_key, (void *)threadID) == 0);
270 rxi_ServerProc(threadID, newcall, &sock);
271 /* assert(sock != OSI_NULLSOCKET); */
277 /* This is the server process request loop. The server process loop
278 * becomes a listener thread when rxi_ServerProc returns, and stays
279 * listener thread until rxi_ListenerProc returns. */
281 rx_ServerProc(void * dummy)
285 struct rx_call *newcall = NULL;
287 rxi_MorePackets(rx_maxReceiveWindow + 2); /* alloc more packets */
288 MUTEX_ENTER(&rx_quota_mutex);
289 rxi_dataQuota += rx_initSendWindow; /* Reserve some pkts for hard times */
290 /* threadID is used for making decisions in GetCall. Get it by bumping
291 * number of threads handling incoming calls */
292 /* Unique thread ID: used for scheduling purposes *and* as index into
293 * the host hold table (fileserver).
294 * The previously used rxi_availProcs is unsuitable as it
295 * will already go up and down as packets arrive while the server
296 * threads are still initialising! The recently introduced
297 * rxi_pthread_hinum does not necessarily lead to a server
298 * thread with id 0, which is not allowed to hop through the
299 * incoming call queue.
300 * So either introduce yet another counter or flag the FCFS
301 * thread... chose the latter.
303 MUTEX_ENTER(&rx_pthread_mutex);
304 threadID = ++rxi_pthread_hinum;
305 if (rxi_fcfs_thread_num == 0 && rxi_fcfs_thread_num != threadID)
306 rxi_fcfs_thread_num = threadID;
307 MUTEX_EXIT(&rx_pthread_mutex);
309 MUTEX_EXIT(&rx_quota_mutex);
312 sock = OSI_NULLSOCKET;
313 assert(pthread_setspecific(rx_thread_id_key, (void *)threadID) == 0);
314 rxi_ServerProc(threadID, newcall, &sock);
315 /* assert(sock != OSI_NULLSOCKET); */
317 rxi_ListenerProc(sock, &threadID, &newcall);
318 /* assert(threadID != -1); */
319 /* assert(newcall != NULL); */
326 * Historically used to start the listener process. We now have multiple
327 * listener processes (one for each socket); these are started by GetUdpSocket.
329 * The event handling process *is* started here (the old listener used
330 * to also handle events). The listener threads can't actually start
331 * listening until rxi_StartListener is called because most of R may not
332 * be initialized when rxi_Listen is called.
335 rxi_StartListener(void)
337 pthread_attr_t tattr;
340 if (listeners_started)
343 if (pthread_attr_init(&tattr) != 0) {
345 (("Unable to create Rx event handling thread (pthread_attr_init)\n"));
349 if (pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) != 0) {
351 (("Unable to create Rx event handling thread (pthread_attr_setdetachstate)\n"));
356 if (pthread_create(&event_handler_thread, &tattr, event_handler, NULL) !=
358 dpf(("Unable to create Rx event handling thread\n"));
361 MUTEX_ENTER(&rx_pthread_mutex);
363 MUTEX_EXIT(&rx_pthread_mutex);
364 AFS_SIGSET_RESTORE();
366 MUTEX_ENTER(&listener_mutex);
367 CV_BROADCAST(&rx_listener_cond);
368 listeners_started = 1;
369 MUTEX_EXIT(&listener_mutex);
374 * Listen on the specified socket.
377 rxi_Listen(osi_socket sock)
380 pthread_attr_t tattr;
383 if (pthread_attr_init(&tattr) != 0) {
385 (("Unable to create socket listener thread (pthread_attr_init)\n"));
389 if (pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) != 0) {
391 (("Unable to create socket listener thread (pthread_attr_setdetachstate)\n"));
396 if (pthread_create(&thread, &tattr, rx_ListenerProc, (void *)sock) != 0) {
397 dpf(("Unable to create socket listener thread\n"));
400 MUTEX_ENTER(&rx_pthread_mutex);
402 MUTEX_EXIT(&rx_pthread_mutex);
403 AFS_SIGSET_RESTORE();
413 rxi_Recvmsg(osi_socket socket, struct msghdr *msg_p, int flags)
416 #if defined(HAVE_LINUX_ERRQUEUE_H) && defined(ADAPT_PMTU)
417 while((rxi_HandleSocketError(socket)) > 0)
420 ret = recvmsg(socket, msg_p, flags);
428 rxi_Sendmsg(osi_socket socket, struct msghdr *msg_p, int flags)
431 ret = sendmsg(socket, msg_p, flags);
432 #ifdef AFS_LINUX22_ENV
433 /* linux unfortunately returns ECONNREFUSED if the target port
434 * is no longer in use */
435 /* and EAGAIN if a UDP checksum is incorrect */
436 if (ret == -1 && errno != ECONNREFUSED && errno != EAGAIN) {
440 dpf(("rxi_sendmsg failed, error %d\n", errno));
447 struct rx_ts_info_t * rx_ts_info_init(void) {
448 struct rx_ts_info_t * rx_ts_info;
449 rx_ts_info = (rx_ts_info_t *) malloc(sizeof(rx_ts_info_t));
450 assert(rx_ts_info != NULL && pthread_setspecific(rx_ts_info_key, rx_ts_info) == 0);
451 memset(rx_ts_info, 0, sizeof(rx_ts_info_t));
452 #ifdef RX_ENABLE_TSFPQ
453 queue_Init(&rx_ts_info->_FPQ);
455 MUTEX_ENTER(&rx_packets_mutex);
457 RX_TS_FPQ_COMPUTE_LIMITS;
458 MUTEX_EXIT(&rx_packets_mutex);
459 #endif /* RX_ENABLE_TSFPQ */