2 * Copyright 2000, International Business Machines Corporation and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
11 * An implementation of the rx socket listener for pthreads (not using select).
12 * This assumes that multiple read system calls may be extant at any given
13 * time. Also implements the pthread-specific event handling for rx.
15 * rx_pthread.c is used for the thread safe RX package.
18 #include <afsconfig.h>
19 #include <afs/param.h>
27 #include "rx_globals.h"
28 #include "rx_pthread.h"
30 #include "rx_atomic.h"
32 #include "rx_xmit_nt.h"
35 static void rxi_SetThreadNum(int threadID);
37 /* Set rx_pthread_event_rescheduled if event_handler should just try
38 * again instead of sleeping.
40 * Protected by event_handler_mutex
/* Raised to 1 by rxi_ReScheduleEvents() and reset to 0 by event_handler(). */
42 static int rx_pthread_event_rescheduled = 0;
/* Forward declaration of the start routine that rxi_Listen() hands to
 * pthread_create(). */
44 static void *rx_ListenerProc(void *);
47 * We supply an event handling thread for Rx's event processing.
48 * The condition variable is used to wakeup the thread whenever a new
49 * event is scheduled earlier than the previous earliest event.
50 * This thread is also responsible for keeping time.
/* Thread handle filled in by pthread_create() in rxi_StartListener(). */
52 static pthread_t event_handler_thread;
/* Signalled by rxi_ReScheduleEvents(); event_handler() performs its timed
 * wait on this condition together with event_handler_mutex. */
53 afs_kcondvar_t rx_event_handler_cond;
54 afs_kmutex_t event_handler_mutex;
/* Broadcast by rxi_StartListener(); rxi_ListenerProc() waits on it, under
 * listener_mutex, until listeners_started becomes non-zero. */
55 afs_kcondvar_t rx_listener_cond;
56 afs_kmutex_t listener_mutex;
57 static int listeners_started = 0;
58 afs_kmutex_t rx_clock_mutex;
/* Filled in through clock_GetTime() by rxi_InitializeThreadSupport(). */
59 struct clock rxi_clockNow;
/* Counter that rx_NewThreadId() increments to hand out thread ids. */
61 static rx_atomic_t threadHiNum;
/* Return a new thread id: the value produced by rx_atomic_inc_and_read()
 * applied to the file-scope counter threadHiNum. */
