taking-rx-back-into-the-20th-century-20061228
[openafs.git] / src / rx / rx_pthread.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /*
11  * An implementation of the rx socket listener for pthreads (not using select).
12  * This assumes that multiple read system calls may be extant at any given
13  * time. Also implements the pthread-specific event handling for rx.
14  *
15  * rx_pthread.c is used for the thread safe RX package.
16  */
17
18 #include <afsconfig.h>
19 #include <afs/param.h>
20
21 RCSID
22     ("$Header$");
23
24 #include <sys/types.h>
25 #include <errno.h>
26 #include <signal.h>
27 #ifndef AFS_NT40_ENV
28 # include <sys/socket.h>
29 # include <sys/file.h>
30 # include <netdb.h>
31 # include <netinet/in.h>
32 # include <net/if.h>
33 # include <sys/ioctl.h>
34 # include <sys/time.h>
35 #endif
36 #include <sys/stat.h>
37 #include <rx/rx.h>
38 #include <rx/rx_globals.h>
39 #include <assert.h>
40 #include <rx/rx_pthread.h>
41 #include <rx/rx_clock.h>
42
43 /*
44  * Number of times the event handling thread was signalled because a new
45  * event was scheduled earlier than the lastest event.
46  *
47  * Protected by event_handler_mutex
48  */
49 static long rx_pthread_n_event_wakeups;
50
51 /* Set rx_pthread_event_rescheduled if event_handler should just try
52  * again instead of sleeping.
53  *
54  * Protected by event_handler_mutex
55  */
56 static int rx_pthread_event_rescheduled = 0;
57
58 static void *rx_ListenerProc(void *);
59
60 /*
61  * We supply an event handling thread for Rx's event processing.
62  * The condition variable is used to wakeup the thread whenever a new
63  * event is scheduled earlier than the previous earliest event.
64  * This thread is also responsible for keeping time.
65  */
66 static pthread_t event_handler_thread;
67 pthread_cond_t rx_event_handler_cond;
68 pthread_mutex_t event_handler_mutex;
69 pthread_cond_t rx_listener_cond;
70 pthread_mutex_t listener_mutex;
71 static int listeners_started = 0;
72 pthread_mutex_t rx_clock_mutex;
73 struct clock rxi_clockNow;
74
75 /*
76  * Delay the current thread the specified number of seconds.
77  */
78 void
79 rxi_Delay(int sec)
80 {
81     sleep(sec);
82 }
83
84 /*
85  * Called from rx_Init()
86  */
87 void
88 rxi_InitializeThreadSupport(void)
89 {
90         /* listeners_started must only be reset if
91          * the listener thread terminates */
92         /* listeners_started = 0; */
93     clock_GetTime(&rxi_clockNow);
94 }
95
96 static void *
97 server_entry(void *argp)
98 {
99     void (*server_proc) () = (void (*)())argp;
100     server_proc();
101     dpf(("rx_pthread.c: server_entry: Server proc returned unexpectedly\n"));
102     exit(1);
103     return (void *)0;
104 }
105
106 /*
107  * Start an Rx server process.
108  */
109 void
110 rxi_StartServerProc(void (*proc) (void), int stacksize)
111 {
112     pthread_t thread;
113     pthread_attr_t tattr;
114     AFS_SIGSET_DECL;
115
116     if (pthread_attr_init(&tattr) != 0) {
117         dpf(("Unable to Create Rx server thread (pthread_attr_init)\n"));
118         exit(1);
119     }
120
121     if (pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) != 0) {
122         dpf
123             (("Unable to Create Rx server thread (pthread_attr_setdetachstate)\n"));
124         exit(1);
125     }
126
127     /*
128      * NOTE: We are ignoring the stack size parameter, for now.
129      */
130     AFS_SIGSET_CLEAR();
131     if (pthread_create(&thread, &tattr, server_entry, (void *)proc) != 0) {
132         dpf(("Unable to Create Rx server thread\n"));
133         exit(1);
134     }
135     AFS_SIGSET_RESTORE();
136 }
137
138 /*
139  * The event handling process.
140  */
141 static void *
142 event_handler(void *argp)
143 {
144     struct clock rx_pthread_last_event_wait_time = { 0, 0 };
145     unsigned long rx_pthread_n_event_expired = 0;
146     unsigned long rx_pthread_n_event_waits = 0;
147     long rx_pthread_n_event_woken = 0;
148     struct timespec rx_pthread_next_event_time = { 0, 0 };
149
150     assert(pthread_mutex_lock(&event_handler_mutex) == 0);
151
152     for (;;) {
153         struct clock cv;
154         struct clock next;
155
156         assert(pthread_mutex_unlock(&event_handler_mutex) == 0);
157
158         next.sec = 30;          /* Time to sleep if there are no events scheduled */
159         next.usec = 0;
160         clock_GetTime(&cv);
161         rxevent_RaiseEvents(&next);
162
163         assert(pthread_mutex_lock(&event_handler_mutex) == 0);
164         if (rx_pthread_event_rescheduled) {
165             rx_pthread_event_rescheduled = 0;
166             continue;
167         }
168
169         clock_Add(&cv, &next);
170         rx_pthread_next_event_time.tv_sec = cv.sec;
171         rx_pthread_next_event_time.tv_nsec = cv.usec * 1000;
172         rx_pthread_n_event_waits++;
173         if (pthread_cond_timedwait
174             (&rx_event_handler_cond, &event_handler_mutex,
175              &rx_pthread_next_event_time) == -1) {
176 #ifdef notdef
177             assert(errno == EAGAIN);
178 #endif
179             rx_pthread_n_event_expired++;
180         } else {
181             rx_pthread_n_event_woken++;
182         }
183         rx_pthread_event_rescheduled = 0;
184     }
185 }
186
187
188 /*
189  * This routine will get called by the event package whenever a new,
190  * earlier than others, event is posted. */
191 void
192 rxi_ReScheduleEvents(void)
193 {
194     assert(pthread_mutex_lock(&event_handler_mutex) == 0);
195     pthread_cond_signal(&rx_event_handler_cond);
196     rx_pthread_event_rescheduled = 1;
197     assert(pthread_mutex_unlock(&event_handler_mutex) == 0);
198 }
199
200
201 /* Loop to listen on a socket. Return setting *newcallp if this
202  * thread should become a server thread.  */
203 static void
204 rxi_ListenerProc(int sock, int *tnop, struct rx_call **newcallp)
205 {
206     unsigned int host;
207     u_short port;
208     register struct rx_packet *p = (struct rx_packet *)0;
209
210     assert(pthread_mutex_lock(&listener_mutex) == 0);
211     while (!listeners_started) {
212         assert(pthread_cond_wait(&rx_listener_cond, &listener_mutex) == 0);
213     }
214     assert(pthread_mutex_unlock(&listener_mutex) == 0);
215
216     for (;;) {
217         /*
218          * Grab a new packet only if necessary (otherwise re-use the old one)
219          */
220         if (p) {
221             rxi_RestoreDataBufs(p);
222         } else {
223             if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_RECEIVE))) {
224                 /* Could this happen with multiple socket listeners? */
225                 dpf(("rxi_Listener: no packets!"));     /* Shouldn't happen */
226                 exit(1);
227             }
228         }
229
230         if (rxi_ReadPacket(sock, p, &host, &port)) {
231             clock_NewTime();
232             p = rxi_ReceivePacket(p, sock, host, port, tnop, newcallp);
233             if (newcallp && *newcallp) {
234                 if (p)
235                     rxi_FreePacket(p);
236                 return;
237             }
238         }
239     }
240     /* NOTREACHED */
241 }
242
243 /* This is the listener process request loop. The listener process loop
244  * becomes a server thread when rxi_ListenerProc returns, and stays
245  * server thread until rxi_ServerProc returns. */
246 static void *
247 rx_ListenerProc(void *argp)
248 {
249     int threadID;
250     int sock = (int)argp;
251     struct rx_call *newcall;
252
253     while (1) {
254         newcall = NULL;
255         threadID = -1;
256         rxi_ListenerProc(sock, &threadID, &newcall);
257         /* assert(threadID != -1); */
258         /* assert(newcall != NULL); */
259         sock = OSI_NULLSOCKET;
260         assert(pthread_setspecific(rx_thread_id_key, (void *)threadID) == 0);
261         rxi_ServerProc(threadID, newcall, &sock);
262         /* assert(sock != OSI_NULLSOCKET); */
263     }
264     /* not reached */
265 }
266
267 /* This is the server process request loop. The server process loop
268  * becomes a listener thread when rxi_ServerProc returns, and stays
269  * listener thread until rxi_ListenerProc returns. */
270 void
271 rx_ServerProc(void)
272 {
273     int sock;
274     int threadID;
275     struct rx_call *newcall = NULL;
276
277     rxi_MorePackets(rx_maxReceiveWindow + 2);   /* alloc more packets */
278     MUTEX_ENTER(&rx_stats_mutex);
279     rxi_dataQuota += rx_initSendWindow; /* Reserve some pkts for hard times */
280     /* threadID is used for making decisions in GetCall.  Get it by bumping
281      * number of threads handling incoming calls */
282     /* Unique thread ID: used for scheduling purposes *and* as index into
283      * the host hold table (fileserver). 
284      * The previously used rxi_availProcs is unsuitable as it
285      * will already go up and down as packets arrive while the server
286      * threads are still initialising! The recently introduced
287      * rxi_pthread_hinum does not necessarily lead to a server
288      * thread with id 0, which is not allowed to hop through the
289      * incoming call queue.
290      * So either introduce yet another counter or flag the FCFS
291      * thread... chose the latter.
292      */
293     threadID = ++rxi_pthread_hinum;
294     if (rxi_fcfs_thread_num == 0 && rxi_fcfs_thread_num != threadID)
295         rxi_fcfs_thread_num = threadID;
296     ++rxi_availProcs;
297     MUTEX_EXIT(&rx_stats_mutex);
298
299     while (1) {
300         sock = OSI_NULLSOCKET;
301         assert(pthread_setspecific(rx_thread_id_key, (void *)threadID) == 0);
302         rxi_ServerProc(threadID, newcall, &sock);
303         /* assert(sock != OSI_NULLSOCKET); */
304         newcall = NULL;
305         rxi_ListenerProc(sock, &threadID, &newcall);
306         /* assert(threadID != -1); */
307         /* assert(newcall != NULL); */
308     }
309     /* not reached */
310 }
311
312 /*
313  * Historically used to start the listener process. We now have multiple
314  * listener processes (one for each socket); these are started by GetUdpSocket.
315  *
316  * The event handling process *is* started here (the old listener used
317  * to also handle events). The listener threads can't actually start 
318  * listening until rxi_StartListener is called because most of R may not
319  * be initialized when rxi_Listen is called.
320  */
321 void
322 rxi_StartListener(void)
323 {
324     pthread_attr_t tattr;
325     AFS_SIGSET_DECL;
326
327         if (listeners_started)
328                 return;
329
330     if (pthread_attr_init(&tattr) != 0) {
331         dpf
332             (("Unable to create Rx event handling thread (pthread_attr_init)\n"));
333         exit(1);
334     }
335
336     if (pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) != 0) {
337         dpf
338             (("Unable to create Rx event handling thread (pthread_attr_setdetachstate)\n"));
339         exit(1);
340     }
341
342     AFS_SIGSET_CLEAR();
343     if (pthread_create(&event_handler_thread, &tattr, event_handler, NULL) !=
344         0) {
345         dpf(("Unable to create Rx event handling thread\n"));
346         exit(1);
347     }
348     MUTEX_ENTER(&rx_stats_mutex);
349     ++rxi_pthread_hinum;
350     MUTEX_EXIT(&rx_stats_mutex);
351     AFS_SIGSET_RESTORE();
352
353     assert(pthread_mutex_lock(&listener_mutex) == 0);
354     assert(pthread_cond_broadcast(&rx_listener_cond) == 0);
355     listeners_started = 1;
356     assert(pthread_mutex_unlock(&listener_mutex) == 0);
357
358 }
359
360 /*
361  * Listen on the specified socket.
362  */
363 int
364 rxi_Listen(osi_socket sock)
365 {
366     pthread_t thread;
367     pthread_attr_t tattr;
368     AFS_SIGSET_DECL;
369
370     if (pthread_attr_init(&tattr) != 0) {
371         dpf
372             (("Unable to create socket listener thread (pthread_attr_init)\n"));
373         exit(1);
374     }
375
376     if (pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) != 0) {
377         dpf
378             (("Unable to create socket listener thread (pthread_attr_setdetachstate)\n"));
379         exit(1);
380     }
381
382     AFS_SIGSET_CLEAR();
383     if (pthread_create(&thread, &tattr, rx_ListenerProc, (void *)sock) != 0) {
384         dpf(("Unable to create socket listener thread\n"));
385         exit(1);
386     }
387     MUTEX_ENTER(&rx_stats_mutex);
388     ++rxi_pthread_hinum;
389     MUTEX_EXIT(&rx_stats_mutex);
390     AFS_SIGSET_RESTORE();
391     return 0;
392 }
393
394
395 /*
396  * Recvmsg.
397  *
398  */
399 int
400 rxi_Recvmsg(osi_socket socket, struct msghdr *msg_p, int flags)
401 {
402     int ret;
403     ret = recvmsg(socket, msg_p, flags);
404     return ret;
405 }
406
407 /*
408  * Sendmsg.
409  */
410 int
411 rxi_Sendmsg(osi_socket socket, struct msghdr *msg_p, int flags)
412 {
413     int ret;
414     ret = sendmsg(socket, msg_p, flags);
415 #ifdef AFS_LINUX22_ENV
416     /* linux unfortunately returns ECONNREFUSED if the target port
417      * is no longer in use */
418     /* and EAGAIN if a UDP checksum is incorrect */
419     if (ret == -1 && errno != ECONNREFUSED && errno != EAGAIN) {
420 #else
421     if (ret == -1) {
422 #endif
423         dpf(("rxi_sendmsg failed, error %d\n", errno));
424         fflush(stdout);
425         return -1;
426     }
427     return 0;
428 }
429
430 struct rx_ts_info_t * rx_ts_info_init() {
431     register struct rx_ts_info_t * rx_ts_info;
432     rx_ts_info = (rx_ts_info_t *) malloc(sizeof(rx_ts_info_t));
433     assert(rx_ts_info != NULL && pthread_setspecific(rx_ts_info_key, rx_ts_info) == 0);
434     memset(rx_ts_info, 0, sizeof(rx_ts_info_t));
435 #ifdef RX_ENABLE_TSFPQ
436     queue_Init(&rx_ts_info->_FPQ);
437
438     MUTEX_ENTER(&rx_stats_mutex);
439     rx_TSFPQMaxProcs++;
440     RX_TS_FPQ_COMPUTE_LIMITS;
441     MUTEX_EXIT(&rx_stats_mutex);
442 #endif /* RX_ENABLE_TSFPQ */
443     return rx_ts_info;
444 }