3e6124504491c173620914c63938662b0f1b410d
[openafs.git] / src / rx / rx_pthread.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  *
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /*
11  * An implementation of the rx socket listener for pthreads (not using select).
12  * This assumes that multiple read system calls may be extant at any given
13  * time. Also implements the pthread-specific event handling for rx.
14  *
15  * rx_pthread.c is used for the thread safe RX package.
16  */
17
18 #include <afsconfig.h>
19 #include <afs/param.h>
20
21 #include <roken.h>
22 #include <afs/opr.h>
23
24 #include <assert.h>
25
26 #ifdef AFS_PTHREAD_ENV
27
28 #include "rx.h"
29 #include "rx_globals.h"
30 #include "rx_pthread.h"
31 #include "rx_clock.h"
32 #include "rx_atomic.h"
33 #include "rx_internal.h"
34 #include "rx_pthread.h"
35 #ifdef AFS_NT40_ENV
36 #include "rx_xmit_nt.h"
37 #endif
38
39 static void rxi_SetThreadNum(int threadID);
40
41 /* Set rx_pthread_event_rescheduled if event_handler should just try
42  * again instead of sleeping.
43  *
44  * Protected by event_handler_mutex
45  */
46 static int rx_pthread_event_rescheduled = 0;
47
48 static void *rx_ListenerProc(void *);
49
50 /*
51  * We supply an event handling thread for Rx's event processing.
52  * The condition variable is used to wakeup the thread whenever a new
53  * event is scheduled earlier than the previous earliest event.
54  * This thread is also responsible for keeping time.
55  */
56 static pthread_t event_handler_thread;
57 afs_kcondvar_t rx_event_handler_cond;
58 afs_kmutex_t event_handler_mutex;
59 afs_kcondvar_t rx_listener_cond;
60 afs_kmutex_t listener_mutex;
61 static int listeners_started = 0;
62 afs_kmutex_t rx_clock_mutex;
63 struct clock rxi_clockNow;
64
65 static rx_atomic_t threadHiNum;
66
67 int
68 rx_NewThreadId(void) {
69     return rx_atomic_inc_and_read(&threadHiNum);
70 }
71
72 /*
73  * Delay the current thread the specified number of seconds.
74  */
75 void
76 rxi_Delay(int sec)
77 {
78     sleep(sec);
79 }
80
81 /*
82  * Called from rx_Init()
83  */
84 void
85 rxi_InitializeThreadSupport(void)
86 {
87         /* listeners_started must only be reset if
88          * the listener thread terminates */
89         /* listeners_started = 0; */
90     clock_GetTime(&rxi_clockNow);
91 }
92
93 static void *
94 server_entry(void *argp)
95 {
96     void (*server_proc) (void *) = (void (*)(void *))argp;
97     server_proc(NULL);
98     dpf(("rx_pthread.c: server_entry: Server proc returned unexpectedly\n"));
99     return (void *) -1; /* reused as return value, see pthread(3) */
100 }
101
102 /*
103  * Start an Rx server process.
104  */
105 void
106 rxi_StartServerProc(void *(*proc) (void *), int stacksize)
107 {
108     pthread_t thread;
109     pthread_attr_t tattr;
110     AFS_SIGSET_DECL;
111
112     if (pthread_attr_init(&tattr) != 0) {
113         osi_Panic("Unable to Create Rx server thread (pthread_attr_init)\n");
114     }
115
116     if (pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) != 0) {
117         osi_Panic("Unable to Create Rx server thread (pthread_attr_setdetachstate)\n");
118     }
119
120     /*
121      * NOTE: We are ignoring the stack size parameter, for now.
122      */
123     AFS_SIGSET_CLEAR();
124     if (pthread_create(&thread, &tattr, server_entry, (void *)proc) != 0) {
125         osi_Panic("Unable to Create Rx server thread\n");
126     }
127     AFS_SIGSET_RESTORE();
128 }
129
130 /*
131  * The event handling process.
132  */
133 static void *
134 event_handler(void *argp)
135 {
136     unsigned long rx_pthread_n_event_expired = 0;
137     unsigned long rx_pthread_n_event_waits = 0;
138     long rx_pthread_n_event_woken = 0;
139     unsigned long rx_pthread_n_event_error = 0;
140     struct timespec rx_pthread_next_event_time = { 0, 0 };
141     int error;
142
143     MUTEX_ENTER(&event_handler_mutex);
144
145     for (;;) {
146         struct clock cv;
147         struct clock next;
148
149         MUTEX_EXIT(&event_handler_mutex);
150
151         next.sec = 30;          /* Time to sleep if there are no events scheduled */
152         next.usec = 0;
153         clock_GetTime(&cv);
154         rxevent_RaiseEvents(&next);
155
156         MUTEX_ENTER(&event_handler_mutex);
157         if (rx_pthread_event_rescheduled) {
158             rx_pthread_event_rescheduled = 0;
159             continue;
160         }
161
162         clock_Add(&cv, &next);
163         rx_pthread_next_event_time.tv_sec = cv.sec;
164         rx_pthread_next_event_time.tv_nsec = cv.usec * 1000;
165         rx_pthread_n_event_waits++;
166         error = CV_TIMEDWAIT(&rx_event_handler_cond, &event_handler_mutex, &rx_pthread_next_event_time);
167         if (error == 0) {
168             rx_pthread_n_event_woken++;
169         }
170 #ifdef AFS_NT40_ENV
171         else if (error == ETIMEDOUT) {
172             rx_pthread_n_event_expired++;
173         } else {
174             rx_pthread_n_event_error++;
175         }
176 #else
177         else if (errno == ETIMEDOUT) {
178             rx_pthread_n_event_expired++;
179         } else {
180             rx_pthread_n_event_error++;
181         }
182 #endif
183         rx_pthread_event_rescheduled = 0;
184     }
185     AFS_UNREACHED(return(NULL));
186 }
187
188
189 /*
190  * This routine will get called by the event package whenever a new,
191  * earlier than others, event is posted. */
192 void
193 rxi_ReScheduleEvents(void)
194 {
195     MUTEX_ENTER(&event_handler_mutex);
196     CV_SIGNAL(&rx_event_handler_cond);
197     rx_pthread_event_rescheduled = 1;
198     MUTEX_EXIT(&event_handler_mutex);
199 }
200
201
202 /* Loop to listen on a socket. Return setting *newcallp if this
203  * thread should become a server thread.  */
204 static void
205 rxi_ListenerProc(osi_socket sock, int *tnop, struct rx_call **newcallp)
206 {
207     unsigned int host;
208     u_short port;
209     struct rx_packet *p = (struct rx_packet *)0;
210
211     if (!(rx_enable_hot_thread && newcallp)) {
212         /* Don't do this for hot threads, since we might stop being the
213          * listener. */
214         opr_threadname_set("rx_Listener");
215     }
216
217     MUTEX_ENTER(&listener_mutex);
218     while (!listeners_started) {
219         CV_WAIT(&rx_listener_cond, &listener_mutex);
220     }
221     MUTEX_EXIT(&listener_mutex);
222
223     for (;;) {
224         /* See if a check for additional packets was issued */
225         rx_CheckPackets();
226
227         /*
228          * Grab a new packet only if necessary (otherwise re-use the old one)
229          */
230         if (p) {
231             rxi_RestoreDataBufs(p);
232         } else {
233             if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_RECEIVE))) {
234                 /* Could this happen with multiple socket listeners? */
235                 osi_Panic("rxi_Listener: no packets!"); /* Shouldn't happen */
236             }
237         }
238
239         if (rxi_ReadPacket(sock, p, &host, &port)) {
240             clock_NewTime();
241             p = rxi_ReceivePacket(p, sock, host, port, tnop, newcallp);
242             if (newcallp && *newcallp) {
243                 if (p)
244                     rxi_FreePacket(p);
245                 return;
246             }
247         }
248     }
249     /* NOTREACHED */
250 }
251
252 /* This is the listener process request loop. The listener process loop
253  * becomes a server thread when rxi_ListenerProc returns, and stays
254  * server thread until rxi_ServerProc returns. */
255 static void *
256 rx_ListenerProc(void *argp)
257 {
258     int threadID;
259     osi_socket sock = (osi_socket)(intptr_t)argp;
260     struct rx_call *newcall;
261
262     while (1) {
263         newcall = NULL;
264         threadID = -1;
265         rxi_ListenerProc(sock, &threadID, &newcall);
266         /* osi_Assert(threadID != -1); */
267         /* osi_Assert(newcall != NULL); */
268         sock = OSI_NULLSOCKET;
269         rxi_SetThreadNum(threadID);
270         rxi_ServerProc(threadID, newcall, &sock);
271         /* osi_Assert(sock != OSI_NULLSOCKET); */
272     }
273     AFS_UNREACHED(return(NULL));
274 }
275
276 /* This is the server process request loop. The server process loop
277  * becomes a listener thread when rxi_ServerProc returns, and stays
278  * listener thread until rxi_ListenerProc returns. */
279 void *
280 rx_ServerProc(void * dummy)
281 {
282     osi_socket sock;
283     int threadID;
284     struct rx_call *newcall = NULL;
285
286     rxi_MorePackets(rx_maxReceiveWindow + 2);   /* alloc more packets */
287     MUTEX_ENTER(&rx_quota_mutex);
288     rxi_dataQuota += rx_initSendWindow; /* Reserve some pkts for hard times */
289     /* threadID is used for making decisions in GetCall.  Get it by bumping
290      * number of threads handling incoming calls */
291     /* Unique thread ID: used for scheduling purposes *and* as index into
292      * the host hold table (fileserver).
293      * The previously used rxi_availProcs is unsuitable as it
294      * will already go up and down as packets arrive while the server
295      * threads are still initialising! The recently introduced
296      * rxi_pthread_hinum does not necessarily lead to a server
297      * thread with id 0, which is not allowed to hop through the
298      * incoming call queue.
299      * So either introduce yet another counter or flag the FCFS
300      * thread... chose the latter.
301      */
302     MUTEX_ENTER(&rx_pthread_mutex);
303     threadID = rx_NewThreadId();
304     if (rxi_fcfs_thread_num == 0 && rxi_fcfs_thread_num != threadID)
305         rxi_fcfs_thread_num = threadID;
306     MUTEX_EXIT(&rx_pthread_mutex);
307     ++rxi_availProcs;
308     MUTEX_EXIT(&rx_quota_mutex);
309
310     while (1) {
311         sock = OSI_NULLSOCKET;
312         rxi_SetThreadNum(threadID);
313         rxi_ServerProc(threadID, newcall, &sock);
314         /* osi_Assert(sock != OSI_NULLSOCKET); */
315         newcall = NULL;
316         rxi_ListenerProc(sock, &threadID, &newcall);
317         /* osi_Assert(threadID != -1); */
318         /* osi_Assert(newcall != NULL); */
319     }
320     AFS_UNREACHED(return(NULL));
321 }
322
323 /*
324  * Historically used to start the listener process. We now have multiple
325  * listener processes (one for each socket); these are started by GetUdpSocket.
326  *
327  * The event handling process *is* started here (the old listener used
328  * to also handle events). The listener threads can't actually start
329  * listening until rxi_StartListener is called because most of R may not
330  * be initialized when rxi_Listen is called.
331  */
332 void
333 rxi_StartListener(void)
334 {
335     pthread_attr_t tattr;
336     AFS_SIGSET_DECL;
337
338         if (listeners_started)
339                 return;
340
341     if (pthread_attr_init(&tattr) != 0) {
342         osi_Panic("Unable to create Rx event handling thread (pthread_attr_init)\n");
343     }
344
345     if (pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) != 0) {
346         osi_Panic("Unable to create Rx event handling thread (pthread_attr_setdetachstate)\n");
347     }
348
349     AFS_SIGSET_CLEAR();
350     if (pthread_create(&event_handler_thread, &tattr, event_handler, NULL) !=
351         0) {
352         osi_Panic("Unable to create Rx event handling thread\n");
353     }
354     rx_NewThreadId();
355     AFS_SIGSET_RESTORE();
356
357     MUTEX_ENTER(&listener_mutex);
358     CV_BROADCAST(&rx_listener_cond);
359     listeners_started = 1;
360     MUTEX_EXIT(&listener_mutex);
361
362 }
363
364 /*
365  * Listen on the specified socket.
366  */
367 int
368 rxi_Listen(osi_socket sock)
369 {
370     pthread_t thread;
371     pthread_attr_t tattr;
372     AFS_SIGSET_DECL;
373
374     if (pthread_attr_init(&tattr) != 0) {
375         osi_Panic("Unable to create socket listener thread (pthread_attr_init)\n");
376     }
377
378     if (pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) != 0) {
379         osi_Panic("Unable to create socket listener thread (pthread_attr_setdetachstate)\n");
380     }
381
382     AFS_SIGSET_CLEAR();
383     if (pthread_create(&thread, &tattr, rx_ListenerProc, (void *)(intptr_t)sock) != 0) {
384         osi_Panic("Unable to create socket listener thread\n");
385     }
386     rx_NewThreadId();
387     AFS_SIGSET_RESTORE();
388     return 0;
389 }
390
391
392 /*
393  * Recvmsg.
394  *
395  */
396 int
397 rxi_Recvmsg(osi_socket socket, struct msghdr *msg_p, int flags)
398 {
399     int ret;
400     ret = recvmsg(socket, msg_p, flags);
401
402 #ifdef AFS_RXERRQ_ENV
403     if (ret < 0) {
404         while (rxi_HandleSocketError(socket) > 0)
405             ;
406     }
407 #endif
408
409     return ret;
410 }
411
412 /*
413  * Sendmsg.
414  */
415 int
416 rxi_Sendmsg(osi_socket socket, struct msghdr *msg_p, int flags)
417 {
418     int err;
419     if (sendmsg(socket, msg_p, flags) >= 0) {
420         return 0;
421     }
422
423 #ifdef AFS_NT40_ENV
424     err = WSAGetLastError();
425 #else
426     err = errno;
427 #endif
428
429 #ifdef AFS_RXERRQ_ENV
430     while (rxi_HandleSocketError(socket) > 0)
431         ;
432 #else
433 # ifdef AFS_LINUX22_ENV
434     /* linux unfortunately returns ECONNREFUSED if the target port
435      * is no longer in use */
436     /* and EAGAIN if a UDP checksum is incorrect */
437     if (err == ECONNREFUSED || err == EAGAIN) {
438         return 0;
439     }
440 # endif
441     dpf(("rxi_sendmsg failed, error %d\n", errno));
442     fflush(stdout);
443 #endif /* !AFS_RXERRQ_ENV */
444
445     if (err > 0) {
446         return -err;
447     }
448     return -1;
449 }
450
451 struct rx_ts_info_t * rx_ts_info_init(void) {
452     struct rx_ts_info_t * rx_ts_info;
453     rx_ts_info = calloc(1, sizeof(rx_ts_info_t));
454     osi_Assert(rx_ts_info != NULL && pthread_setspecific(rx_ts_info_key, rx_ts_info) == 0);
455 #ifdef RX_ENABLE_TSFPQ
456     opr_queue_Init(&rx_ts_info->_FPQ.queue);
457
458     MUTEX_ENTER(&rx_packets_mutex);
459     rx_TSFPQMaxProcs++;
460     RX_TS_FPQ_COMPUTE_LIMITS;
461     MUTEX_EXIT(&rx_packets_mutex);
462 #endif /* RX_ENABLE_TSFPQ */
463     return rx_ts_info;
464 }
465
466 int
467 rx_GetThreadNum(void) {
468     return (intptr_t)pthread_getspecific(rx_thread_id_key);
469 }
470
471 static void
472 rxi_SetThreadNum(int threadID) {
473     osi_Assert(pthread_setspecific(rx_thread_id_key,
474                                    (void *)(intptr_t)threadID) == 0);
475 }
476
477 int
478 rx_SetThreadNum(void) {
479     int threadId;
480
481     threadId = rx_NewThreadId();
482     rxi_SetThreadNum(threadId);
483     return threadId;
484 }
485
486 #endif