pmtu-and-prefetch-20080520
[openafs.git] / src / rx / rx_pthread.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /*
11  * An implementation of the rx socket listener for pthreads (not using select).
12  * This assumes that multiple read system calls may be extant at any given
13  * time. Also implements the pthread-specific event handling for rx.
14  *
15  * rx_pthread.c is used for the thread safe RX package.
16  */
17
18 #include <afsconfig.h>
19 #include <afs/param.h>
20
21 RCSID
22     ("$Header$");
23
24 #include <sys/types.h>
25 #include <errno.h>
26 #include <signal.h>
27 #include <string.h>
28 #ifndef AFS_NT40_ENV
29 # include <sys/socket.h>
30 # include <sys/file.h>
31 # include <netdb.h>
32 # include <netinet/in.h>
33 # include <net/if.h>
34 # include <sys/ioctl.h>
35 # include <sys/time.h>
36 #endif
37 #include <sys/stat.h>
38 #include <rx/rx.h>
39 #include <rx/rx_globals.h>
40 #include <assert.h>
41 #include <rx/rx_pthread.h>
42 #include <rx/rx_clock.h>
43
44 /*
45  * Number of times the event handling thread was signalled because a new
46  * event was scheduled earlier than the lastest event.
47  *
48  * Protected by event_handler_mutex
49  */
50 static long rx_pthread_n_event_wakeups;
51
52 /* Set rx_pthread_event_rescheduled if event_handler should just try
53  * again instead of sleeping.
54  *
55  * Protected by event_handler_mutex
56  */
57 static int rx_pthread_event_rescheduled = 0;
58
59 static void *rx_ListenerProc(void *);
60
61 /*
62  * We supply an event handling thread for Rx's event processing.
63  * The condition variable is used to wakeup the thread whenever a new
64  * event is scheduled earlier than the previous earliest event.
65  * This thread is also responsible for keeping time.
66  */
67 static pthread_t event_handler_thread;
68 pthread_cond_t rx_event_handler_cond;
69 pthread_mutex_t event_handler_mutex;
70 pthread_cond_t rx_listener_cond;
71 pthread_mutex_t listener_mutex;
72 static int listeners_started = 0;
73 pthread_mutex_t rx_clock_mutex;
74 struct clock rxi_clockNow;
75
76 /*
77  * Delay the current thread the specified number of seconds.
78  */
79 void
80 rxi_Delay(int sec)
81 {
82     sleep(sec);
83 }
84
85 /*
86  * Called from rx_Init()
87  */
88 void
89 rxi_InitializeThreadSupport(void)
90 {
91         /* listeners_started must only be reset if
92          * the listener thread terminates */
93         /* listeners_started = 0; */
94     clock_GetTime(&rxi_clockNow);
95 }
96
97 static void *
98 server_entry(void *argp)
99 {
100     void (*server_proc) () = (void (*)())argp;
101     server_proc();
102     dpf(("rx_pthread.c: server_entry: Server proc returned unexpectedly\n"));
103     exit(1);
104     return (void *)0;
105 }
106
107 /*
108  * Start an Rx server process.
109  */
110 void
111 rxi_StartServerProc(void *(*proc) (void *), int stacksize)
112 {
113     pthread_t thread;
114     pthread_attr_t tattr;
115     AFS_SIGSET_DECL;
116
117     if (pthread_attr_init(&tattr) != 0) {
118         dpf(("Unable to Create Rx server thread (pthread_attr_init)\n"));
119         exit(1);
120     }
121
122     if (pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) != 0) {
123         dpf
124             (("Unable to Create Rx server thread (pthread_attr_setdetachstate)\n"));
125         exit(1);
126     }
127
128     /*
129      * NOTE: We are ignoring the stack size parameter, for now.
130      */
131     AFS_SIGSET_CLEAR();
132     if (pthread_create(&thread, &tattr, server_entry, (void *)proc) != 0) {
133         dpf(("Unable to Create Rx server thread\n"));
134         exit(1);
135     }
136     AFS_SIGSET_RESTORE();
137 }
138
139 /*
140  * The event handling process.
141  */
142 static void *
143 event_handler(void *argp)
144 {
145     struct clock rx_pthread_last_event_wait_time = { 0, 0 };
146     unsigned long rx_pthread_n_event_expired = 0;
147     unsigned long rx_pthread_n_event_waits = 0;
148     long rx_pthread_n_event_woken = 0;
149     unsigned long rx_pthread_n_event_error = 0;
150     struct timespec rx_pthread_next_event_time = { 0, 0 };
151     int error;
152
153     assert(pthread_mutex_lock(&event_handler_mutex) == 0);
154
155     for (;;) {
156         struct clock cv;
157         struct clock next;
158
159         assert(pthread_mutex_unlock(&event_handler_mutex) == 0);
160
161         next.sec = 30;          /* Time to sleep if there are no events scheduled */
162         next.usec = 0;
163         clock_GetTime(&cv);
164         rxevent_RaiseEvents(&next);
165
166         assert(pthread_mutex_lock(&event_handler_mutex) == 0);
167         if (rx_pthread_event_rescheduled) {
168             rx_pthread_event_rescheduled = 0;
169             continue;
170         }
171
172         clock_Add(&cv, &next);
173         rx_pthread_next_event_time.tv_sec = cv.sec;
174         rx_pthread_next_event_time.tv_nsec = cv.usec * 1000;
175         rx_pthread_n_event_waits++;
176         error = pthread_cond_timedwait
177             (&rx_event_handler_cond, &event_handler_mutex,
178              &rx_pthread_next_event_time);
179         if (error == 0) {
180             rx_pthread_n_event_woken++;
181         } 
182 #ifdef AFS_NT40_ENV        
183         else if (error == ETIMEDOUT) {
184             rx_pthread_n_event_expired++;
185         } else {
186             rx_pthread_n_event_error++;
187         }
188 #else
189         else if (errno == ETIMEDOUT) {
190             rx_pthread_n_event_expired++;
191         } else {
192             rx_pthread_n_event_error++;
193         }
194 #endif
195         rx_pthread_event_rescheduled = 0;
196     }
197 }
198
199
200 /*
201  * This routine will get called by the event package whenever a new,
202  * earlier than others, event is posted. */
203 void
204 rxi_ReScheduleEvents(void)
205 {
206     assert(pthread_mutex_lock(&event_handler_mutex) == 0);
207     pthread_cond_signal(&rx_event_handler_cond);
208     rx_pthread_event_rescheduled = 1;
209     assert(pthread_mutex_unlock(&event_handler_mutex) == 0);
210 }
211
212
213 /* Loop to listen on a socket. Return setting *newcallp if this
214  * thread should become a server thread.  */
215 static void
216 rxi_ListenerProc(osi_socket sock, int *tnop, struct rx_call **newcallp)
217 {
218     unsigned int host;
219     u_short port;
220     register struct rx_packet *p = (struct rx_packet *)0;
221
222     assert(pthread_mutex_lock(&listener_mutex) == 0);
223     while (!listeners_started) {
224         assert(pthread_cond_wait(&rx_listener_cond, &listener_mutex) == 0);
225     }
226     assert(pthread_mutex_unlock(&listener_mutex) == 0);
227
228     for (;;) {
229         /*
230          * Grab a new packet only if necessary (otherwise re-use the old one)
231          */
232         if (p) {
233             rxi_RestoreDataBufs(p);
234         } else {
235             if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_RECEIVE))) {
236                 /* Could this happen with multiple socket listeners? */
237                 dpf(("rxi_Listener: no packets!"));     /* Shouldn't happen */
238                 exit(1);
239             }
240         }
241
242         if (rxi_ReadPacket(sock, p, &host, &port)) {
243             clock_NewTime();
244             p = rxi_ReceivePacket(p, sock, host, port, tnop, newcallp);
245             if (newcallp && *newcallp) {
246                 if (p)
247                     rxi_FreePacket(p);
248                 return;
249             }
250         }
251     }
252     /* NOTREACHED */
253 }
254
255 /* This is the listener process request loop. The listener process loop
256  * becomes a server thread when rxi_ListenerProc returns, and stays
257  * server thread until rxi_ServerProc returns. */
258 static void *
259 rx_ListenerProc(void *argp)
260 {
261     int threadID;
262     osi_socket sock = (osi_socket)argp;
263     struct rx_call *newcall;
264
265     while (1) {
266         newcall = NULL;
267         threadID = -1;
268         rxi_ListenerProc(sock, &threadID, &newcall);
269         /* assert(threadID != -1); */
270         /* assert(newcall != NULL); */
271         sock = OSI_NULLSOCKET;
272         assert(pthread_setspecific(rx_thread_id_key, (void *)threadID) == 0);
273         rxi_ServerProc(threadID, newcall, &sock);
274         /* assert(sock != OSI_NULLSOCKET); */
275     }
276     /* not reached */
277 }
278
279 /* This is the server process request loop. The server process loop
280  * becomes a listener thread when rxi_ServerProc returns, and stays
281  * listener thread until rxi_ListenerProc returns. */
282 void *
283 rx_ServerProc(void * dummy)
284 {
285     osi_socket sock;
286     int threadID;
287     struct rx_call *newcall = NULL;
288
289     rxi_MorePackets(rx_maxReceiveWindow + 2);   /* alloc more packets */
290     MUTEX_ENTER(&rx_stats_mutex);
291     rxi_dataQuota += rx_initSendWindow; /* Reserve some pkts for hard times */
292     /* threadID is used for making decisions in GetCall.  Get it by bumping
293      * number of threads handling incoming calls */
294     /* Unique thread ID: used for scheduling purposes *and* as index into
295      * the host hold table (fileserver). 
296      * The previously used rxi_availProcs is unsuitable as it
297      * will already go up and down as packets arrive while the server
298      * threads are still initialising! The recently introduced
299      * rxi_pthread_hinum does not necessarily lead to a server
300      * thread with id 0, which is not allowed to hop through the
301      * incoming call queue.
302      * So either introduce yet another counter or flag the FCFS
303      * thread... chose the latter.
304      */
305     threadID = ++rxi_pthread_hinum;
306     if (rxi_fcfs_thread_num == 0 && rxi_fcfs_thread_num != threadID)
307         rxi_fcfs_thread_num = threadID;
308     ++rxi_availProcs;
309     MUTEX_EXIT(&rx_stats_mutex);
310
311     while (1) {
312         sock = OSI_NULLSOCKET;
313         assert(pthread_setspecific(rx_thread_id_key, (void *)threadID) == 0);
314         rxi_ServerProc(threadID, newcall, &sock);
315         /* assert(sock != OSI_NULLSOCKET); */
316         newcall = NULL;
317         rxi_ListenerProc(sock, &threadID, &newcall);
318         /* assert(threadID != -1); */
319         /* assert(newcall != NULL); */
320     }
321     /* not reached */
322 }
323
324 /*
325  * Historically used to start the listener process. We now have multiple
326  * listener processes (one for each socket); these are started by GetUdpSocket.
327  *
328  * The event handling process *is* started here (the old listener used
329  * to also handle events). The listener threads can't actually start 
330  * listening until rxi_StartListener is called because most of R may not
331  * be initialized when rxi_Listen is called.
332  */
333 void
334 rxi_StartListener(void)
335 {
336     pthread_attr_t tattr;
337     AFS_SIGSET_DECL;
338
339         if (listeners_started)
340                 return;
341
342     if (pthread_attr_init(&tattr) != 0) {
343         dpf
344             (("Unable to create Rx event handling thread (pthread_attr_init)\n"));
345         exit(1);
346     }
347
348     if (pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) != 0) {
349         dpf
350             (("Unable to create Rx event handling thread (pthread_attr_setdetachstate)\n"));
351         exit(1);
352     }
353
354     AFS_SIGSET_CLEAR();
355     if (pthread_create(&event_handler_thread, &tattr, event_handler, NULL) !=
356         0) {
357         dpf(("Unable to create Rx event handling thread\n"));
358         exit(1);
359     }
360     MUTEX_ENTER(&rx_stats_mutex);
361     ++rxi_pthread_hinum;
362     MUTEX_EXIT(&rx_stats_mutex);
363     AFS_SIGSET_RESTORE();
364
365     assert(pthread_mutex_lock(&listener_mutex) == 0);
366     assert(pthread_cond_broadcast(&rx_listener_cond) == 0);
367     listeners_started = 1;
368     assert(pthread_mutex_unlock(&listener_mutex) == 0);
369
370 }
371
372 /*
373  * Listen on the specified socket.
374  */
375 int
376 rxi_Listen(osi_socket sock)
377 {
378     pthread_t thread;
379     pthread_attr_t tattr;
380     AFS_SIGSET_DECL;
381
382     if (pthread_attr_init(&tattr) != 0) {
383         dpf
384             (("Unable to create socket listener thread (pthread_attr_init)\n"));
385         exit(1);
386     }
387
388     if (pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) != 0) {
389         dpf
390             (("Unable to create socket listener thread (pthread_attr_setdetachstate)\n"));
391         exit(1);
392     }
393
394     AFS_SIGSET_CLEAR();
395     if (pthread_create(&thread, &tattr, rx_ListenerProc, (void *)sock) != 0) {
396         dpf(("Unable to create socket listener thread\n"));
397         exit(1);
398     }
399     MUTEX_ENTER(&rx_stats_mutex);
400     ++rxi_pthread_hinum;
401     MUTEX_EXIT(&rx_stats_mutex);
402     AFS_SIGSET_RESTORE();
403     return 0;
404 }
405
406
407 /*
408  * Recvmsg.
409  *
410  */
411 int
412 rxi_Recvmsg(osi_socket socket, struct msghdr *msg_p, int flags)
413 {
414     int ret;
415 #if defined(HAVE_LINUX_ERRQUEUE_H) && defined(ADAPT_PMTU)
416     while((rxi_HandleSocketError(socket)) > 0)
417       ;
418 #endif
419     ret = recvmsg(socket, msg_p, flags);
420     return ret;
421 }
422
423 /*
424  * Sendmsg.
425  */
426 int
427 rxi_Sendmsg(osi_socket socket, struct msghdr *msg_p, int flags)
428 {
429     int ret;
430     ret = sendmsg(socket, msg_p, flags);
431 #ifdef AFS_LINUX22_ENV
432     /* linux unfortunately returns ECONNREFUSED if the target port
433      * is no longer in use */
434     /* and EAGAIN if a UDP checksum is incorrect */
435     if (ret == -1 && errno != ECONNREFUSED && errno != EAGAIN) {
436 #else
437     if (ret == -1) {
438 #endif
439         dpf(("rxi_sendmsg failed, error %d\n", errno));
440         fflush(stdout);
441         return -1;
442     }
443     return 0;
444 }
445
446 struct rx_ts_info_t * rx_ts_info_init() {
447     register struct rx_ts_info_t * rx_ts_info;
448     rx_ts_info = (rx_ts_info_t *) malloc(sizeof(rx_ts_info_t));
449     assert(rx_ts_info != NULL && pthread_setspecific(rx_ts_info_key, rx_ts_info) == 0);
450     memset(rx_ts_info, 0, sizeof(rx_ts_info_t));
451 #ifdef RX_ENABLE_TSFPQ
452     queue_Init(&rx_ts_info->_FPQ);
453
454     MUTEX_ENTER(&rx_stats_mutex);
455     rx_TSFPQMaxProcs++;
456     RX_TS_FPQ_COMPUTE_LIMITS;
457     MUTEX_EXIT(&rx_stats_mutex);
458 #endif /* RX_ENABLE_TSFPQ */
459     return rx_ts_info;
460 }