2 * Copyright 2000, International Business Machines Corporation and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
11 * An implementation of the rx socket listener for pthreads (not using select).
12 * This assumes that multiple read system calls may be extant at any given
13 * time. Also implements the pthread-specific event handling for rx.
15 * rx_pthread.c is used for the thread safe RX package.
18 #include <afsconfig.h>
19 #include <afs/param.h>
27 #include "rx_globals.h"
28 #include "rx_pthread.h"
30 #include "rx_atomic.h"
31 #include "rx_internal.h"
32 #include "rx_pthread.h"
34 #include "rx_xmit_nt.h"
/* Forward declaration: stores threadID in the calling thread's
 * thread-specific slot (rx_thread_id_key); defined near the end of this file. */
37 static void rxi_SetThreadNum(int threadID);
39 /* Set rx_pthread_event_rescheduled if event_handler should just try
40 * again instead of sleeping.
42 * Protected by event_handler_mutex
44 static int rx_pthread_event_rescheduled = 0;
/* Forward declaration: pthread start routine handed to pthread_create()
 * by rxi_Listen(); its argument carries the socket. */
46 static void *rx_ListenerProc(void *);
49 * We supply an event handling thread for Rx's event processing.
50 * The condition variable is used to wake up the thread whenever a new
51 * event is scheduled earlier than the previous earliest event.
52 * This thread is also responsible for keeping time.
/* Handle of the event handling thread created in rxi_StartListener(). */
54 static pthread_t event_handler_thread;
/* Condition/mutex pair: event_handler() does its timed wait on these and
 * rxi_ReScheduleEvents() signals the condition to interrupt that wait. */
55 afs_kcondvar_t rx_event_handler_cond;
56 afs_kmutex_t event_handler_mutex;
/* Condition/mutex pair around listeners_started: listener threads wait on
 * them in rxi_ListenerProc() until rxi_StartListener() sets the flag. */
57 afs_kcondvar_t rx_listener_cond;
58 afs_kmutex_t listener_mutex;
59 static int listeners_started = 0;
60 afs_kmutex_t rx_clock_mutex;
/* Filled in by clock_GetTime() in rxi_InitializeThreadSupport(). */
61 struct clock rxi_clockNow;
/* Counter behind rx_NewThreadId(); updated with rx_atomic_inc_and_read(). */
63 static rx_atomic_t threadHiNum;
/*
 * Hand out a fresh thread number: bumps threadHiNum atomically and returns
 * whatever rx_atomic_inc_and_read() reports for it.
 */
