rx-thread-id-assignment-fixes-20030203
[openafs.git] / src / rx / rx_pthread.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /*
11  * An implementation of the rx socket listener for pthreads (not using select).
12  * This assumes that multiple read system calls may be extant at any given
13  * time. Also implements the pthread-specific event handling for rx.
14  *
15  * rx_pthread.c is used for the thread safe RX package.
16  */
17
18 #include <afsconfig.h>
19 #include <afs/param.h>
20
21 RCSID("$Header$");
22
23 #include <sys/types.h>
24 #include <errno.h>
25 #include <signal.h>
26 #ifndef AFS_NT40_ENV
27 # include <sys/socket.h>
28 # include <sys/file.h>
29 # include <netdb.h>
30 # include <netinet/in.h>
31 # include <net/if.h>
32 # include <sys/ioctl.h>
33 # include <sys/time.h>
34 #endif
35 #include <sys/stat.h>
36 #include <rx.h>
37 #include <rx_globals.h>
38 #include <assert.h>
39 #include <rx/rx_pthread.h>
40
41 /*
42  * Number of times the event handling thread was signalled because a new
43  * event was scheduled earlier than the lastest event.
44  *
45  * Protected by event_handler_mutex
46  */
47 static long rx_pthread_n_event_wakeups;
48
49 /* Set rx_pthread_event_rescheduled if event_handler should just try
50  * again instead of sleeping.
51  *
52  * Protected by event_handler_mutex
53  */
54 static int rx_pthread_event_rescheduled = 0;
55
56 static void *rx_ListenerProc(void *);
57
58 /*
59  * We supply an event handling thread for Rx's event processing.
60  * The condition variable is used to wakeup the thread whenever a new
61  * event is scheduled earlier than the previous earliest event.
62  * This thread is also responsible for keeping time.
63  */
64 static pthread_t event_handler_thread;
65 pthread_cond_t rx_event_handler_cond;
66 pthread_mutex_t event_handler_mutex;
67 pthread_cond_t rx_listener_cond;
68 pthread_mutex_t listener_mutex;
69 static int listeners_started = 0;
70 pthread_mutex_t rx_clock_mutex;
71 struct clock rxi_clockNow;
72
73 /*
74  * Delay the current thread the specified number of seconds.
75  */
76 void rxi_Delay(int sec)
77 {
78     sleep(sec);
79 }
80
81 /*
82  * Called from rx_Init()
83  */
84 void rxi_InitializeThreadSupport(void)
85 {
86     listeners_started = 0;
87     gettimeofday((struct timeval *)&rxi_clockNow, NULL);
88 }
89
90 static void *server_entry(void * argp)
91 {
92     void (*server_proc)() = (void (*)()) argp;
93     server_proc();
94     printf("rx_pthread.c: server_entry: Server proc returned unexpectedly\n");
95     exit(1);
96     return (void *) 0;
97 }
98
99 /*
100  * Start an Rx server process.
101  */
102 void rxi_StartServerProc(void (*proc)(void), int stacksize)
103 {
104     pthread_t thread;
105     pthread_attr_t tattr;
106     AFS_SIGSET_DECL;
107
108     if (pthread_attr_init
109         (&tattr) != 0) {
110         printf("Unable to Create Rx server thread (pthread_attr_init)\n");
111         exit(1);
112     }
113
114     if (pthread_attr_setdetachstate
115         (&tattr,
116          PTHREAD_CREATE_DETACHED) != 0) {
117         printf("Unable to Create Rx server thread (pthread_attr_setdetachstate)\n");
118         exit(1);
119     }
120
121     /*
122      * NOTE: We are ignoring the stack size parameter, for now.
123      */
124     AFS_SIGSET_CLEAR();
125     if (pthread_create
126         (&thread,
127          &tattr,
128          server_entry,
129          (void *) proc) != 0) {
130         printf("Unable to Create Rx server thread\n");
131         exit(1);
132     }
133     MUTEX_ENTER(&rx_stats_mutex);
134     ++rxi_pthread_hinum;
135     MUTEX_EXIT(&rx_stats_mutex);
136     AFS_SIGSET_RESTORE();
137 }
138
139 /*
140  * The event handling process.
141  */
142 static void *event_handler(void *argp)
143 {
144     struct clock rx_pthread_last_event_wait_time = {0,0};
145     unsigned long rx_pthread_n_event_expired = 0;
146     unsigned long rx_pthread_n_event_waits = 0;
147     long rx_pthread_n_event_woken = 0;
148     struct timespec rx_pthread_next_event_time = {0,0};
149
150     assert(pthread_mutex_lock(&event_handler_mutex)==0);
151
152     for (;;) {
153         struct clock cv;
154         struct clock next;
155
156         assert(pthread_mutex_unlock(&event_handler_mutex)==0);
157     
158         next.sec = 30; /* Time to sleep if there are no events scheduled */
159         next.usec = 0;
160         gettimeofday((struct timeval *)&cv, NULL);
161         rxevent_RaiseEvents(&next);
162
163         assert(pthread_mutex_lock(&event_handler_mutex)==0);
164         if (rx_pthread_event_rescheduled) {
165             rx_pthread_event_rescheduled = 0;
166             continue;
167         }
168
169         clock_Add(&cv, &next);
170         rx_pthread_next_event_time.tv_sec = cv.sec;
171         rx_pthread_next_event_time.tv_nsec = cv.usec * 1000;
172         rx_pthread_n_event_waits++;
173         if (pthread_cond_timedwait
174             (&rx_event_handler_cond,
175              &event_handler_mutex,
176              &rx_pthread_next_event_time) == -1) {
177 #ifdef notdef
178             assert(errno == EAGAIN); 
179 #endif
180             rx_pthread_n_event_expired++;
181         } else {        
182             rx_pthread_n_event_woken++;
183         }
184         rx_pthread_event_rescheduled = 0;
185     }
186 }
187
188
189 /*
190  * This routine will get called by the event package whenever a new,
191  * earlier than others, event is posted. */
192 void rxi_ReScheduleEvents(void)
193 {
194     assert(pthread_mutex_lock(&event_handler_mutex)==0);
195     pthread_cond_signal(&rx_event_handler_cond);
196     rx_pthread_event_rescheduled = 1;
197     assert(pthread_mutex_unlock(&event_handler_mutex)==0);
198 }
199
200
201 /* Loop to listen on a socket. Return setting *newcallp if this
202  * thread should become a server thread.  */
203 static void rxi_ListenerProc(int sock, int *tnop, struct rx_call **newcallp)
204 {
205     unsigned int host;
206     u_short port;
207     register struct rx_packet *p = (struct rx_packet *)0;
208
209     assert(pthread_mutex_lock(&listener_mutex)==0);
210     while (!listeners_started) {
211         assert(pthread_cond_wait(&rx_listener_cond, &listener_mutex)==0);
212     }
213     assert(pthread_mutex_unlock(&listener_mutex)==0);
214
215     for (;;) {
216         /*
217          * Grab a new packet only if necessary (otherwise re-use the old one)
218          */
219         if (p) {
220             rxi_RestoreDataBufs(p);
221         }
222         else {
223             if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_RECEIVE))) {
224                 /* Could this happen with multiple socket listeners? */
225                 printf("rxi_Listener: no packets!"); /* Shouldn't happen */
226                 exit(1);
227             }
228         }
229
230         if (rxi_ReadPacket(sock, p, &host, &port)) {
231             clock_NewTime();
232             p = rxi_ReceivePacket(p, sock, host, port, tnop, newcallp);
233             if (newcallp && *newcallp) {
234                 if (p)
235                     rxi_FreePacket(p);
236                 return;
237             }
238         }
239     }
240     /* NOTREACHED */
241 }
242
243 /* This is the listener process request loop. The listener process loop
244  * becomes a server thread when rxi_ListenerProc returns, and stays
245  * server thread until rxi_ServerProc returns. */
246 static void *rx_ListenerProc(void *argp)
247 {
248     int threadID;
249     int sock = (int) argp;
250     struct rx_call *newcall;
251
252     while(1) {
253         newcall = NULL;
254         threadID = -1;
255         rxi_ListenerProc(sock, &threadID, &newcall);
256         /* assert(threadID != -1); */
257         /* assert(newcall != NULL); */
258         sock = OSI_NULLSOCKET;
259         assert(pthread_setspecific(rx_thread_id_key, (void *)threadID) == 0);
260         rxi_ServerProc(threadID, newcall, &sock);
261         /* assert(sock != OSI_NULLSOCKET); */
262     }
263     /* not reached */
264 }
265
266 /* This is the server process request loop. The server process loop
267  * becomes a listener thread when rxi_ServerProc returns, and stays
268  * listener thread until rxi_ListenerProc returns. */
269 void rx_ServerProc(void)
270 {
271     int sock;
272     int threadID;
273     struct rx_call *newcall = NULL;
274
275     rxi_MorePackets(rx_maxReceiveWindow+2); /* alloc more packets */
276     MUTEX_ENTER(&rx_stats_mutex);
277     rxi_dataQuota += rx_initSendWindow; /* Reserve some pkts for hard times */
278     /* threadID is used for making decisions in GetCall.  Get it by bumping
279      * number of threads handling incoming calls */
280     threadID = rxi_availProcs++;
281     MUTEX_EXIT(&rx_stats_mutex);
282
283     while(1) {
284         sock = OSI_NULLSOCKET;
285         assert(pthread_setspecific(rx_thread_id_key, (void *)threadID) == 0);
286         rxi_ServerProc(threadID, newcall, &sock);
287         /* assert(sock != OSI_NULLSOCKET); */
288         newcall = NULL;
289         rxi_ListenerProc(sock, &threadID, &newcall);
290         /* assert(threadID != -1); */
291         /* assert(newcall != NULL); */
292     }
293     /* not reached */
294 }
295
296 /*
297  * Historically used to start the listener process. We now have multiple
298  * listener processes (one for each socket); these are started by GetUdpSocket.
299  *
300  * The event handling process *is* started here (the old listener used
301  * to also handle events). The listener threads can't actually start 
302  * listening until rxi_StartListener is called because most of R may not
303  * be initialized when rxi_Listen is called.
304  */
305 void rxi_StartListener(void)
306 {
307     pthread_attr_t tattr;
308     AFS_SIGSET_DECL;
309
310     if (pthread_attr_init
311         (&tattr) != 0) {
312         printf("Unable to create Rx event handling thread (pthread_attr_init)\n");
313         exit(1);
314     }
315
316     if (pthread_attr_setdetachstate
317         (&tattr,
318          PTHREAD_CREATE_DETACHED) != 0) {
319         printf("Unable to create Rx event handling thread (pthread_attr_setdetachstate)\n");
320         exit(1);
321     }
322
323     AFS_SIGSET_CLEAR();
324     if (pthread_create
325         (&event_handler_thread,
326          &tattr,
327          event_handler,
328          NULL) != 0) {
329         printf("Unable to create Rx event handling thread\n");
330         exit(1);
331     }
332     MUTEX_ENTER(&rx_stats_mutex);
333     ++rxi_pthread_hinum;
334     MUTEX_EXIT(&rx_stats_mutex);
335     AFS_SIGSET_RESTORE();
336
337     assert(pthread_mutex_lock(&listener_mutex)==0);
338     assert(pthread_cond_broadcast(&rx_listener_cond)==0);
339     listeners_started = 1;
340     assert(pthread_mutex_unlock(&listener_mutex)==0);
341
342 }
343
344 /*
345  * Listen on the specified socket.
346  */
347 int rxi_Listen(osi_socket sock)
348 {
349     pthread_t thread;
350     pthread_attr_t tattr;
351     AFS_SIGSET_DECL;
352
353     if (pthread_attr_init
354         (&tattr) != 0) {
355         printf("Unable to create socket listener thread (pthread_attr_init)\n");
356         exit(1);
357     }
358
359     if (pthread_attr_setdetachstate
360         (&tattr,
361          PTHREAD_CREATE_DETACHED) != 0) {
362         printf("Unable to create socket listener thread (pthread_attr_setdetachstate)\n");
363         exit(1);
364     }
365
366     AFS_SIGSET_CLEAR();
367     if (pthread_create
368         (&thread,
369          &tattr,
370          rx_ListenerProc,
371          (void *) sock) != 0) {
372         printf("Unable to create socket listener thread\n");
373         exit(1);
374     }
375     MUTEX_ENTER(&rx_stats_mutex);
376     ++rxi_pthread_hinum;
377     MUTEX_EXIT(&rx_stats_mutex);
378     AFS_SIGSET_RESTORE();
379     return 0;
380 }
381
382
383 /*
384  * Recvmsg.
385  *
386  */
387 int rxi_Recvmsg(int socket, struct msghdr *msg_p, int flags)
388 {
389     int ret;
390     ret = recvmsg(socket, msg_p, flags);
391     return ret;
392 }
393
394 /*
395  * Sendmsg.
396  */
397 int rxi_Sendmsg(osi_socket socket, struct msghdr *msg_p, int flags)
398 {
399     int ret;
400     ret = sendmsg(socket, msg_p, flags);
401 #ifdef AFS_LINUX22_ENV
402     /* linux unfortunately returns ECONNREFUSED if the target port
403      * is no longer in use */
404     /* and EAGAIN if a UDP checksum is incorrect */
405     if (ret == -1 && errno != ECONNREFUSED && errno != EAGAIN) {
406 #else
407     if (ret == -1) {
408 #endif
409         printf("rxi_sendmsg failed, error %d\n", errno);
410         fflush(stdout);
411     }
412     return 0;
413 }