rx-rwlocks-no-more-20090110
[openafs.git] / src / rx / rx_pthread.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /*
11  * An implementation of the rx socket listener for pthreads (not using select).
12  * This assumes that multiple read system calls may be extant at any given
13  * time. Also implements the pthread-specific event handling for rx.
14  *
15  * rx_pthread.c is used for the thread safe RX package.
16  */
17
18 #include <afsconfig.h>
19 #include <afs/param.h>
20
21 RCSID
22     ("$Header$");
23
24 #include <sys/types.h>
25 #include <errno.h>
26 #include <signal.h>
27 #include <string.h>
28 #ifndef AFS_NT40_ENV
29 # include <sys/socket.h>
30 # include <sys/file.h>
31 # include <netdb.h>
32 # include <netinet/in.h>
33 # include <net/if.h>
34 # include <sys/ioctl.h>
35 # include <sys/time.h>
36 #endif
37 #include <sys/stat.h>
38 #include "rx_internal.h"
39 #include <rx/rx.h>
40 #include <rx/rx_globals.h>
41 #include <assert.h>
42 #include <rx/rx_pthread.h>
43 #include <rx/rx_clock.h>
44
45 /*
46  * Number of times the event handling thread was signalled because a new
47  * event was scheduled earlier than the lastest event.
48  *
49  * Protected by event_handler_mutex
50  */
51 static long rx_pthread_n_event_wakeups;
52
53 /* Set rx_pthread_event_rescheduled if event_handler should just try
54  * again instead of sleeping.
55  *
56  * Protected by event_handler_mutex
57  */
58 static int rx_pthread_event_rescheduled = 0;
59
60 static void *rx_ListenerProc(void *);
61
62 /*
63  * We supply an event handling thread for Rx's event processing.
64  * The condition variable is used to wakeup the thread whenever a new
65  * event is scheduled earlier than the previous earliest event.
66  * This thread is also responsible for keeping time.
67  */
68 static pthread_t event_handler_thread;
69 pthread_cond_t rx_event_handler_cond;
70 pthread_mutex_t event_handler_mutex;
71 pthread_cond_t rx_listener_cond;
72 pthread_mutex_t listener_mutex;
73 static int listeners_started = 0;
74 pthread_mutex_t rx_clock_mutex;
75 struct clock rxi_clockNow;
76
77 /*
78  * Delay the current thread the specified number of seconds.
79  */
80 void
81 rxi_Delay(int sec)
82 {
83     sleep(sec);
84 }
85
86 /*
87  * Called from rx_Init()
88  */
89 void
90 rxi_InitializeThreadSupport(void)
91 {
92         /* listeners_started must only be reset if
93          * the listener thread terminates */
94         /* listeners_started = 0; */
95     clock_GetTime(&rxi_clockNow);
96 }
97
98 static void *
99 server_entry(void *argp)
100 {
101     void (*server_proc) () = (void (*)())argp;
102     server_proc();
103     dpf(("rx_pthread.c: server_entry: Server proc returned unexpectedly\n"));
104     exit(1);
105     return (void *)0;
106 }
107
108 /*
109  * Start an Rx server process.
110  */
111 void
112 rxi_StartServerProc(void *(*proc) (void *), int stacksize)
113 {
114     pthread_t thread;
115     pthread_attr_t tattr;
116     AFS_SIGSET_DECL;
117
118     if (pthread_attr_init(&tattr) != 0) {
119         dpf(("Unable to Create Rx server thread (pthread_attr_init)\n"));
120         exit(1);
121     }
122
123     if (pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) != 0) {
124         dpf
125             (("Unable to Create Rx server thread (pthread_attr_setdetachstate)\n"));
126         exit(1);
127     }
128
129     /*
130      * NOTE: We are ignoring the stack size parameter, for now.
131      */
132     AFS_SIGSET_CLEAR();
133     if (pthread_create(&thread, &tattr, server_entry, (void *)proc) != 0) {
134         dpf(("Unable to Create Rx server thread\n"));
135         exit(1);
136     }
137     AFS_SIGSET_RESTORE();
138 }
139
140 /*
141  * The event handling process.
142  */
143 static void *
144 event_handler(void *argp)
145 {
146     struct clock rx_pthread_last_event_wait_time = { 0, 0 };
147     unsigned long rx_pthread_n_event_expired = 0;
148     unsigned long rx_pthread_n_event_waits = 0;
149     long rx_pthread_n_event_woken = 0;
150     unsigned long rx_pthread_n_event_error = 0;
151     struct timespec rx_pthread_next_event_time = { 0, 0 };
152     int error;
153
154     assert(pthread_mutex_lock(&event_handler_mutex) == 0);
155
156     for (;;) {
157         struct clock cv;
158         struct clock next;
159
160         assert(pthread_mutex_unlock(&event_handler_mutex) == 0);
161
162         next.sec = 30;          /* Time to sleep if there are no events scheduled */
163         next.usec = 0;
164         clock_GetTime(&cv);
165         rxevent_RaiseEvents(&next);
166
167         assert(pthread_mutex_lock(&event_handler_mutex) == 0);
168         if (rx_pthread_event_rescheduled) {
169             rx_pthread_event_rescheduled = 0;
170             continue;
171         }
172
173         clock_Add(&cv, &next);
174         rx_pthread_next_event_time.tv_sec = cv.sec;
175         rx_pthread_next_event_time.tv_nsec = cv.usec * 1000;
176         rx_pthread_n_event_waits++;
177         error = pthread_cond_timedwait
178             (&rx_event_handler_cond, &event_handler_mutex,
179              &rx_pthread_next_event_time);
180         if (error == 0) {
181             rx_pthread_n_event_woken++;
182         } 
183 #ifdef AFS_NT40_ENV        
184         else if (error == ETIMEDOUT) {
185             rx_pthread_n_event_expired++;
186         } else {
187             rx_pthread_n_event_error++;
188         }
189 #else
190         else if (errno == ETIMEDOUT) {
191             rx_pthread_n_event_expired++;
192         } else {
193             rx_pthread_n_event_error++;
194         }
195 #endif
196         rx_pthread_event_rescheduled = 0;
197     }
198 }
199
200
201 /*
202  * This routine will get called by the event package whenever a new,
203  * earlier than others, event is posted. */
204 void
205 rxi_ReScheduleEvents(void)
206 {
207     assert(pthread_mutex_lock(&event_handler_mutex) == 0);
208     pthread_cond_signal(&rx_event_handler_cond);
209     rx_pthread_event_rescheduled = 1;
210     assert(pthread_mutex_unlock(&event_handler_mutex) == 0);
211 }
212
213
214 /* Loop to listen on a socket. Return setting *newcallp if this
215  * thread should become a server thread.  */
216 static void
217 rxi_ListenerProc(osi_socket sock, int *tnop, struct rx_call **newcallp)
218 {
219     unsigned int host;
220     u_short port;
221     register struct rx_packet *p = (struct rx_packet *)0;
222
223     assert(pthread_mutex_lock(&listener_mutex) == 0);
224     while (!listeners_started) {
225         assert(pthread_cond_wait(&rx_listener_cond, &listener_mutex) == 0);
226     }
227     assert(pthread_mutex_unlock(&listener_mutex) == 0);
228
229     for (;;) {
230         /*
231          * Grab a new packet only if necessary (otherwise re-use the old one)
232          */
233         if (p) {
234             rxi_RestoreDataBufs(p);
235         } else {
236             if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_RECEIVE))) {
237                 /* Could this happen with multiple socket listeners? */
238                 dpf(("rxi_Listener: no packets!"));     /* Shouldn't happen */
239                 exit(1);
240             }
241         }
242
243         if (rxi_ReadPacket(sock, p, &host, &port)) {
244             clock_NewTime();
245             p = rxi_ReceivePacket(p, sock, host, port, tnop, newcallp);
246             if (newcallp && *newcallp) {
247                 if (p)
248                     rxi_FreePacket(p);
249                 return;
250             }
251         }
252     }
253     /* NOTREACHED */
254 }
255
256 /* This is the listener process request loop. The listener process loop
257  * becomes a server thread when rxi_ListenerProc returns, and stays
258  * server thread until rxi_ServerProc returns. */
259 static void *
260 rx_ListenerProc(void *argp)
261 {
262     int threadID;
263     osi_socket sock = (osi_socket)argp;
264     struct rx_call *newcall;
265
266     while (1) {
267         newcall = NULL;
268         threadID = -1;
269         rxi_ListenerProc(sock, &threadID, &newcall);
270         /* assert(threadID != -1); */
271         /* assert(newcall != NULL); */
272         sock = OSI_NULLSOCKET;
273         assert(pthread_setspecific(rx_thread_id_key, (void *)threadID) == 0);
274         rxi_ServerProc(threadID, newcall, &sock);
275         /* assert(sock != OSI_NULLSOCKET); */
276     }
277     /* not reached */
278 }
279
280 /* This is the server process request loop. The server process loop
281  * becomes a listener thread when rxi_ServerProc returns, and stays
282  * listener thread until rxi_ListenerProc returns. */
283 void *
284 rx_ServerProc(void * dummy)
285 {
286     osi_socket sock;
287     int threadID;
288     struct rx_call *newcall = NULL;
289
290     rxi_MorePackets(rx_maxReceiveWindow + 2);   /* alloc more packets */
291     MUTEX_ENTER(&rx_stats_mutex);
292     rxi_dataQuota += rx_initSendWindow; /* Reserve some pkts for hard times */
293     /* threadID is used for making decisions in GetCall.  Get it by bumping
294      * number of threads handling incoming calls */
295     /* Unique thread ID: used for scheduling purposes *and* as index into
296      * the host hold table (fileserver). 
297      * The previously used rxi_availProcs is unsuitable as it
298      * will already go up and down as packets arrive while the server
299      * threads are still initialising! The recently introduced
300      * rxi_pthread_hinum does not necessarily lead to a server
301      * thread with id 0, which is not allowed to hop through the
302      * incoming call queue.
303      * So either introduce yet another counter or flag the FCFS
304      * thread... chose the latter.
305      */
306     threadID = ++rxi_pthread_hinum;
307     if (rxi_fcfs_thread_num == 0 && rxi_fcfs_thread_num != threadID)
308         rxi_fcfs_thread_num = threadID;
309     ++rxi_availProcs;
310     MUTEX_EXIT(&rx_stats_mutex);
311
312     while (1) {
313         sock = OSI_NULLSOCKET;
314         assert(pthread_setspecific(rx_thread_id_key, (void *)threadID) == 0);
315         rxi_ServerProc(threadID, newcall, &sock);
316         /* assert(sock != OSI_NULLSOCKET); */
317         newcall = NULL;
318         rxi_ListenerProc(sock, &threadID, &newcall);
319         /* assert(threadID != -1); */
320         /* assert(newcall != NULL); */
321     }
322     /* not reached */
323 }
324
325 /*
326  * Historically used to start the listener process. We now have multiple
327  * listener processes (one for each socket); these are started by GetUdpSocket.
328  *
329  * The event handling process *is* started here (the old listener used
330  * to also handle events). The listener threads can't actually start 
331  * listening until rxi_StartListener is called because most of R may not
332  * be initialized when rxi_Listen is called.
333  */
334 void
335 rxi_StartListener(void)
336 {
337     pthread_attr_t tattr;
338     AFS_SIGSET_DECL;
339
340         if (listeners_started)
341                 return;
342
343     if (pthread_attr_init(&tattr) != 0) {
344         dpf
345             (("Unable to create Rx event handling thread (pthread_attr_init)\n"));
346         exit(1);
347     }
348
349     if (pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) != 0) {
350         dpf
351             (("Unable to create Rx event handling thread (pthread_attr_setdetachstate)\n"));
352         exit(1);
353     }
354
355     AFS_SIGSET_CLEAR();
356     if (pthread_create(&event_handler_thread, &tattr, event_handler, NULL) !=
357         0) {
358         dpf(("Unable to create Rx event handling thread\n"));
359         exit(1);
360     }
361     MUTEX_ENTER(&rx_stats_mutex);
362     ++rxi_pthread_hinum;
363     MUTEX_EXIT(&rx_stats_mutex);
364     AFS_SIGSET_RESTORE();
365
366     assert(pthread_mutex_lock(&listener_mutex) == 0);
367     assert(pthread_cond_broadcast(&rx_listener_cond) == 0);
368     listeners_started = 1;
369     assert(pthread_mutex_unlock(&listener_mutex) == 0);
370
371 }
372
373 /*
374  * Listen on the specified socket.
375  */
376 int
377 rxi_Listen(osi_socket sock)
378 {
379     pthread_t thread;
380     pthread_attr_t tattr;
381     AFS_SIGSET_DECL;
382
383     if (pthread_attr_init(&tattr) != 0) {
384         dpf
385             (("Unable to create socket listener thread (pthread_attr_init)\n"));
386         exit(1);
387     }
388
389     if (pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) != 0) {
390         dpf
391             (("Unable to create socket listener thread (pthread_attr_setdetachstate)\n"));
392         exit(1);
393     }
394
395     AFS_SIGSET_CLEAR();
396     if (pthread_create(&thread, &tattr, rx_ListenerProc, (void *)sock) != 0) {
397         dpf(("Unable to create socket listener thread\n"));
398         exit(1);
399     }
400     MUTEX_ENTER(&rx_stats_mutex);
401     ++rxi_pthread_hinum;
402     MUTEX_EXIT(&rx_stats_mutex);
403     AFS_SIGSET_RESTORE();
404     return 0;
405 }
406
407
408 /*
409  * Recvmsg.
410  *
411  */
412 int
413 rxi_Recvmsg(osi_socket socket, struct msghdr *msg_p, int flags)
414 {
415     int ret;
416 #if defined(HAVE_LINUX_ERRQUEUE_H) && defined(ADAPT_PMTU)
417     while((rxi_HandleSocketError(socket)) > 0)
418       ;
419 #endif
420     ret = recvmsg(socket, msg_p, flags);
421     return ret;
422 }
423
424 /*
425  * Sendmsg.
426  */
427 int
428 rxi_Sendmsg(osi_socket socket, struct msghdr *msg_p, int flags)
429 {
430     int ret;
431     ret = sendmsg(socket, msg_p, flags);
432 #ifdef AFS_LINUX22_ENV
433     /* linux unfortunately returns ECONNREFUSED if the target port
434      * is no longer in use */
435     /* and EAGAIN if a UDP checksum is incorrect */
436     if (ret == -1 && errno != ECONNREFUSED && errno != EAGAIN) {
437 #else
438     if (ret == -1) {
439 #endif
440         dpf(("rxi_sendmsg failed, error %d\n", errno));
441         fflush(stdout);
442         return -1;
443     }
444     return 0;
445 }
446
447 struct rx_ts_info_t * rx_ts_info_init() {
448     register struct rx_ts_info_t * rx_ts_info;
449     rx_ts_info = (rx_ts_info_t *) malloc(sizeof(rx_ts_info_t));
450     assert(rx_ts_info != NULL && pthread_setspecific(rx_ts_info_key, rx_ts_info) == 0);
451     memset(rx_ts_info, 0, sizeof(rx_ts_info_t));
452 #ifdef RX_ENABLE_TSFPQ
453     queue_Init(&rx_ts_info->_FPQ);
454
455     MUTEX_ENTER(&rx_stats_mutex);
456     rx_TSFPQMaxProcs++;
457     RX_TS_FPQ_COMPUTE_LIMITS;
458     MUTEX_EXIT(&rx_stats_mutex);
459 #endif /* RX_ENABLE_TSFPQ */
460     return rx_ts_info;
461 }