66 rx_NewThreadId(void) {
67 return rx_atomic_inc_and_read(&threadHiNum);
71 * Delay the current thread the specified number of seconds.
80 * Called from rx_Init()
/*
 * Thread-support setup hook. The only live statement below samples the
 * current time into rxi_clockNow; the reset of listeners_started is
 * intentionally left commented out (reason given in the body).
 */
83 rxi_InitializeThreadSupport(void)
85 /* listeners_started must only be reset if
86 * the listener thread terminates */
87 /* listeners_started = 0; */
88 clock_GetTime(&rxi_clockNow);
/*
 * pthread start routine used by rxi_StartServerProc(). argp carries the
 * real server procedure, passed through pthread_create()'s void * argument
 * and cast back to a function pointer here. The dpf() message and the -1
 * return cover the case where that procedure comes back, which is treated
 * as unexpected.
 */
92 server_entry(void *argp)
94 void (*server_proc) (void *) = (void (*)(void *))argp;
96 dpf(("rx_pthread.c: server_entry: Server proc returned unexpectedly\n"));
97 return (void *) -1; /* reused as return value, see pthread(3) */
101 * Start an Rx server process.
/*
 * Spawn one detached pthread that runs proc through the server_entry()
 * start routine.
 *
 * proc      - server procedure; handed to server_entry() as its argument.
 * stacksize - currently ignored (see the NOTE in the body).
 *
 * A failure of pthread_attr_init(), pthread_attr_setdetachstate() or
 * pthread_create() is fatal: each one ends in osi_Panic().
 */
104 rxi_StartServerProc(void *(*proc) (void *), int stacksize)
107 pthread_attr_t tattr;
110 if (pthread_attr_init(&tattr) != 0) {
111 osi_Panic("Unable to Create Rx server thread (pthread_attr_init)\n");
114 if (pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) != 0) {
115 osi_Panic("Unable to Create Rx server thread (pthread_attr_setdetachstate)\n");
119 * NOTE: We are ignoring the stack size parameter, for now.
122 if (pthread_create(&thread, &tattr, server_entry, (void *)proc) != 0) {
123 osi_Panic("Unable to Create Rx server thread\n");
125 AFS_SIGSET_RESTORE();
129 * The event handling process.
/*
 * Body of the event handling thread started by rxi_StartListener().
 *
 * rxevent_RaiseEvents() is handed `next`, which is preset to 30 seconds for
 * the case where no events are scheduled. Afterwards, under
 * event_handler_mutex, a pending rx_pthread_event_rescheduled request is
 * cleared (per the comment on that flag, the handler then tries again
 * instead of sleeping). Otherwise the thread does a timed wait on
 * rx_event_handler_cond. The deadline is cv after `next` has been added to
 * it, with cv.usec scaled by 1000 to fill tv_nsec.
 *
 * The rx_pthread_n_event_* locals only count how each wait ended (woken,
 * expired, or error).
 *
 * NOTE(review): ETIMEDOUT is tested twice below, once against the return
 * value and once against errno; presumably these sit in alternative
 * preprocessor branches -- confirm.
 */
132 event_handler(void *argp)
134 unsigned long rx_pthread_n_event_expired = 0;
135 unsigned long rx_pthread_n_event_waits = 0;
136 long rx_pthread_n_event_woken = 0;
137 unsigned long rx_pthread_n_event_error = 0;
138 struct timespec rx_pthread_next_event_time = { 0, 0 };
141 MUTEX_ENTER(&event_handler_mutex);
147 MUTEX_EXIT(&event_handler_mutex);
149 next.sec = 30; /* Time to sleep if there are no events scheduled */
152 rxevent_RaiseEvents(&next);
154 MUTEX_ENTER(&event_handler_mutex);
155 if (rx_pthread_event_rescheduled) {
156 rx_pthread_event_rescheduled = 0;
160 clock_Add(&cv, &next);
161 rx_pthread_next_event_time.tv_sec = cv.sec;
162 rx_pthread_next_event_time.tv_nsec = cv.usec * 1000;
163 rx_pthread_n_event_waits++;
164 error = CV_TIMEDWAIT(&rx_event_handler_cond, &event_handler_mutex, &rx_pthread_next_event_time);
166 rx_pthread_n_event_woken++;
169 else if (error == ETIMEDOUT) {
170 rx_pthread_n_event_expired++;
172 rx_pthread_n_event_error++;
175 else if (errno == ETIMEDOUT) {
176 rx_pthread_n_event_expired++;
178 rx_pthread_n_event_error++;
181 rx_pthread_event_rescheduled = 0;
188 * This routine is called by the event package whenever a new event is
189 * posted that is earlier than all others. */
/*
 * Wake the event handler. With event_handler_mutex held, signals
 * rx_event_handler_cond and sets rx_pthread_event_rescheduled; the flag is
 * what event_handler() checks before it goes into its timed wait.
 */
191 rxi_ReScheduleEvents(void)
193 MUTEX_ENTER(&event_handler_mutex);
194 CV_SIGNAL(&rx_event_handler_cond);
195 rx_pthread_event_rescheduled = 1;
196 MUTEX_EXIT(&event_handler_mutex);
200 /* Loop to listen on a socket. Return setting *newcallp if this
201 * thread should become a server thread. */
/*
 * Receive loop for one socket.
 *
 * sock     - socket that rxi_ReadPacket() reads from.
 * tnop     - forwarded unchanged to rxi_ReceivePacket().
 * newcallp - forwarded to rxi_ReceivePacket(); afterwards a non-NULL
 *            *newcallp is tested for.
 *
 * Blocks on rx_listener_cond until listeners_started is non-zero. Packets
 * are then read with rxi_ReadPacket() and passed to rxi_ReceivePacket().
 * The packet buffer is either restored with rxi_RestoreDataBufs() or
 * obtained from rxi_AllocPacket(); a failed allocation is fatal
 * (osi_Panic).
 */
203 rxi_ListenerProc(osi_socket sock, int *tnop, struct rx_call **newcallp)
207 struct rx_packet *p = (struct rx_packet *)0;
209 MUTEX_ENTER(&listener_mutex);
210 while (!listeners_started) {
211 CV_WAIT(&rx_listener_cond, &listener_mutex);
213 MUTEX_EXIT(&listener_mutex);
216 /* See if a check for additional packets was issued */
220 * Grab a new packet only if necessary (otherwise re-use the old one)
223 rxi_RestoreDataBufs(p);
225 if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_RECEIVE))) {
226 /* Could this happen with multiple socket listeners? */
227 osi_Panic("rxi_Listener: no packets!"); /* Shouldn't happen */
231 if (rxi_ReadPacket(sock, p, &host, &port)) {
233 p = rxi_ReceivePacket(p, sock, host, port, tnop, newcallp);
234 if (newcallp && *newcallp) {
244 /* This is the listener process request loop. The listener process loop
245 * becomes a server thread when rxi_ListenerProc returns, and stays
246 * a server thread until rxi_ServerProc returns. */
/*
 * pthread start routine for a socket listener thread (created by
 * rxi_Listen()). argp is the socket, packed into the pointer by the
 * creator and unpacked here through intptr_t.
 *
 * Runs rxi_ListenerProc() on the socket. Once that returns, sock is set to
 * OSI_NULLSOCKET, the thread number it reported is recorded with
 * rxi_SetThreadNum(), and rxi_ServerProc() is called with newcall and the
 * address of sock.
 */
248 rx_ListenerProc(void *argp)
251 osi_socket sock = (osi_socket)(intptr_t)argp;
252 struct rx_call *newcall;
257 rxi_ListenerProc(sock, &threadID, &newcall);
258 /* osi_Assert(threadID != -1); */
259 /* osi_Assert(newcall != NULL); */
260 sock = OSI_NULLSOCKET;
261 rxi_SetThreadNum(threadID);
262 rxi_ServerProc(threadID, newcall, &sock);
263 /* osi_Assert(sock != OSI_NULLSOCKET); */
269 /* This is the server process request loop. The server process loop
270 * becomes a listener thread when rxi_ServerProc returns, and stays
271 * a listener thread until rxi_ListenerProc returns. */
/*
 * Start routine for a server thread; dummy is not referenced by the
 * statements below.
 *
 * Setup: requests more packets via rxi_MorePackets(), adds
 * rx_initSendWindow to rxi_dataQuota under rx_quota_mutex, and -- still
 * holding that mutex -- takes rx_pthread_mutex to obtain a thread number
 * from rx_NewThreadId(). While rxi_fcfs_thread_num is still 0, the first
 * non-zero number obtained is stored there.
 *
 * The thread then registers threadID with rxi_SetThreadNum() and calls
 * rxi_ServerProc(), followed by rxi_ListenerProc() on the socket that
 * rxi_ServerProc() was given by address.
 */
273 rx_ServerProc(void * dummy)
277 struct rx_call *newcall = NULL;
279 rxi_MorePackets(rx_maxReceiveWindow + 2); /* alloc more packets */
280 MUTEX_ENTER(&rx_quota_mutex);
281 rxi_dataQuota += rx_initSendWindow; /* Reserve some pkts for hard times */
282 /* threadID is used for making decisions in GetCall. Get it by bumping
283 * number of threads handling incoming calls */
284 /* Unique thread ID: used for scheduling purposes *and* as index into
285 * the host hold table (fileserver).
286 * The previously used rxi_availProcs is unsuitable as it
287 * will already go up and down as packets arrive while the server
288 * threads are still initialising! The recently introduced
289 * rxi_pthread_hinum does not necessarily lead to a server
290 * thread with id 0, which is not allowed to hop through the
291 * incoming call queue.
292 * So either introduce yet another counter or flag the FCFS
293 * thread... chose the latter.
295 MUTEX_ENTER(&rx_pthread_mutex);
296 threadID = rx_NewThreadId();
297 if (rxi_fcfs_thread_num == 0 && rxi_fcfs_thread_num != threadID)
298 rxi_fcfs_thread_num = threadID;
299 MUTEX_EXIT(&rx_pthread_mutex);
301 MUTEX_EXIT(&rx_quota_mutex);
304 sock = OSI_NULLSOCKET;
305 rxi_SetThreadNum(threadID);
306 rxi_ServerProc(threadID, newcall, &sock);
307 /* osi_Assert(sock != OSI_NULLSOCKET); */
309 rxi_ListenerProc(sock, &threadID, &newcall);
310 /* osi_Assert(threadID != -1); */
311 /* osi_Assert(newcall != NULL); */
318 * Historically used to start the listener process. We now have multiple
319 * listener processes (one for each socket); these are started by GetUdpSocket.
321 * The event handling process *is* started here (the old listener used
322 * to also handle events). The listener threads can't actually start
323 * listening until rxi_StartListener is called because most of Rx may not
324 * be initialized when rxi_Listen is called.
/*
 * Create the detached event handling thread (event_handler) and release
 * the listener threads.
 *
 * listeners_started is tested first.
 * NOTE(review): presumably an early exit when the listeners are already
 * started -- confirm.
 *
 * A failure of pthread_attr_init(), pthread_attr_setdetachstate() or
 * pthread_create() is fatal (osi_Panic). Finally, under listener_mutex,
 * rx_listener_cond is broadcast and listeners_started is set to 1, which
 * is the condition rxi_ListenerProc() waits for.
 */
327 rxi_StartListener(void)
329 pthread_attr_t tattr;
332 if (listeners_started)
335 if (pthread_attr_init(&tattr) != 0) {
336 osi_Panic("Unable to create Rx event handling thread (pthread_attr_init)\n");
339 if (pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) != 0) {
340 osi_Panic("Unable to create Rx event handling thread (pthread_attr_setdetachstate)\n");
344 if (pthread_create(&event_handler_thread, &tattr, event_handler, NULL) !=
346 osi_Panic("Unable to create Rx event handling thread\n");
349 AFS_SIGSET_RESTORE();
351 MUTEX_ENTER(&listener_mutex);
352 CV_BROADCAST(&rx_listener_cond);
353 listeners_started = 1;
354 MUTEX_EXIT(&listener_mutex);
359 * Listen on the specified socket.
/*
 * Start a detached thread running rx_ListenerProc() for sock. The socket
 * travels to the new thread inside pthread_create()'s void * argument via
 * an intptr_t cast. A failure of pthread_attr_init(),
 * pthread_attr_setdetachstate() or pthread_create() is fatal (osi_Panic).
 */
362 rxi_Listen(osi_socket sock)
365 pthread_attr_t tattr;
368 if (pthread_attr_init(&tattr) != 0) {
369 osi_Panic("Unable to create socket listener thread (pthread_attr_init)\n");
372 if (pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) != 0) {
373 osi_Panic("Unable to create socket listener thread (pthread_attr_setdetachstate)\n");
377 if (pthread_create(&thread, &tattr, rx_ListenerProc, (void *)(intptr_t)sock) != 0) {
378 osi_Panic("Unable to create socket listener thread\n");
381 AFS_SIGSET_RESTORE();
/*
 * recvmsg() wrapper. When HAVE_LINUX_ERRQUEUE_H and ADAPT_PMTU are both
 * defined, rxi_HandleSocketError() is called repeatedly for as long as it
 * returns a positive value; the message is then received with recvmsg()
 * using the caller's msg_p and flags.
 */
391 rxi_Recvmsg(osi_socket socket, struct msghdr *msg_p, int flags)
394 #if defined(HAVE_LINUX_ERRQUEUE_H) && defined(ADAPT_PMTU)
395 while((rxi_HandleSocketError(socket)) > 0)
398 ret = recvmsg(socket, msg_p, flags);
/*
 * sendmsg() wrapper.
 *
 * Under AFS_LINUX22_ENV, a -1 result whose errno is ECONNREFUSED or EAGAIN
 * is kept out of the failure branch (reasons in the comments below). A
 * failure is reported through dpf(). The WSAGetLastError() lines belong to
 * the Winsock API; there a positive last-error code is returned negated.
 */
406 rxi_Sendmsg(osi_socket socket, struct msghdr *msg_p, int flags)
409 ret = sendmsg(socket, msg_p, flags);
410 #ifdef AFS_LINUX22_ENV
411 /* linux unfortunately returns ECONNREFUSED if the target port
412 * is no longer in use */
413 /* and EAGAIN if a UDP checksum is incorrect */
414 if (ret == -1 && errno != ECONNREFUSED && errno != EAGAIN) {
418 dpf(("rxi_sendmsg failed, error %d\n", errno));
424 if (WSAGetLastError() > 0)
425 return -WSAGetLastError();
/*
 * Allocate a zeroed rx_ts_info_t and bind it to rx_ts_info_key for the
 * calling thread with pthread_setspecific(). Both the allocation and the
 * binding are checked by a single osi_Assert().
 *
 * With RX_ENABLE_TSFPQ defined, the structure's _FPQ queue is initialized
 * and RX_TS_FPQ_COMPUTE_LIMITS is evaluated under rx_packets_mutex.
 *
 * NOTE(review): pthread_setspecific() is invoked inside the osi_Assert()
 * argument, so this relies on osi_Assert() always evaluating its
 * expression -- confirm.
 */
432 struct rx_ts_info_t * rx_ts_info_init(void) {
433 struct rx_ts_info_t * rx_ts_info;
434 rx_ts_info = calloc(1, sizeof(rx_ts_info_t));
435 osi_Assert(rx_ts_info != NULL && pthread_setspecific(rx_ts_info_key, rx_ts_info) == 0);
436 #ifdef RX_ENABLE_TSFPQ
437 queue_Init(&rx_ts_info->_FPQ);
439 MUTEX_ENTER(&rx_packets_mutex);
441 RX_TS_FPQ_COMPUTE_LIMITS;
442 MUTEX_EXIT(&rx_packets_mutex);
443 #endif /* RX_ENABLE_TSFPQ */
/*
 * Return the calling thread's number as stored under rx_thread_id_key. The
 * number is kept directly in the thread-specific pointer slot and is
 * converted back through intptr_t. A thread that never stored one gets 0,
 * because pthread_getspecific() yields NULL for an unset key.
 */
448 rx_GetThreadNum(void) {
449 return (intptr_t)pthread_getspecific(rx_thread_id_key);
/*
 * Record threadID as the calling thread's number. The integer is stored
 * directly in the rx_thread_id_key slot (cast through intptr_t to void *);
 * no memory is allocated for it.
 *
 * NOTE(review): the pthread_setspecific() call is the osi_Assert()
 * argument, so the store only happens if osi_Assert() always evaluates its
 * expression -- confirm.
 */
453 rxi_SetThreadNum(int threadID) {
454 osi_Assert(pthread_setspecific(rx_thread_id_key,
455 (void *)(intptr_t)threadID) == 0);
/*
 * Give the calling thread a new number: obtain one from rx_NewThreadId()
 * and store it with rxi_SetThreadNum().
 */
459 rx_SetThreadNum(void) {
462 threadId = rx_NewThreadId();
463 rxi_SetThreadNum(threadId);