Rx: Permit MakeDebugCall() be be compiled when RXDEBUG is undefined
[openafs.git] / src / rx / rx_pthread.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  *
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /*
11  * An implementation of the rx socket listener for pthreads (not using select).
12  * This assumes that multiple read system calls may be extant at any given
13  * time. Also implements the pthread-specific event handling for rx.
14  *
15  * rx_pthread.c is used for the thread safe RX package.
16  */
17
18 #include <afsconfig.h>
19 #include <afs/param.h>
20
21
22 #include <sys/types.h>
23 #include <errno.h>
24 #include <signal.h>
25 #include <string.h>
26 #ifdef HAVE_STDINT_H
27 # include <stdint.h>
28 #endif
29 #ifndef AFS_NT40_ENV
30 # include <sys/socket.h>
31 # include <sys/file.h>
32 # include <netdb.h>
33 # include <netinet/in.h>
34 # include <net/if.h>
35 # include <sys/ioctl.h>
36 # include <sys/time.h>
37 # include <unistd.h>
38 #endif
39 #include <sys/stat.h>
40 #include <rx/rx.h>
41 #include <rx/rx_globals.h>
42 #include <assert.h>
43 #include <rx/rx_pthread.h>
44 #include <rx/rx_clock.h>
45
46 /* Set rx_pthread_event_rescheduled if event_handler should just try
47  * again instead of sleeping.
48  *
49  * Protected by event_handler_mutex
50  */
51 static int rx_pthread_event_rescheduled = 0;
52
53 static void *rx_ListenerProc(void *);
54
55 /*
56  * We supply an event handling thread for Rx's event processing.
57  * The condition variable is used to wakeup the thread whenever a new
58  * event is scheduled earlier than the previous earliest event.
59  * This thread is also responsible for keeping time.
60  */
61 static pthread_t event_handler_thread;
62 afs_kcondvar_t rx_event_handler_cond;
63 afs_kmutex_t event_handler_mutex;
64 afs_kcondvar_t rx_listener_cond;
65 afs_kmutex_t listener_mutex;
66 static int listeners_started = 0;
67 afs_kmutex_t rx_clock_mutex;
68 struct clock rxi_clockNow;
69
70 /*
71  * Delay the current thread the specified number of seconds.
72  */
73 void
74 rxi_Delay(int sec)
75 {
76     sleep(sec);
77 }
78
79 /*
80  * Called from rx_Init()
81  */
82 void
83 rxi_InitializeThreadSupport(void)
84 {
85         /* listeners_started must only be reset if
86          * the listener thread terminates */
87         /* listeners_started = 0; */
88     clock_GetTime(&rxi_clockNow);
89 }
90
91 static void *
92 server_entry(void *argp)
93 {
94     void (*server_proc) (void *) = (void (*)(void *))argp;
95     server_proc(NULL);
96     dpf(("rx_pthread.c: server_entry: Server proc returned unexpectedly\n"));
97     return (void *) -1; /* reused as return value, see pthread(3) */
98 }
99
100 /*
101  * Start an Rx server process.
102  */
103 void
104 rxi_StartServerProc(void *(*proc) (void *), int stacksize)
105 {
106     pthread_t thread;
107     pthread_attr_t tattr;
108     AFS_SIGSET_DECL;
109
110     if (pthread_attr_init(&tattr) != 0) {
111         dpf(("Unable to Create Rx server thread (pthread_attr_init)\n"));
112         assert(0);
113     }
114
115     if (pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) != 0) {
116         dpf
117             (("Unable to Create Rx server thread (pthread_attr_setdetachstate)\n"));
118         assert(0);
119     }
120
121     /*
122      * NOTE: We are ignoring the stack size parameter, for now.
123      */
124     AFS_SIGSET_CLEAR();
125     if (pthread_create(&thread, &tattr, server_entry, (void *)proc) != 0) {
126         dpf(("Unable to Create Rx server thread\n"));
127         assert(0);
128     }
129     AFS_SIGSET_RESTORE();
130 }
131
132 /*
133  * The event handling process.
134  */
135 static void *
136 event_handler(void *argp)
137 {
138     unsigned long rx_pthread_n_event_expired = 0;
139     unsigned long rx_pthread_n_event_waits = 0;
140     long rx_pthread_n_event_woken = 0;
141     unsigned long rx_pthread_n_event_error = 0;
142     struct timespec rx_pthread_next_event_time = { 0, 0 };
143     int error;
144
145     MUTEX_ENTER(&event_handler_mutex);
146
147     for (;;) {
148         struct clock cv;
149         struct clock next;
150
151         MUTEX_EXIT(&event_handler_mutex);
152
153         next.sec = 30;          /* Time to sleep if there are no events scheduled */
154         next.usec = 0;
155         clock_GetTime(&cv);
156         rxevent_RaiseEvents(&next);
157
158         MUTEX_ENTER(&event_handler_mutex);
159         if (rx_pthread_event_rescheduled) {
160             rx_pthread_event_rescheduled = 0;
161             continue;
162         }
163
164         clock_Add(&cv, &next);
165         rx_pthread_next_event_time.tv_sec = cv.sec;
166         rx_pthread_next_event_time.tv_nsec = cv.usec * 1000;
167         rx_pthread_n_event_waits++;
168         error = CV_TIMEDWAIT(&rx_event_handler_cond, &event_handler_mutex, &rx_pthread_next_event_time);
169         if (error == 0) {
170             rx_pthread_n_event_woken++;
171         }
172 #ifdef AFS_NT40_ENV
173         else if (error == ETIMEDOUT) {
174             rx_pthread_n_event_expired++;
175         } else {
176             rx_pthread_n_event_error++;
177         }
178 #else
179         else if (errno == ETIMEDOUT) {
180             rx_pthread_n_event_expired++;
181         } else {
182             rx_pthread_n_event_error++;
183         }
184 #endif
185         rx_pthread_event_rescheduled = 0;
186     }
187     return NULL;
188 }
189
190
191 /*
192  * This routine will get called by the event package whenever a new,
193  * earlier than others, event is posted. */
194 void
195 rxi_ReScheduleEvents(void)
196 {
197     MUTEX_ENTER(&event_handler_mutex);
198     CV_SIGNAL(&rx_event_handler_cond);
199     rx_pthread_event_rescheduled = 1;
200     MUTEX_EXIT(&event_handler_mutex);
201 }
202
203
204 /* Loop to listen on a socket. Return setting *newcallp if this
205  * thread should become a server thread.  */
206 static void
207 rxi_ListenerProc(osi_socket sock, int *tnop, struct rx_call **newcallp)
208 {
209     unsigned int host;
210     u_short port;
211     struct rx_packet *p = (struct rx_packet *)0;
212
213     MUTEX_ENTER(&listener_mutex);
214     while (!listeners_started) {
215         CV_WAIT(&rx_listener_cond, &listener_mutex);
216     }
217     MUTEX_EXIT(&listener_mutex);
218
219     for (;;) {
220         /* See if a check for additional packets was issued */
221         rx_CheckPackets();
222
223         /*
224          * Grab a new packet only if necessary (otherwise re-use the old one)
225          */
226         if (p) {
227             rxi_RestoreDataBufs(p);
228         } else {
229             if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_RECEIVE))) {
230                 /* Could this happen with multiple socket listeners? */
231                 dpf(("rxi_Listener: no packets!"));     /* Shouldn't happen */
232                 assert(0);
233             }
234         }
235
236         if (rxi_ReadPacket(sock, p, &host, &port)) {
237             clock_NewTime();
238             p = rxi_ReceivePacket(p, sock, host, port, tnop, newcallp);
239             if (newcallp && *newcallp) {
240                 if (p)
241                     rxi_FreePacket(p);
242                 return;
243             }
244         }
245     }
246     /* NOTREACHED */
247 }
248
249 /* This is the listener process request loop. The listener process loop
250  * becomes a server thread when rxi_ListenerProc returns, and stays
251  * server thread until rxi_ServerProc returns. */
252 static void *
253 rx_ListenerProc(void *argp)
254 {
255     int threadID;
256     osi_socket sock = (osi_socket)(intptr_t)argp;
257     struct rx_call *newcall;
258
259     while (1) {
260         newcall = NULL;
261         threadID = -1;
262         rxi_ListenerProc(sock, &threadID, &newcall);
263         /* assert(threadID != -1); */
264         /* assert(newcall != NULL); */
265         sock = OSI_NULLSOCKET;
266         assert(pthread_setspecific(rx_thread_id_key, (void *)(intptr_t)threadID) == 0);
267         rxi_ServerProc(threadID, newcall, &sock);
268         /* assert(sock != OSI_NULLSOCKET); */
269     }
270     /* not reached */
271     return NULL;
272 }
273
274 /* This is the server process request loop. The server process loop
275  * becomes a listener thread when rxi_ServerProc returns, and stays
276  * listener thread until rxi_ListenerProc returns. */
277 void *
278 rx_ServerProc(void * dummy)
279 {
280     osi_socket sock;
281     int threadID;
282     struct rx_call *newcall = NULL;
283
284     rxi_MorePackets(rx_maxReceiveWindow + 2);   /* alloc more packets */
285     MUTEX_ENTER(&rx_quota_mutex);
286     rxi_dataQuota += rx_initSendWindow; /* Reserve some pkts for hard times */
287     /* threadID is used for making decisions in GetCall.  Get it by bumping
288      * number of threads handling incoming calls */
289     /* Unique thread ID: used for scheduling purposes *and* as index into
290      * the host hold table (fileserver).
291      * The previously used rxi_availProcs is unsuitable as it
292      * will already go up and down as packets arrive while the server
293      * threads are still initialising! The recently introduced
294      * rxi_pthread_hinum does not necessarily lead to a server
295      * thread with id 0, which is not allowed to hop through the
296      * incoming call queue.
297      * So either introduce yet another counter or flag the FCFS
298      * thread... chose the latter.
299      */
300     MUTEX_ENTER(&rx_pthread_mutex);
301     threadID = ++rxi_pthread_hinum;
302     if (rxi_fcfs_thread_num == 0 && rxi_fcfs_thread_num != threadID)
303         rxi_fcfs_thread_num = threadID;
304     MUTEX_EXIT(&rx_pthread_mutex);
305     ++rxi_availProcs;
306     MUTEX_EXIT(&rx_quota_mutex);
307
308     while (1) {
309         sock = OSI_NULLSOCKET;
310         assert(pthread_setspecific(rx_thread_id_key, (void *)(intptr_t)threadID) == 0);
311         rxi_ServerProc(threadID, newcall, &sock);
312         /* assert(sock != OSI_NULLSOCKET); */
313         newcall = NULL;
314         rxi_ListenerProc(sock, &threadID, &newcall);
315         /* assert(threadID != -1); */
316         /* assert(newcall != NULL); */
317     }
318     /* not reached */
319     return NULL;
320 }
321
322 /*
323  * Historically used to start the listener process. We now have multiple
324  * listener processes (one for each socket); these are started by GetUdpSocket.
325  *
326  * The event handling process *is* started here (the old listener used
327  * to also handle events). The listener threads can't actually start
328  * listening until rxi_StartListener is called because most of R may not
329  * be initialized when rxi_Listen is called.
330  */
331 void
332 rxi_StartListener(void)
333 {
334     pthread_attr_t tattr;
335     AFS_SIGSET_DECL;
336
337         if (listeners_started)
338                 return;
339
340     if (pthread_attr_init(&tattr) != 0) {
341         dpf
342             (("Unable to create Rx event handling thread (pthread_attr_init)\n"));
343         assert(0);
344     }
345
346     if (pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) != 0) {
347         dpf
348             (("Unable to create Rx event handling thread (pthread_attr_setdetachstate)\n"));
349         assert(0);
350     }
351
352     AFS_SIGSET_CLEAR();
353     if (pthread_create(&event_handler_thread, &tattr, event_handler, NULL) !=
354         0) {
355         dpf(("Unable to create Rx event handling thread\n"));
356         assert(0);
357     }
358     MUTEX_ENTER(&rx_pthread_mutex);
359     ++rxi_pthread_hinum;
360     MUTEX_EXIT(&rx_pthread_mutex);
361     AFS_SIGSET_RESTORE();
362
363     MUTEX_ENTER(&listener_mutex);
364     CV_BROADCAST(&rx_listener_cond);
365     listeners_started = 1;
366     MUTEX_EXIT(&listener_mutex);
367
368 }
369
370 /*
371  * Listen on the specified socket.
372  */
373 int
374 rxi_Listen(osi_socket sock)
375 {
376     pthread_t thread;
377     pthread_attr_t tattr;
378     AFS_SIGSET_DECL;
379
380     if (pthread_attr_init(&tattr) != 0) {
381         dpf
382             (("Unable to create socket listener thread (pthread_attr_init)\n"));
383         assert(0);
384     }
385
386     if (pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) != 0) {
387         dpf
388             (("Unable to create socket listener thread (pthread_attr_setdetachstate)\n"));
389         assert(0);
390     }
391
392     AFS_SIGSET_CLEAR();
393     if (pthread_create(&thread, &tattr, rx_ListenerProc, (void *)(intptr_t)sock) != 0) {
394         dpf(("Unable to create socket listener thread\n"));
395         assert(0);
396     }
397     MUTEX_ENTER(&rx_pthread_mutex);
398     ++rxi_pthread_hinum;
399     MUTEX_EXIT(&rx_pthread_mutex);
400     AFS_SIGSET_RESTORE();
401     return 0;
402 }
403
404
405 /*
406  * Recvmsg.
407  *
408  */
409 int
410 rxi_Recvmsg(osi_socket socket, struct msghdr *msg_p, int flags)
411 {
412     int ret;
413 #if defined(HAVE_LINUX_ERRQUEUE_H) && defined(ADAPT_PMTU)
414     while((rxi_HandleSocketError(socket)) > 0)
415       ;
416 #endif
417     ret = recvmsg(socket, msg_p, flags);
418     return ret;
419 }
420
421 /*
422  * Sendmsg.
423  */
424 int
425 rxi_Sendmsg(osi_socket socket, struct msghdr *msg_p, int flags)
426 {
427     int ret;
428     ret = sendmsg(socket, msg_p, flags);
429 #ifdef AFS_LINUX22_ENV
430     /* linux unfortunately returns ECONNREFUSED if the target port
431      * is no longer in use */
432     /* and EAGAIN if a UDP checksum is incorrect */
433     if (ret == -1 && errno != ECONNREFUSED && errno != EAGAIN) {
434 #else
435     if (ret == -1) {
436 #endif
437         dpf(("rxi_sendmsg failed, error %d\n", errno));
438         fflush(stdout);
439 #ifndef AFS_NT40_ENV
440         if (errno > 0)
441           return -errno;
442 #else
443             if (WSAGetLastError() > 0)
444               return -WSAGetLastError();
445 #endif
446         return -1;
447     }
448     return 0;
449 }
450
451 struct rx_ts_info_t * rx_ts_info_init(void) {
452     struct rx_ts_info_t * rx_ts_info;
453     rx_ts_info = (rx_ts_info_t *) malloc(sizeof(rx_ts_info_t));
454     assert(rx_ts_info != NULL && pthread_setspecific(rx_ts_info_key, rx_ts_info) == 0);
455     memset(rx_ts_info, 0, sizeof(rx_ts_info_t));
456 #ifdef RX_ENABLE_TSFPQ
457     queue_Init(&rx_ts_info->_FPQ);
458
459     MUTEX_ENTER(&rx_packets_mutex);
460     rx_TSFPQMaxProcs++;
461     RX_TS_FPQ_COMPUTE_LIMITS;
462     MUTEX_EXIT(&rx_packets_mutex);
463 #endif /* RX_ENABLE_TSFPQ */
464     return rx_ts_info;
465 }