winnt-include-sanity-20030314
[openafs.git] / src / rx / rx_pthread.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /*
11  * An implementation of the rx socket listener for pthreads (not using select).
12  * This assumes that multiple read system calls may be extant at any given
13  * time. Also implements the pthread-specific event handling for rx.
14  *
15  * rx_pthread.c is used for the thread safe RX package.
16  */
17
18 #include <afsconfig.h>
19 #include <afs/param.h>
20
21 RCSID("$Header$");
22
23 #include <sys/types.h>
24 #include <errno.h>
25 #include <signal.h>
26 #ifndef AFS_NT40_ENV
27 # include <sys/socket.h>
28 # include <sys/file.h>
29 # include <netdb.h>
30 # include <netinet/in.h>
31 # include <net/if.h>
32 # include <sys/ioctl.h>
33 # include <sys/time.h>
34 #endif
35 #include <sys/stat.h>
36 #include <rx/rx.h>
37 #include <rx/rx_globals.h>
38 #include <assert.h>
39 #include <rx/rx_pthread.h>
40
41 /*
42  * Number of times the event handling thread was signalled because a new
43  * event was scheduled earlier than the lastest event.
44  *
45  * Protected by event_handler_mutex
46  */
47 static long rx_pthread_n_event_wakeups;
48
49 /* Set rx_pthread_event_rescheduled if event_handler should just try
50  * again instead of sleeping.
51  *
52  * Protected by event_handler_mutex
53  */
54 static int rx_pthread_event_rescheduled = 0;
55
56 static void *rx_ListenerProc(void *);
57
58 /*
59  * We supply an event handling thread for Rx's event processing.
60  * The condition variable is used to wakeup the thread whenever a new
61  * event is scheduled earlier than the previous earliest event.
62  * This thread is also responsible for keeping time.
63  */
64 static pthread_t event_handler_thread;
65 pthread_cond_t rx_event_handler_cond;
66 pthread_mutex_t event_handler_mutex;
67 pthread_cond_t rx_listener_cond;
68 pthread_mutex_t listener_mutex;
69 static int listeners_started = 0;
70 pthread_mutex_t rx_clock_mutex;
71 struct clock rxi_clockNow;
72
73 /*
74  * Delay the current thread the specified number of seconds.
75  */
76 void rxi_Delay(int sec)
77 {
78     sleep(sec);
79 }
80
81 /*
82  * Called from rx_Init()
83  */
84 void rxi_InitializeThreadSupport(void)
85 {
86     listeners_started = 0;
87     gettimeofday((struct timeval *)&rxi_clockNow, NULL);
88 }
89
90 static void *server_entry(void * argp)
91 {
92     void (*server_proc)() = (void (*)()) argp;
93     server_proc();
94     printf("rx_pthread.c: server_entry: Server proc returned unexpectedly\n");
95     exit(1);
96     return (void *) 0;
97 }
98
99 /*
100  * Start an Rx server process.
101  */
102 void rxi_StartServerProc(void (*proc)(void), int stacksize)
103 {
104     pthread_t thread;
105     pthread_attr_t tattr;
106     AFS_SIGSET_DECL;
107
108     if (pthread_attr_init
109         (&tattr) != 0) {
110         printf("Unable to Create Rx server thread (pthread_attr_init)\n");
111         exit(1);
112     }
113
114     if (pthread_attr_setdetachstate
115         (&tattr,
116          PTHREAD_CREATE_DETACHED) != 0) {
117         printf("Unable to Create Rx server thread (pthread_attr_setdetachstate)\n");
118         exit(1);
119     }
120
121     /*
122      * NOTE: We are ignoring the stack size parameter, for now.
123      */
124     AFS_SIGSET_CLEAR();
125     if (pthread_create
126         (&thread,
127          &tattr,
128          server_entry,
129          (void *) proc) != 0) {
130         printf("Unable to Create Rx server thread\n");
131         exit(1);
132     }
133     AFS_SIGSET_RESTORE();
134 }
135
136 /*
137  * The event handling process.
138  */
139 static void *event_handler(void *argp)
140 {
141     struct clock rx_pthread_last_event_wait_time = {0,0};
142     unsigned long rx_pthread_n_event_expired = 0;
143     unsigned long rx_pthread_n_event_waits = 0;
144     long rx_pthread_n_event_woken = 0;
145     struct timespec rx_pthread_next_event_time = {0,0};
146
147     assert(pthread_mutex_lock(&event_handler_mutex)==0);
148
149     for (;;) {
150         struct clock cv;
151         struct clock next;
152
153         assert(pthread_mutex_unlock(&event_handler_mutex)==0);
154     
155         next.sec = 30; /* Time to sleep if there are no events scheduled */
156         next.usec = 0;
157         gettimeofday((struct timeval *)&cv, NULL);
158         rxevent_RaiseEvents(&next);
159
160         assert(pthread_mutex_lock(&event_handler_mutex)==0);
161         if (rx_pthread_event_rescheduled) {
162             rx_pthread_event_rescheduled = 0;
163             continue;
164         }
165
166         clock_Add(&cv, &next);
167         rx_pthread_next_event_time.tv_sec = cv.sec;
168         rx_pthread_next_event_time.tv_nsec = cv.usec * 1000;
169         rx_pthread_n_event_waits++;
170         if (pthread_cond_timedwait
171             (&rx_event_handler_cond,
172              &event_handler_mutex,
173              &rx_pthread_next_event_time) == -1) {
174 #ifdef notdef
175             assert(errno == EAGAIN); 
176 #endif
177             rx_pthread_n_event_expired++;
178         } else {        
179             rx_pthread_n_event_woken++;
180         }
181         rx_pthread_event_rescheduled = 0;
182     }
183 }
184
185
186 /*
187  * This routine will get called by the event package whenever a new,
188  * earlier than others, event is posted. */
189 void rxi_ReScheduleEvents(void)
190 {
191     assert(pthread_mutex_lock(&event_handler_mutex)==0);
192     pthread_cond_signal(&rx_event_handler_cond);
193     rx_pthread_event_rescheduled = 1;
194     assert(pthread_mutex_unlock(&event_handler_mutex)==0);
195 }
196
197
198 /* Loop to listen on a socket. Return setting *newcallp if this
199  * thread should become a server thread.  */
200 static void rxi_ListenerProc(int sock, int *tnop, struct rx_call **newcallp)
201 {
202     unsigned int host;
203     u_short port;
204     register struct rx_packet *p = (struct rx_packet *)0;
205
206     assert(pthread_mutex_lock(&listener_mutex)==0);
207     while (!listeners_started) {
208         assert(pthread_cond_wait(&rx_listener_cond, &listener_mutex)==0);
209     }
210     assert(pthread_mutex_unlock(&listener_mutex)==0);
211
212     for (;;) {
213         /*
214          * Grab a new packet only if necessary (otherwise re-use the old one)
215          */
216         if (p) {
217             rxi_RestoreDataBufs(p);
218         }
219         else {
220             if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_RECEIVE))) {
221                 /* Could this happen with multiple socket listeners? */
222                 printf("rxi_Listener: no packets!"); /* Shouldn't happen */
223                 exit(1);
224             }
225         }
226
227         if (rxi_ReadPacket(sock, p, &host, &port)) {
228             clock_NewTime();
229             p = rxi_ReceivePacket(p, sock, host, port, tnop, newcallp);
230             if (newcallp && *newcallp) {
231                 if (p)
232                     rxi_FreePacket(p);
233                 return;
234             }
235         }
236     }
237     /* NOTREACHED */
238 }
239
240 /* This is the listener process request loop. The listener process loop
241  * becomes a server thread when rxi_ListenerProc returns, and stays
242  * server thread until rxi_ServerProc returns. */
243 static void *rx_ListenerProc(void *argp)
244 {
245     int threadID;
246     int sock = (int) argp;
247     struct rx_call *newcall;
248
249     while(1) {
250         newcall = NULL;
251         threadID = -1;
252         rxi_ListenerProc(sock, &threadID, &newcall);
253         /* assert(threadID != -1); */
254         /* assert(newcall != NULL); */
255         sock = OSI_NULLSOCKET;
256         assert(pthread_setspecific(rx_thread_id_key, (void *)threadID) == 0);
257         rxi_ServerProc(threadID, newcall, &sock);
258         /* assert(sock != OSI_NULLSOCKET); */
259     }
260     /* not reached */
261 }
262
263 /* This is the server process request loop. The server process loop
264  * becomes a listener thread when rxi_ServerProc returns, and stays
265  * listener thread until rxi_ListenerProc returns. */
266 void rx_ServerProc(void)
267 {
268     int sock;
269     int threadID;
270     struct rx_call *newcall = NULL;
271
272     rxi_MorePackets(rx_maxReceiveWindow+2); /* alloc more packets */
273     MUTEX_ENTER(&rx_stats_mutex);
274     rxi_dataQuota += rx_initSendWindow; /* Reserve some pkts for hard times */
275     /* threadID is used for making decisions in GetCall.  Get it by bumping
276      * number of threads handling incoming calls */
277         /* Unique thread ID: used for scheduling purposes *and* as index into
278                 the host hold table (fileserver). 
279                 The previously used rxi_availProcs is unsuitable as it
280                 will already go up and down as packets arrive while the server
281                 threads are still initialising! The recently introduced
282                 rxi_pthread_hinum does not necessarily lead to a server
283                 thread with id 0, which is not allowed to hop through the
284                 incoming call queue.
285                 So either introduce yet another counter or flag the FCFS
286                 thread... chose the latter.
287         */
288         threadID = ++rxi_pthread_hinum;
289         if (rxi_fcfs_thread_num==0 && rxi_fcfs_thread_num!=threadID)
290                 rxi_fcfs_thread_num=threadID;
291         ++rxi_availProcs;
292     MUTEX_EXIT(&rx_stats_mutex);
293
294     while(1) {
295         sock = OSI_NULLSOCKET;
296         assert(pthread_setspecific(rx_thread_id_key, (void *)threadID) == 0);
297         rxi_ServerProc(threadID, newcall, &sock);
298         /* assert(sock != OSI_NULLSOCKET); */
299         newcall = NULL;
300         rxi_ListenerProc(sock, &threadID, &newcall);
301         /* assert(threadID != -1); */
302         /* assert(newcall != NULL); */
303     }
304     /* not reached */
305 }
306
307 /*
308  * Historically used to start the listener process. We now have multiple
309  * listener processes (one for each socket); these are started by GetUdpSocket.
310  *
311  * The event handling process *is* started here (the old listener used
312  * to also handle events). The listener threads can't actually start 
313  * listening until rxi_StartListener is called because most of R may not
314  * be initialized when rxi_Listen is called.
315  */
316 void rxi_StartListener(void)
317 {
318     pthread_attr_t tattr;
319     AFS_SIGSET_DECL;
320
321     if (pthread_attr_init
322         (&tattr) != 0) {
323         printf("Unable to create Rx event handling thread (pthread_attr_init)\n");
324         exit(1);
325     }
326
327     if (pthread_attr_setdetachstate
328         (&tattr,
329          PTHREAD_CREATE_DETACHED) != 0) {
330         printf("Unable to create Rx event handling thread (pthread_attr_setdetachstate)\n");
331         exit(1);
332     }
333
334     AFS_SIGSET_CLEAR();
335     if (pthread_create
336         (&event_handler_thread,
337          &tattr,
338          event_handler,
339          NULL) != 0) {
340         printf("Unable to create Rx event handling thread\n");
341         exit(1);
342     }
343     MUTEX_ENTER(&rx_stats_mutex);
344     ++rxi_pthread_hinum;
345     MUTEX_EXIT(&rx_stats_mutex);
346     AFS_SIGSET_RESTORE();
347
348     assert(pthread_mutex_lock(&listener_mutex)==0);
349     assert(pthread_cond_broadcast(&rx_listener_cond)==0);
350     listeners_started = 1;
351     assert(pthread_mutex_unlock(&listener_mutex)==0);
352
353 }
354
355 /*
356  * Listen on the specified socket.
357  */
358 int rxi_Listen(osi_socket sock)
359 {
360     pthread_t thread;
361     pthread_attr_t tattr;
362     AFS_SIGSET_DECL;
363
364     if (pthread_attr_init
365         (&tattr) != 0) {
366         printf("Unable to create socket listener thread (pthread_attr_init)\n");
367         exit(1);
368     }
369
370     if (pthread_attr_setdetachstate
371         (&tattr,
372          PTHREAD_CREATE_DETACHED) != 0) {
373         printf("Unable to create socket listener thread (pthread_attr_setdetachstate)\n");
374         exit(1);
375     }
376
377     AFS_SIGSET_CLEAR();
378     if (pthread_create
379         (&thread,
380          &tattr,
381          rx_ListenerProc,
382          (void *) sock) != 0) {
383         printf("Unable to create socket listener thread\n");
384         exit(1);
385     }
386     MUTEX_ENTER(&rx_stats_mutex);
387     ++rxi_pthread_hinum;
388     MUTEX_EXIT(&rx_stats_mutex);
389     AFS_SIGSET_RESTORE();
390     return 0;
391 }
392
393
394 /*
395  * Recvmsg.
396  *
397  */
398 int rxi_Recvmsg(int socket, struct msghdr *msg_p, int flags)
399 {
400     int ret;
401     ret = recvmsg(socket, msg_p, flags);
402     return ret;
403 }
404
405 /*
406  * Sendmsg.
407  */
408 int rxi_Sendmsg(osi_socket socket, struct msghdr *msg_p, int flags)
409 {
410     int ret;
411     ret = sendmsg(socket, msg_p, flags);
412 #ifdef AFS_LINUX22_ENV
413     /* linux unfortunately returns ECONNREFUSED if the target port
414      * is no longer in use */
415     /* and EAGAIN if a UDP checksum is incorrect */
416     if (ret == -1 && errno != ECONNREFUSED && errno != EAGAIN) {
417 #else
418     if (ret == -1) {
419 #endif
420         printf("rxi_sendmsg failed, error %d\n", errno);
421         fflush(stdout);
422     }
423     return 0;
424 }