bringing-rx-into-21st-century-20060504
[openafs.git] / src / rx / rx_pthread.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /*
11  * An implementation of the rx socket listener for pthreads (not using select).
12  * This assumes that multiple read system calls may be extant at any given
13  * time. Also implements the pthread-specific event handling for rx.
14  *
15  * rx_pthread.c is used for the thread safe RX package.
16  */
17
18 #include <afsconfig.h>
19 #include <afs/param.h>
20
21 RCSID
22     ("$Header$");
23
24 #include <sys/types.h>
25 #include <errno.h>
26 #include <signal.h>
27 #ifndef AFS_NT40_ENV
28 # include <sys/socket.h>
29 # include <sys/file.h>
30 # include <netdb.h>
31 # include <netinet/in.h>
32 # include <net/if.h>
33 # include <sys/ioctl.h>
34 # include <sys/time.h>
35 #endif
36 #include <sys/stat.h>
37 #include <rx/rx.h>
38 #include <rx/rx_globals.h>
39 #include <assert.h>
40 #include <rx/rx_pthread.h>
41 #include <rx/rx_clock.h>
42
43 /*
44  * Number of times the event handling thread was signalled because a new
45  * event was scheduled earlier than the lastest event.
46  *
47  * Protected by event_handler_mutex
48  */
49 static long rx_pthread_n_event_wakeups;
50
51 /* Set rx_pthread_event_rescheduled if event_handler should just try
52  * again instead of sleeping.
53  *
54  * Protected by event_handler_mutex
55  */
56 static int rx_pthread_event_rescheduled = 0;
57
58 static void *rx_ListenerProc(void *);
59
60 /*
61  * We supply an event handling thread for Rx's event processing.
62  * The condition variable is used to wakeup the thread whenever a new
63  * event is scheduled earlier than the previous earliest event.
64  * This thread is also responsible for keeping time.
65  */
66 static pthread_t event_handler_thread;
67 pthread_cond_t rx_event_handler_cond;
68 pthread_mutex_t event_handler_mutex;
69 pthread_cond_t rx_listener_cond;
70 pthread_mutex_t listener_mutex;
71 static int listeners_started = 0;
72 pthread_mutex_t rx_clock_mutex;
73 struct clock rxi_clockNow;
74
75 /*
76  * Delay the current thread the specified number of seconds.
77  */
78 void
79 rxi_Delay(int sec)
80 {
81     sleep(sec);
82 }
83
84 /*
85  * Called from rx_Init()
86  */
87 void
88 rxi_InitializeThreadSupport(void)
89 {
90     listeners_started = 0;
91     clock_GetTime(&rxi_clockNow);
92 }
93
94 static void *
95 server_entry(void *argp)
96 {
97     void (*server_proc) () = (void (*)())argp;
98     server_proc();
99     dpf(("rx_pthread.c: server_entry: Server proc returned unexpectedly\n"));
100     exit(1);
101     return (void *)0;
102 }
103
104 /*
105  * Start an Rx server process.
106  */
107 void
108 rxi_StartServerProc(void (*proc) (void), int stacksize)
109 {
110     pthread_t thread;
111     pthread_attr_t tattr;
112     AFS_SIGSET_DECL;
113
114     if (pthread_attr_init(&tattr) != 0) {
115         dpf(("Unable to Create Rx server thread (pthread_attr_init)\n"));
116         exit(1);
117     }
118
119     if (pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) != 0) {
120         dpf
121             (("Unable to Create Rx server thread (pthread_attr_setdetachstate)\n"));
122         exit(1);
123     }
124
125     /*
126      * NOTE: We are ignoring the stack size parameter, for now.
127      */
128     AFS_SIGSET_CLEAR();
129     if (pthread_create(&thread, &tattr, server_entry, (void *)proc) != 0) {
130         dpf(("Unable to Create Rx server thread\n"));
131         exit(1);
132     }
133     AFS_SIGSET_RESTORE();
134 }
135
136 /*
137  * The event handling process.
138  */
139 static void *
140 event_handler(void *argp)
141 {
142     struct clock rx_pthread_last_event_wait_time = { 0, 0 };
143     unsigned long rx_pthread_n_event_expired = 0;
144     unsigned long rx_pthread_n_event_waits = 0;
145     long rx_pthread_n_event_woken = 0;
146     struct timespec rx_pthread_next_event_time = { 0, 0 };
147
148     assert(pthread_mutex_lock(&event_handler_mutex) == 0);
149
150     for (;;) {
151         struct clock cv;
152         struct clock next;
153
154         assert(pthread_mutex_unlock(&event_handler_mutex) == 0);
155
156         next.sec = 30;          /* Time to sleep if there are no events scheduled */
157         next.usec = 0;
158         clock_GetTime(&cv);
159         rxevent_RaiseEvents(&next);
160
161         assert(pthread_mutex_lock(&event_handler_mutex) == 0);
162         if (rx_pthread_event_rescheduled) {
163             rx_pthread_event_rescheduled = 0;
164             continue;
165         }
166
167         clock_Add(&cv, &next);
168         rx_pthread_next_event_time.tv_sec = cv.sec;
169         rx_pthread_next_event_time.tv_nsec = cv.usec * 1000;
170         rx_pthread_n_event_waits++;
171         if (pthread_cond_timedwait
172             (&rx_event_handler_cond, &event_handler_mutex,
173              &rx_pthread_next_event_time) == -1) {
174 #ifdef notdef
175             assert(errno == EAGAIN);
176 #endif
177             rx_pthread_n_event_expired++;
178         } else {
179             rx_pthread_n_event_woken++;
180         }
181         rx_pthread_event_rescheduled = 0;
182     }
183 }
184
185
186 /*
187  * This routine will get called by the event package whenever a new,
188  * earlier than others, event is posted. */
189 void
190 rxi_ReScheduleEvents(void)
191 {
192     assert(pthread_mutex_lock(&event_handler_mutex) == 0);
193     pthread_cond_signal(&rx_event_handler_cond);
194     rx_pthread_event_rescheduled = 1;
195     assert(pthread_mutex_unlock(&event_handler_mutex) == 0);
196 }
197
198
199 /* Loop to listen on a socket. Return setting *newcallp if this
200  * thread should become a server thread.  */
201 static void
202 rxi_ListenerProc(int sock, int *tnop, struct rx_call **newcallp)
203 {
204     struct sockaddr_storage saddr;
205     int slen;
206     register struct rx_packet *p = (struct rx_packet *)0;
207
208     assert(pthread_mutex_lock(&listener_mutex) == 0);
209     while (!listeners_started) {
210         assert(pthread_cond_wait(&rx_listener_cond, &listener_mutex) == 0);
211     }
212     assert(pthread_mutex_unlock(&listener_mutex) == 0);
213
214     for (;;) {
215         /*
216          * Grab a new packet only if necessary (otherwise re-use the old one)
217          */
218         if (p) {
219             rxi_RestoreDataBufs(p);
220         } else {
221             if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_RECEIVE))) {
222                 /* Could this happen with multiple socket listeners? */
223                 dpf(("rxi_Listener: no packets!"));     /* Shouldn't happen */
224                 exit(1);
225             }
226         }
227
228         slen = sizeof(saddr);
229         if (rxi_ReadPacket(sock, p, &saddr, &slen)) {
230             clock_NewTime();
231             p = rxi_ReceivePacket(p, sock, &saddr, slen, tnop, newcallp);
232             if (newcallp && *newcallp) {
233                 if (p)
234                     rxi_FreePacket(p);
235                 return;
236             }
237         }
238     }
239     /* NOTREACHED */
240 }
241
242 /* This is the listener process request loop. The listener process loop
243  * becomes a server thread when rxi_ListenerProc returns, and stays
244  * server thread until rxi_ServerProc returns. */
245 static void *
246 rx_ListenerProc(void *argp)
247 {
248     int threadID;
249     int sock = (int)argp;
250     struct rx_call *newcall;
251
252     while (1) {
253         newcall = NULL;
254         threadID = -1;
255         rxi_ListenerProc(sock, &threadID, &newcall);
256         /* assert(threadID != -1); */
257         /* assert(newcall != NULL); */
258         sock = OSI_NULLSOCKET;
259         assert(pthread_setspecific(rx_thread_id_key, (void *)threadID) == 0);
260         rxi_ServerProc(threadID, newcall, &sock);
261         /* assert(sock != OSI_NULLSOCKET); */
262     }
263     /* not reached */
264 }
265
266 /* This is the server process request loop. The server process loop
267  * becomes a listener thread when rxi_ServerProc returns, and stays
268  * listener thread until rxi_ListenerProc returns. */
269 void
270 rx_ServerProc(void)
271 {
272     int sock;
273     int threadID;
274     struct rx_call *newcall = NULL;
275
276     rxi_MorePackets(rx_maxReceiveWindow + 2);   /* alloc more packets */
277     MUTEX_ENTER(&rx_stats_mutex);
278     rxi_dataQuota += rx_initSendWindow; /* Reserve some pkts for hard times */
279     /* threadID is used for making decisions in GetCall.  Get it by bumping
280      * number of threads handling incoming calls */
281     /* Unique thread ID: used for scheduling purposes *and* as index into
282      * the host hold table (fileserver). 
283      * The previously used rxi_availProcs is unsuitable as it
284      * will already go up and down as packets arrive while the server
285      * threads are still initialising! The recently introduced
286      * rxi_pthread_hinum does not necessarily lead to a server
287      * thread with id 0, which is not allowed to hop through the
288      * incoming call queue.
289      * So either introduce yet another counter or flag the FCFS
290      * thread... chose the latter.
291      */
292     threadID = ++rxi_pthread_hinum;
293     if (rxi_fcfs_thread_num == 0 && rxi_fcfs_thread_num != threadID)
294         rxi_fcfs_thread_num = threadID;
295     ++rxi_availProcs;
296     MUTEX_EXIT(&rx_stats_mutex);
297
298     while (1) {
299         sock = OSI_NULLSOCKET;
300         assert(pthread_setspecific(rx_thread_id_key, (void *)threadID) == 0);
301         rxi_ServerProc(threadID, newcall, &sock);
302         /* assert(sock != OSI_NULLSOCKET); */
303         newcall = NULL;
304         rxi_ListenerProc(sock, &threadID, &newcall);
305         /* assert(threadID != -1); */
306         /* assert(newcall != NULL); */
307     }
308     /* not reached */
309 }
310
311 /*
312  * Historically used to start the listener process. We now have multiple
313  * listener processes (one for each socket); these are started by GetUdpSocket.
314  *
315  * The event handling process *is* started here (the old listener used
316  * to also handle events). The listener threads can't actually start 
317  * listening until rxi_StartListener is called because most of R may not
318  * be initialized when rxi_Listen is called.
319  */
320 void
321 rxi_StartListener(void)
322 {
323     pthread_attr_t tattr;
324     AFS_SIGSET_DECL;
325
326     if (pthread_attr_init(&tattr) != 0) {
327         dpf
328             (("Unable to create Rx event handling thread (pthread_attr_init)\n"));
329         exit(1);
330     }
331
332     if (pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) != 0) {
333         dpf
334             (("Unable to create Rx event handling thread (pthread_attr_setdetachstate)\n"));
335         exit(1);
336     }
337
338     AFS_SIGSET_CLEAR();
339     if (pthread_create(&event_handler_thread, &tattr, event_handler, NULL) !=
340         0) {
341         dpf(("Unable to create Rx event handling thread\n"));
342         exit(1);
343     }
344     MUTEX_ENTER(&rx_stats_mutex);
345     ++rxi_pthread_hinum;
346     MUTEX_EXIT(&rx_stats_mutex);
347     AFS_SIGSET_RESTORE();
348
349     assert(pthread_mutex_lock(&listener_mutex) == 0);
350     assert(pthread_cond_broadcast(&rx_listener_cond) == 0);
351     listeners_started = 1;
352     assert(pthread_mutex_unlock(&listener_mutex) == 0);
353
354 }
355
356 /*
357  * Listen on the specified socket.
358  */
359 int
360 rxi_Listen(osi_socket sock)
361 {
362     pthread_t thread;
363     pthread_attr_t tattr;
364     AFS_SIGSET_DECL;
365
366     if (pthread_attr_init(&tattr) != 0) {
367         dpf
368             (("Unable to create socket listener thread (pthread_attr_init)\n"));
369         exit(1);
370     }
371
372     if (pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) != 0) {
373         dpf
374             (("Unable to create socket listener thread (pthread_attr_setdetachstate)\n"));
375         exit(1);
376     }
377
378     AFS_SIGSET_CLEAR();
379     if (pthread_create(&thread, &tattr, rx_ListenerProc, (void *)sock) != 0) {
380         dpf(("Unable to create socket listener thread\n"));
381         exit(1);
382     }
383     MUTEX_ENTER(&rx_stats_mutex);
384     ++rxi_pthread_hinum;
385     MUTEX_EXIT(&rx_stats_mutex);
386     AFS_SIGSET_RESTORE();
387     return 0;
388 }
389
390
391 /*
392  * Recvmsg.
393  *
394  */
395 int
396 rxi_Recvmsg(osi_socket socket, struct msghdr *msg_p, int flags)
397 {
398     int ret;
399     ret = recvmsg(socket, msg_p, flags);
400     return ret;
401 }
402
403 /*
404  * Sendmsg.
405  */
406 int
407 rxi_Sendmsg(osi_socket socket, struct msghdr *msg_p, int flags)
408 {
409     int ret;
410     ret = sendmsg(socket, msg_p, flags);
411 #ifdef AFS_LINUX22_ENV
412     /* linux unfortunately returns ECONNREFUSED if the target port
413      * is no longer in use */
414     /* and EAGAIN if a UDP checksum is incorrect */
415     if (ret == -1 && errno != ECONNREFUSED && errno != EAGAIN) {
416 #else
417     if (ret == -1) {
418 #endif
419         dpf(("rxi_sendmsg failed, error %d\n", errno));
420         fflush(stdout);
421         return -1;
422     }
423     return 0;
424 }
425
426 struct rx_ts_info_t * rx_ts_info_init() {
427     register struct rx_ts_info_t * rx_ts_info;
428     rx_ts_info = (rx_ts_info_t *) malloc(sizeof(rx_ts_info_t));
429     assert(rx_ts_info != NULL && pthread_setspecific(rx_ts_info_key, rx_ts_info) == 0);
430     memset(rx_ts_info, 0, sizeof(rx_ts_info_t));
431 #ifdef RX_ENABLE_TSFPQ
432     queue_Init(&rx_ts_info->_FPQ);
433
434     MUTEX_ENTER(&rx_stats_mutex);
435     rx_TSFPQMaxProcs++;
436     RX_TS_FPQ_COMPUTE_LIMITS;
437     MUTEX_EXIT(&rx_stats_mutex);
438 #endif /* RX_ENABLE_TSFPQ */
439     return rx_ts_info;
440 }