misc-build-cleanup-20010917
[openafs.git] / src / rx / rx_pthread.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /*
11  * An implementation of the rx socket listener for pthreads (not using select).
12  * This assumes that multiple read system calls may be extant at any given
13  * time. Also implements the pthread-specific event handling for rx.
14  *
15  * rx_pthread.c is used for the thread safe RX package.
16  */
17
18 #include <afsconfig.h>
19 #include <afs/param.h>
20
21 RCSID("$Header$");
22
23 #include <sys/types.h>
24 #include <errno.h>
25 #include <signal.h>
26 #ifndef AFS_NT40_ENV
27 # include <sys/socket.h>
28 # include <sys/file.h>
29 # include <netdb.h>
30 # include <netinet/in.h>
31 # include <net/if.h>
32 # include <sys/ioctl.h>
33 # include <sys/time.h>
34 #endif
35 #include <sys/stat.h>
36 #include <rx.h>
37 #include <rx_globals.h>
38 #include <assert.h>
39 #include <rx/rx_pthread.h>
40
41 /*
42  * Number of times the event handling thread was signalled because a new
43  * event was scheduled earlier than the lastest event.
44  *
45  * Protected by event_handler_mutex
46  */
47 static long rx_pthread_n_event_wakeups;
48
49 /* Set rx_pthread_event_rescheduled if event_handler should just try
50  * again instead of sleeping.
51  *
52  * Protected by event_handler_mutex
53  */
54 static int rx_pthread_event_rescheduled = 0;
55
56 static void rx_ListenerProc(void *);
57
58 /*
59  * We supply an event handling thread for Rx's event processing.
60  * The condition variable is used to wakeup the thread whenever a new
61  * event is scheduled earlier than the previous earliest event.
62  * This thread is also responsible for keeping time.
63  */
64 static pthread_t event_handler_thread;
65 pthread_cond_t rx_event_handler_cond;
66 pthread_mutex_t event_handler_mutex;
67 pthread_cond_t rx_listener_cond;
68 pthread_mutex_t listener_mutex;
69 static int listeners_started = 0;
70 pthread_mutex_t rx_clock_mutex;
71 struct clock rxi_clockNow;
72
73 /*
74  * Delay the current thread the specified number of seconds.
75  */
76 void rxi_Delay(sec)
77     int sec;
78 {
79     sleep(sec);
80 }
81
82 /*
83  * Called from rx_Init()
84  */
85 void rxi_InitializeThreadSupport() {
86
87     listeners_started = 0;
88     gettimeofday((struct timeval *)&rxi_clockNow, NULL);
89 }
90
91 static void *server_entry(void * argp)
92 {
93     void (*server_proc)() = (void (*)()) argp;
94     server_proc();
95     printf("rx_pthread.c: server_entry: Server proc returned unexpectedly\n");
96     exit(1);
97     return (void *) 0;
98 }
99
100 /*
101  * Start an Rx server process.
102  */
103 void rxi_StartServerProc(proc, stacksize)
104     void (*proc)();
105     int stacksize;
106 {
107     pthread_t thread;
108     pthread_attr_t tattr;
109     AFS_SIGSET_DECL;
110
111     if (pthread_attr_init
112         (&tattr) != 0) {
113         printf("Unable to Create Rx server thread (pthread_attr_init)\n");
114         exit(1);
115     }
116
117     if (pthread_attr_setdetachstate
118         (&tattr,
119          PTHREAD_CREATE_DETACHED) != 0) {
120         printf("Unable to Create Rx server thread (pthread_attr_setdetachstate)\n");
121         exit(1);
122     }
123
124     /*
125      * NOTE: We are ignoring the stack size parameter, for now.
126      */
127     AFS_SIGSET_CLEAR();
128     if (pthread_create
129         (&thread,
130          &tattr,
131          server_entry,
132          (void *) proc) != 0) {
133         printf("Unable to Create Rx server thread\n");
134         exit(1);
135     }
136     AFS_SIGSET_RESTORE();
137 }
138
139 /*
140  * The event handling process.
141  */
142 static void *event_handler(void *argp)
143 {
144     struct clock rx_pthread_last_event_wait_time = {0,0};
145     unsigned long rx_pthread_n_event_expired = 0;
146     unsigned long rx_pthread_n_event_waits = 0;
147     long rx_pthread_n_event_woken = 0;
148     struct timespec rx_pthread_next_event_time = {0,0};
149
150     assert(pthread_mutex_lock(&event_handler_mutex)==0);
151
152     for (;;) {
153         struct clock cv;
154         struct clock next;
155
156         assert(pthread_mutex_unlock(&event_handler_mutex)==0);
157     
158         next.sec = 30; /* Time to sleep if there are no events scheduled */
159         next.usec = 0;
160         gettimeofday((struct timeval *)&cv, NULL);
161         rxevent_RaiseEvents(&next);
162
163         assert(pthread_mutex_lock(&event_handler_mutex)==0);
164         if (rx_pthread_event_rescheduled) {
165             rx_pthread_event_rescheduled = 0;
166             continue;
167         }
168
169         clock_Add(&cv, &next);
170         rx_pthread_next_event_time.tv_sec = cv.sec;
171         rx_pthread_next_event_time.tv_nsec = cv.usec * 1000;
172         rx_pthread_n_event_waits++;
173         if (pthread_cond_timedwait
174             (&rx_event_handler_cond,
175              &event_handler_mutex,
176              &rx_pthread_next_event_time) == -1) {
177 #ifdef notdef
178             assert(errno == EAGAIN); 
179 #endif
180             rx_pthread_n_event_expired++;
181         } else {        
182             rx_pthread_n_event_woken++;
183         }
184         rx_pthread_event_rescheduled = 0;
185     }
186 }
187
188
189 /*
190  * This routine will get called by the event package whenever a new,
191  * earlier than others, event is posted. */
192 void rxi_ReScheduleEvents() {
193     assert(pthread_mutex_lock(&event_handler_mutex)==0);
194     pthread_cond_signal(&rx_event_handler_cond);
195     rx_pthread_event_rescheduled = 1;
196     assert(pthread_mutex_unlock(&event_handler_mutex)==0);
197 }
198
199
200 /* Loop to listen on a socket. Return setting *newcallp if this
201  * thread should become a server thread.  */
202 static void rxi_ListenerProc(sock, tnop, newcallp)
203 int sock;
204 int *tnop;
205 struct rx_call **newcallp;
206 {
207     u_long host;
208     u_short port;
209     register struct rx_packet *p = (struct rx_packet *)0;
210
211     assert(pthread_mutex_lock(&listener_mutex)==0);
212     while (!listeners_started) {
213         assert(pthread_cond_wait(&rx_listener_cond, &listener_mutex)==0);
214     }
215     assert(pthread_mutex_unlock(&listener_mutex)==0);
216
217     for (;;) {
218         /*
219          * Grab a new packet only if necessary (otherwise re-use the old one)
220          */
221         if (p) {
222             rxi_RestoreDataBufs(p);
223         }
224         else {
225             if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_RECEIVE))) {
226                 /* Could this happen with multiple socket listeners? */
227                 printf("rxi_Listener: no packets!"); /* Shouldn't happen */
228                 exit(1);
229             }
230         }
231
232         if (rxi_ReadPacket(sock, p, &host, &port)) {
233             clock_NewTime();
234             p = rxi_ReceivePacket(p, sock, host, port, tnop, newcallp);
235             if (newcallp && *newcallp) {
236                 if (p)
237                     rxi_FreePacket(p);
238                 return;
239             }
240         }
241     }
242     /* NOTREACHED */
243 }
244
245 /* This is the listener process request loop. The listener process loop
246  * becomes a server thread when rxi_ListenerProc returns, and stays
247  * server thread until rxi_ServerProc returns. */
248 static void rx_ListenerProc(void *argp)
249 {
250     int threadID;
251     int sock = (int) argp;
252     struct rx_call *newcall;
253
254     while(1) {
255         newcall = NULL;
256         threadID = -1;
257         rxi_ListenerProc(sock, &threadID, &newcall);
258         /* assert(threadID != -1); */
259         /* assert(newcall != NULL); */
260         sock = OSI_NULLSOCKET;
261         assert(pthread_setspecific(rx_thread_id_key, (void *)threadID) == 0);
262         rxi_ServerProc(threadID, newcall, &sock);
263         /* assert(sock != OSI_NULLSOCKET); */
264     }
265     /* not reached */
266 }
267
268 /* This is the server process request loop. The server process loop
269  * becomes a listener thread when rxi_ServerProc returns, and stays
270  * listener thread until rxi_ListenerProc returns. */
271 void rx_ServerProc()
272 {
273     int sock;
274     int threadID;
275     struct rx_call *newcall = NULL;
276
277     rxi_MorePackets(rx_maxReceiveWindow+2); /* alloc more packets */
278     MUTEX_ENTER(&rx_stats_mutex);
279     rxi_dataQuota += rx_initSendWindow; /* Reserve some pkts for hard times */
280     /* threadID is used for making decisions in GetCall.  Get it by bumping
281      * number of threads handling incoming calls */
282     threadID = rxi_availProcs++;
283     MUTEX_EXIT(&rx_stats_mutex);
284
285     while(1) {
286         sock = OSI_NULLSOCKET;
287         assert(pthread_setspecific(rx_thread_id_key, (void *)threadID) == 0);
288         rxi_ServerProc(threadID, newcall, &sock);
289         /* assert(sock != OSI_NULLSOCKET); */
290         newcall = NULL;
291         rxi_ListenerProc(sock, &threadID, &newcall);
292         /* assert(threadID != -1); */
293         /* assert(newcall != NULL); */
294     }
295     /* not reached */
296 }
297
298 /*
299  * Historically used to start the listener process. We now have multiple
300  * listener processes (one for each socket); these are started by GetUdpSocket.
301  *
302  * The event handling process *is* started here (the old listener used
303  * to also handle events). The listener threads can't actually start 
304  * listening until rxi_StartListener is called because most of R may not
305  * be initialized when rxi_Listen is called.
306  */
307 void rxi_StartListener() {
308     pthread_attr_t tattr;
309     AFS_SIGSET_DECL;
310
311     if (pthread_attr_init
312         (&tattr) != 0) {
313         printf("Unable to create Rx event handling thread (pthread_attr_init)\n");
314         exit(1);
315     }
316
317     if (pthread_attr_setdetachstate
318         (&tattr,
319          PTHREAD_CREATE_DETACHED) != 0) {
320         printf("Unable to create Rx event handling thread (pthread_attr_setdetachstate)\n");
321         exit(1);
322     }
323
324     AFS_SIGSET_CLEAR();
325     if (pthread_create
326         (&event_handler_thread,
327          &tattr,
328          event_handler,
329          NULL) != 0) {
330         printf("Unable to create Rx event handling thread\n");
331         exit(1);
332     }
333     AFS_SIGSET_RESTORE();
334
335     assert(pthread_mutex_lock(&listener_mutex)==0);
336     assert(pthread_cond_broadcast(&rx_listener_cond)==0);
337     listeners_started = 1;
338     assert(pthread_mutex_unlock(&listener_mutex)==0);
339
340 }
341
342 /*
343  * Listen on the specified socket.
344  */
345 rxi_Listen(sock)
346     osi_socket sock;
347 {
348     pthread_t thread;
349     pthread_attr_t tattr;
350     AFS_SIGSET_DECL;
351
352     if (pthread_attr_init
353         (&tattr) != 0) {
354         printf("Unable to create socket listener thread (pthread_attr_init)\n");
355         exit(1);
356     }
357
358     if (pthread_attr_setdetachstate
359         (&tattr,
360          PTHREAD_CREATE_DETACHED) != 0) {
361         printf("Unable to create socket listener thread (pthread_attr_setdetachstate)\n");
362         exit(1);
363     }
364
365     AFS_SIGSET_CLEAR();
366     if (pthread_create
367         (&thread,
368          &tattr,
369          rx_ListenerProc,
370          (void *) sock) != 0) {
371         printf("Unable to create socket listener thread\n");
372         exit(1);
373     }
374     AFS_SIGSET_RESTORE();
375     return 0;
376 }
377
378
379 /*
380  * Recvmsg.
381  *
382  */
383 int rxi_Recvmsg
384     (int socket,
385      struct msghdr *msg_p,
386      int flags)
387 {
388     int ret;
389     ret = recvmsg(socket, msg_p, flags);
390     return ret;
391 }
392
393 /*
394  * Sendmsg.
395  */
396 rxi_Sendmsg(socket, msg_p, flags)
397     osi_socket socket;
398     struct msghdr *msg_p;
399     int flags;
400 {
401     int ret;
402     ret = sendmsg(socket, msg_p, flags);
403 #ifdef AFS_LINUX22_ENV
404     /* linux unfortunately returns ECONNREFUSED if the target port
405      * is no longer in use */
406     if (ret == -1 && errno != ECONNREFUSED) {
407 #else
408     if (ret == -1) {
409 #endif
410         printf("rxi_sendmsg failed, error %d\n", errno);
411         fflush(stdout);
412     }
413     return 0;
414 }