64 rx_NewThreadId(void) {
65 return rx_atomic_inc_and_read(&threadHiNum);
69 * Delay the current thread the specified number of seconds.
78 * Called from rx_Init()
/* Thread-support initialisation: fills rxi_clockNow through clock_GetTime().
 * listeners_started is deliberately left untouched here; the assignment
 * that would reset it is kept only as a commented-out line. */
81 rxi_InitializeThreadSupport(void)
83 /* listeners_started must only be reset if
84 * the listener thread terminates */
85 /* listeners_started = 0; */
86 clock_GetTime(&rxi_clockNow);
/* pthread start routine passed to pthread_create() by rxi_StartServerProc().
 * argp carries the server procedure, recovered by casting it to
 * void (*)(void *).  On the path shown, a debug message is emitted through
 * dpf() and (void *)-1 is returned as the thread exit value. */
90 server_entry(void *argp)
92 void (*server_proc) (void *) = (void (*)(void *))argp;
94 dpf(("rx_pthread.c: server_entry: Server proc returned unexpectedly\n"));
95 return (void *) -1; /* reused as return value, see pthread(3) */
99 * Start an Rx server process.
/* Create a thread, with the PTHREAD_CREATE_DETACHED attribute, whose start
 * routine is server_entry and whose argument is proc.
 *
 * proc      - server procedure, handed to server_entry() as a void pointer.
 * stacksize - currently ignored (see the NOTE further down).
 *
 * A non-zero result from pthread_attr_init(), pthread_attr_setdetachstate()
 * or pthread_create() is reported through osi_Panic(). */
102 rxi_StartServerProc(void *(*proc) (void *), int stacksize)
105 pthread_attr_t tattr;
108 if (pthread_attr_init(&tattr) != 0) {
109 osi_Panic("Unable to Create Rx server thread (pthread_attr_init)\n");
112 if (pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) != 0) {
113 osi_Panic("Unable to Create Rx server thread (pthread_attr_setdetachstate)\n");
117 * NOTE: We are ignoring the stack size parameter, for now.
120 if (pthread_create(&thread, &tattr, server_entry, (void *)proc) != 0) {
121 osi_Panic("Unable to Create Rx server thread\n");
/* NOTE(review): presumably restores a signal mask that was adjusted before
 * pthread_create() -- confirm against the AFS_SIGSET_* macro definitions. */
123 AFS_SIGSET_RESTORE();
127 * The event handling process.
/* Start routine of the Rx event handling thread (created by
 * rxi_StartListener()).  It calls rxevent_RaiseEvents() and then, holding
 * event_handler_mutex, checks and clears rx_pthread_event_rescheduled and
 * performs a timed wait on rx_event_handler_cond, counting how each wait
 * ended. */
130 event_handler(void *argp)
/* Local counters for wait outcomes; the code shown only increments them. */
132 unsigned long rx_pthread_n_event_expired = 0;
133 unsigned long rx_pthread_n_event_waits = 0;
134 long rx_pthread_n_event_woken = 0;
135 unsigned long rx_pthread_n_event_error = 0;
136 struct timespec rx_pthread_next_event_time = { 0, 0 };
139 MUTEX_ENTER(&event_handler_mutex);
145 MUTEX_EXIT(&event_handler_mutex);
147 next.sec = 30; /* Time to sleep if there are no events scheduled */
150 rxevent_RaiseEvents(&next);
152 MUTEX_ENTER(&event_handler_mutex);
/* A reschedule request arrived since the last check: consume it. */
153 if (rx_pthread_event_rescheduled) {
154 rx_pthread_event_rescheduled = 0;
/* Add next to cv and convert the result into the timespec handed to
 * CV_TIMEDWAIT; cv.usec is scaled from microseconds to nanoseconds. */
158 clock_Add(&cv, &next);
159 rx_pthread_next_event_time.tv_sec = cv.sec;
160 rx_pthread_next_event_time.tv_nsec = cv.usec * 1000;
161 rx_pthread_n_event_waits++;
162 error = CV_TIMEDWAIT(&rx_event_handler_cond, &event_handler_mutex, &rx_pthread_next_event_time);
164 rx_pthread_n_event_woken++;
167 else if (error == ETIMEDOUT) {
168 rx_pthread_n_event_expired++;
170 rx_pthread_n_event_error++;
/* NOTE(review): this second chain tests errno instead of the CV_TIMEDWAIT
 * return value; presumably it is the alternative arm of a preprocessor
 * conditional for platforms reporting the timeout through errno -- confirm. */
173 else if (errno == ETIMEDOUT) {
174 rx_pthread_n_event_expired++;
176 rx_pthread_n_event_error++;
179 rx_pthread_event_rescheduled = 0;
186 * This routine will get called by the event package whenever a new event,
187 * earlier than all others, is posted. */
/* Wake the event handling thread: with event_handler_mutex held, signal
 * rx_event_handler_cond and set rx_pthread_event_rescheduled to 1.  Both
 * steps happen before the mutex is released. */
189 rxi_ReScheduleEvents(void)
191 MUTEX_ENTER(&event_handler_mutex);
192 CV_SIGNAL(&rx_event_handler_cond);
193 rx_pthread_event_rescheduled = 1;
194 MUTEX_EXIT(&event_handler_mutex);
198 /* Loop to listen on a socket. Return setting *newcallp if this
199 * thread should become a server thread. */
/* Listener routine for one socket.
 *
 * sock     - socket that rxi_ReadPacket() reads from.
 * tnop     - passed through unchanged to rxi_ReceivePacket().
 * newcallp - passed through to rxi_ReceivePacket(); afterwards the code
 *            tests whether it is non-NULL and points at a non-NULL call.
 *
 * Before any packet work the routine waits on rx_listener_cond until
 * listeners_started is non-zero. */
201 rxi_ListenerProc(osi_socket sock, int *tnop, struct rx_call **newcallp)
205 struct rx_packet *p = (struct rx_packet *)0;
/* Hold off until rxi_StartListener() has set listeners_started. */
207 MUTEX_ENTER(&listener_mutex);
208 while (!listeners_started) {
209 CV_WAIT(&rx_listener_cond, &listener_mutex);
211 MUTEX_EXIT(&listener_mutex);
214 /* See if a check for additional packets was issued */
218 * Grab a new packet only if necessary (otherwise re-use the old one)
221 rxi_RestoreDataBufs(p);
223 if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_RECEIVE))) {
224 /* Could this happen with multiple socket listeners? */
225 osi_Panic("rxi_Listener: no packets!"); /* Shouldn't happen */
229 if (rxi_ReadPacket(sock, p, &host, &port)) {
/* The value returned by rxi_ReceivePacket() replaces p. */
231 p = rxi_ReceivePacket(p, sock, host, port, tnop, newcallp);
232 if (newcallp && *newcallp) {
242 /* This is the listener process request loop. The listener process loop
243 * becomes a server thread when rxi_ListenerProc returns, and stays a
244 * server thread until rxi_ServerProc returns. */
/* Thread start routine for a socket listener (see rxi_Listen()).  argp holds
 * the socket, carried as an integer through the void pointer and recovered
 * with an intptr_t cast.  After rxi_ListenerProc() returns, sock is set to
 * OSI_NULLSOCKET, threadID is stored with rxi_SetThreadNum(), and
 * rxi_ServerProc() is called with the address of sock. */
246 rx_ListenerProc(void *argp)
249 osi_socket sock = (osi_socket)(intptr_t)argp;
250 struct rx_call *newcall;
255 rxi_ListenerProc(sock, &threadID, &newcall);
256 /* osi_Assert(threadID != -1); */
257 /* osi_Assert(newcall != NULL); */
258 sock = OSI_NULLSOCKET;
259 rxi_SetThreadNum(threadID);
/* NOTE(review): sock is passed by address, presumably so rxi_ServerProc()
 * can hand back a socket to listen on -- confirm. */
260 rxi_ServerProc(threadID, newcall, &sock);
261 /* osi_Assert(sock != OSI_NULLSOCKET); */
267 /* This is the server process request loop. The server process loop
268 * becomes a listener thread when rxi_ServerProc returns, and stays a
269 * listener thread until rxi_ListenerProc returns. */
/* Server thread main routine; dummy is not used in the code shown.
 * Setup: requests more packets with rxi_MorePackets(), adds
 * rx_initSendWindow to rxi_dataQuota under rx_quota_mutex, and obtains a
 * thread id from rx_NewThreadId() under rx_pthread_mutex.  That id is
 * recorded in rxi_fcfs_thread_num when the latter is still 0 and the new id
 * is non-zero.  rx_quota_mutex is released only after rx_pthread_mutex.
 * The thread then stores its id with rxi_SetThreadNum() and calls
 * rxi_ServerProc() followed by rxi_ListenerProc(). */
271 rx_ServerProc(void * dummy)
275 struct rx_call *newcall = NULL;
277 rxi_MorePackets(rx_maxReceiveWindow + 2); /* alloc more packets */
278 MUTEX_ENTER(&rx_quota_mutex);
279 rxi_dataQuota += rx_initSendWindow; /* Reserve some pkts for hard times */
280 /* threadID is used for making decisions in GetCall. Get it by bumping
281 * number of threads handling incoming calls */
282 /* Unique thread ID: used for scheduling purposes *and* as index into
283 * the host hold table (fileserver).
284 * The previously used rxi_availProcs is unsuitable as it
285 * will already go up and down as packets arrive while the server
286 * threads are still initialising! The recently introduced
287 * rxi_pthread_hinum does not necessarily lead to a server
288 * thread with id 0, which is not allowed to hop through the
289 * incoming call queue.
290 * So either introduce yet another counter or flag the FCFS
291 * thread... chose the latter.
293 MUTEX_ENTER(&rx_pthread_mutex);
294 threadID = rx_NewThreadId();
295 if (rxi_fcfs_thread_num == 0 && rxi_fcfs_thread_num != threadID)
296 rxi_fcfs_thread_num = threadID;
297 MUTEX_EXIT(&rx_pthread_mutex);
299 MUTEX_EXIT(&rx_quota_mutex);
302 sock = OSI_NULLSOCKET;
303 rxi_SetThreadNum(threadID);
304 rxi_ServerProc(threadID, newcall, &sock);
305 /* osi_Assert(sock != OSI_NULLSOCKET); */
307 rxi_ListenerProc(sock, &threadID, &newcall);
308 /* osi_Assert(threadID != -1); */
309 /* osi_Assert(newcall != NULL); */
316 * Historically used to start the listener process. We now have multiple
317 * listener processes (one for each socket); these are started by GetUdpSocket.
319 * The event handling process *is* started here (the old listener used
320 * to also handle events). The listener threads can't actually start
321 * listening until rxi_StartListener is called because most of Rx may not
322 * be initialized when rxi_Listen is called.
/* Create the event handling thread (start routine event_handler, created
 * with PTHREAD_CREATE_DETACHED) and then release waiting listener threads:
 * under listener_mutex, rx_listener_cond is broadcast and listeners_started
 * is set to 1.  A non-zero result from any of the pthread calls is reported
 * through osi_Panic(). */
325 rxi_StartListener(void)
327 pthread_attr_t tattr;
/* NOTE(review): presumably returns early when already started -- confirm. */
330 if (listeners_started)
333 if (pthread_attr_init(&tattr) != 0) {
334 osi_Panic("Unable to create Rx event handling thread (pthread_attr_init)\n");
337 if (pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) != 0) {
338 osi_Panic("Unable to create Rx event handling thread (pthread_attr_setdetachstate)\n");
342 if (pthread_create(&event_handler_thread, &tattr, event_handler, NULL) !=
344 osi_Panic("Unable to create Rx event handling thread\n");
347 AFS_SIGSET_RESTORE();
/* The broadcast and the flag update both happen before the mutex is
 * released, so rxi_ListenerProc() waiters recheck with the flag set. */
349 MUTEX_ENTER(&listener_mutex);
350 CV_BROADCAST(&rx_listener_cond);
351 listeners_started = 1;
352 MUTEX_EXIT(&listener_mutex);
357 * Listen on the specified socket.
/* Create a thread, with the PTHREAD_CREATE_DETACHED attribute, that runs
 * rx_ListenerProc for sock.  The socket is passed as the thread argument by
 * converting it through intptr_t to a void pointer.  A non-zero result from
 * any of the pthread calls is reported through osi_Panic(). */
360 rxi_Listen(osi_socket sock)
363 pthread_attr_t tattr;
366 if (pthread_attr_init(&tattr) != 0) {
367 osi_Panic("Unable to create socket listener thread (pthread_attr_init)\n");
370 if (pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) != 0) {
371 osi_Panic("Unable to create socket listener thread (pthread_attr_setdetachstate)\n");
375 if (pthread_create(&thread, &tattr, rx_ListenerProc, (void *)(intptr_t)sock) != 0) {
376 osi_Panic("Unable to create socket listener thread\n");
379 AFS_SIGSET_RESTORE();
/* Receive a message on socket with recvmsg(socket, msg_p, flags).  When
 * HAVE_LINUX_ERRQUEUE_H and ADAPT_PMTU are both defined,
 * rxi_HandleSocketError() is first called repeatedly for as long as it
 * returns a positive value. */
389 rxi_Recvmsg(osi_socket socket, struct msghdr *msg_p, int flags)
392 #if defined(HAVE_LINUX_ERRQUEUE_H) && defined(ADAPT_PMTU)
393 while((rxi_HandleSocketError(socket)) > 0)
396 ret = recvmsg(socket, msg_p, flags);
/* Send a message on socket with sendmsg(socket, msg_p, flags).
 * With AFS_LINUX22_ENV defined, a -1 result is only treated as a failure
 * when errno is neither ECONNREFUSED nor EAGAIN (reasons given below).
 * A failure is logged through dpf() together with errno.  One path returns
 * the negated WSAGetLastError() value when that value is positive. */
404 rxi_Sendmsg(osi_socket socket, struct msghdr *msg_p, int flags)
407 ret = sendmsg(socket, msg_p, flags);
408 #ifdef AFS_LINUX22_ENV
409 /* linux unfortunately returns ECONNREFUSED if the target port
410 * is no longer in use */
411 /* and EAGAIN if a UDP checksum is incorrect */
412 if (ret == -1 && errno != ECONNREFUSED && errno != EAGAIN) {
416 dpf(("rxi_sendmsg failed, error %d\n", errno));
/* NOTE(review): WSAGetLastError() is a Winsock call, so this is presumably
 * a Windows-only arm of a preprocessor conditional -- confirm. */
422 if (WSAGetLastError() > 0)
423 return -WSAGetLastError();
/* Allocate a rx_ts_info_t with malloc(), register it for the calling thread
 * under rx_ts_info_key with pthread_setspecific(), and zero it.  osi_Assert()
 * covers both the allocation result and the pthread_setspecific() result.
 * With RX_ENABLE_TSFPQ defined, the _FPQ queue is initialised and
 * RX_TS_FPQ_COMPUTE_LIMITS is evaluated under rx_packets_mutex.
 * NOTE(review): presumably returns the new structure -- confirm. */
430 struct rx_ts_info_t * rx_ts_info_init(void) {
431 struct rx_ts_info_t * rx_ts_info;
432 rx_ts_info = (rx_ts_info_t *) malloc(sizeof(rx_ts_info_t));
/* The pthread_setspecific() call lives inside the assertion expression, so
 * registration only happens if osi_Assert() evaluates its argument. */
433 osi_Assert(rx_ts_info != NULL && pthread_setspecific(rx_ts_info_key, rx_ts_info) == 0);
434 memset(rx_ts_info, 0, sizeof(rx_ts_info_t));
435 #ifdef RX_ENABLE_TSFPQ
436 queue_Init(&rx_ts_info->_FPQ);
438 MUTEX_ENTER(&rx_packets_mutex);
440 RX_TS_FPQ_COMPUTE_LIMITS;
441 MUTEX_EXIT(&rx_packets_mutex);
442 #endif /* RX_ENABLE_TSFPQ */
/* Return the thread number of the calling thread: the value stored under
 * rx_thread_id_key, converted from a void pointer through intptr_t. */
447 rx_GetThreadNum(void) {
448 return (intptr_t)pthread_getspecific(rx_thread_id_key);
/* Store threadID as the calling thread value for rx_thread_id_key, encoded
 * as a void pointer through intptr_t.  osi_Assert() checks that
 * pthread_setspecific() returned 0; the call itself sits inside the
 * assertion expression. */
452 rxi_SetThreadNum(int threadID) {
453 osi_Assert(pthread_setspecific(rx_thread_id_key,
454 (void *)(intptr_t)threadID) == 0);
/* Obtain a new id from rx_NewThreadId() and store it for the calling thread
 * with rxi_SetThreadNum(). */
458 rx_SetThreadNum(void) {
461 threadId = rx_NewThreadId();
462 rxi_SetThreadNum(threadId);