rx-send-error-code-propagation-20050915
[openafs.git] / src / rx / rx_pthread.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /*
11  * An implementation of the rx socket listener for pthreads (not using select).
12  * This assumes that multiple read system calls may be extant at any given
13  * time. Also implements the pthread-specific event handling for rx.
14  *
15  * rx_pthread.c is used for the thread safe RX package.
16  */
17
18 #include <afsconfig.h>
19 #include <afs/param.h>
20
21 RCSID
22     ("$Header$");
23
24 #include <sys/types.h>
25 #include <errno.h>
26 #include <signal.h>
27 #ifndef AFS_NT40_ENV
28 # include <sys/socket.h>
29 # include <sys/file.h>
30 # include <netdb.h>
31 # include <netinet/in.h>
32 # include <net/if.h>
33 # include <sys/ioctl.h>
34 # include <sys/time.h>
35 #endif
36 #include <sys/stat.h>
37 #include <rx/rx.h>
38 #include <rx/rx_globals.h>
39 #include <assert.h>
40 #include <rx/rx_pthread.h>
41 #include <rx/rx_clock.h>
42
43 /*
44  * Number of times the event handling thread was signalled because a new
45  * event was scheduled earlier than the lastest event.
46  *
47  * Protected by event_handler_mutex
48  */
49 static long rx_pthread_n_event_wakeups;
50
51 /* Set rx_pthread_event_rescheduled if event_handler should just try
52  * again instead of sleeping.
53  *
54  * Protected by event_handler_mutex
55  */
56 static int rx_pthread_event_rescheduled = 0;
57
58 static void *rx_ListenerProc(void *);
59
60 /*
61  * We supply an event handling thread for Rx's event processing.
62  * The condition variable is used to wakeup the thread whenever a new
63  * event is scheduled earlier than the previous earliest event.
64  * This thread is also responsible for keeping time.
65  */
66 static pthread_t event_handler_thread;
67 pthread_cond_t rx_event_handler_cond;
68 pthread_mutex_t event_handler_mutex;
69 pthread_cond_t rx_listener_cond;
70 pthread_mutex_t listener_mutex;
71 static int listeners_started = 0;
72 pthread_mutex_t rx_clock_mutex;
73 struct clock rxi_clockNow;
74
75 /*
76  * Delay the current thread the specified number of seconds.
77  */
78 void
79 rxi_Delay(int sec)
80 {
81     sleep(sec);
82 }
83
84 /*
85  * Called from rx_Init()
86  */
87 void
88 rxi_InitializeThreadSupport(void)
89 {
90     listeners_started = 0;
91     clock_GetTime(&rxi_clockNow);
92 }
93
94 static void *
95 server_entry(void *argp)
96 {
97     void (*server_proc) () = (void (*)())argp;
98     server_proc();
99     dpf(("rx_pthread.c: server_entry: Server proc returned unexpectedly\n"));
100     exit(1);
101     return (void *)0;
102 }
103
104 /*
105  * Start an Rx server process.
106  */
107 void
108 rxi_StartServerProc(void (*proc) (void), int stacksize)
109 {
110     pthread_t thread;
111     pthread_attr_t tattr;
112     AFS_SIGSET_DECL;
113
114     if (pthread_attr_init(&tattr) != 0) {
115         dpf(("Unable to Create Rx server thread (pthread_attr_init)\n"));
116         exit(1);
117     }
118
119     if (pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) != 0) {
120         dpf
121             (("Unable to Create Rx server thread (pthread_attr_setdetachstate)\n"));
122         exit(1);
123     }
124
125     /*
126      * NOTE: We are ignoring the stack size parameter, for now.
127      */
128     AFS_SIGSET_CLEAR();
129     if (pthread_create(&thread, &tattr, server_entry, (void *)proc) != 0) {
130         dpf(("Unable to Create Rx server thread\n"));
131         exit(1);
132     }
133     AFS_SIGSET_RESTORE();
134 }
135
136 /*
137  * The event handling process.
138  */
139 static void *
140 event_handler(void *argp)
141 {
142     struct clock rx_pthread_last_event_wait_time = { 0, 0 };
143     unsigned long rx_pthread_n_event_expired = 0;
144     unsigned long rx_pthread_n_event_waits = 0;
145     long rx_pthread_n_event_woken = 0;
146     struct timespec rx_pthread_next_event_time = { 0, 0 };
147
148     assert(pthread_mutex_lock(&event_handler_mutex) == 0);
149
150     for (;;) {
151         struct clock cv;
152         struct clock next;
153
154         assert(pthread_mutex_unlock(&event_handler_mutex) == 0);
155
156         next.sec = 30;          /* Time to sleep if there are no events scheduled */
157         next.usec = 0;
158         clock_GetTime(&cv);
159         rxevent_RaiseEvents(&next);
160
161         assert(pthread_mutex_lock(&event_handler_mutex) == 0);
162         if (rx_pthread_event_rescheduled) {
163             rx_pthread_event_rescheduled = 0;
164             continue;
165         }
166
167         clock_Add(&cv, &next);
168         rx_pthread_next_event_time.tv_sec = cv.sec;
169         rx_pthread_next_event_time.tv_nsec = cv.usec * 1000;
170         rx_pthread_n_event_waits++;
171         if (pthread_cond_timedwait
172             (&rx_event_handler_cond, &event_handler_mutex,
173              &rx_pthread_next_event_time) == -1) {
174 #ifdef notdef
175             assert(errno == EAGAIN);
176 #endif
177             rx_pthread_n_event_expired++;
178         } else {
179             rx_pthread_n_event_woken++;
180         }
181         rx_pthread_event_rescheduled = 0;
182     }
183 }
184
185
186 /*
187  * This routine will get called by the event package whenever a new,
188  * earlier than others, event is posted. */
189 void
190 rxi_ReScheduleEvents(void)
191 {
192     assert(pthread_mutex_lock(&event_handler_mutex) == 0);
193     pthread_cond_signal(&rx_event_handler_cond);
194     rx_pthread_event_rescheduled = 1;
195     assert(pthread_mutex_unlock(&event_handler_mutex) == 0);
196 }
197
198
199 /* Loop to listen on a socket. Return setting *newcallp if this
200  * thread should become a server thread.  */
201 static void
202 rxi_ListenerProc(int sock, int *tnop, struct rx_call **newcallp)
203 {
204     unsigned int host;
205     u_short port;
206     register struct rx_packet *p = (struct rx_packet *)0;
207
208     assert(pthread_mutex_lock(&listener_mutex) == 0);
209     while (!listeners_started) {
210         assert(pthread_cond_wait(&rx_listener_cond, &listener_mutex) == 0);
211     }
212     assert(pthread_mutex_unlock(&listener_mutex) == 0);
213
214     for (;;) {
215         /*
216          * Grab a new packet only if necessary (otherwise re-use the old one)
217          */
218         if (p) {
219             rxi_RestoreDataBufs(p);
220         } else {
221             if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_RECEIVE))) {
222                 /* Could this happen with multiple socket listeners? */
223                 dpf(("rxi_Listener: no packets!"));     /* Shouldn't happen */
224                 exit(1);
225             }
226         }
227
228         if (rxi_ReadPacket(sock, p, &host, &port)) {
229             clock_NewTime();
230             p = rxi_ReceivePacket(p, sock, host, port, tnop, newcallp);
231             if (newcallp && *newcallp) {
232                 if (p)
233                     rxi_FreePacket(p);
234                 return;
235             }
236         }
237     }
238     /* NOTREACHED */
239 }
240
241 /* This is the listener process request loop. The listener process loop
242  * becomes a server thread when rxi_ListenerProc returns, and stays
243  * server thread until rxi_ServerProc returns. */
244 static void *
245 rx_ListenerProc(void *argp)
246 {
247     int threadID;
248     int sock = (int)argp;
249     struct rx_call *newcall;
250
251     while (1) {
252         newcall = NULL;
253         threadID = -1;
254         rxi_ListenerProc(sock, &threadID, &newcall);
255         /* assert(threadID != -1); */
256         /* assert(newcall != NULL); */
257         sock = OSI_NULLSOCKET;
258         assert(pthread_setspecific(rx_thread_id_key, (void *)threadID) == 0);
259         rxi_ServerProc(threadID, newcall, &sock);
260         /* assert(sock != OSI_NULLSOCKET); */
261     }
262     /* not reached */
263 }
264
265 /* This is the server process request loop. The server process loop
266  * becomes a listener thread when rxi_ServerProc returns, and stays
267  * listener thread until rxi_ListenerProc returns. */
268 void
269 rx_ServerProc(void)
270 {
271     int sock;
272     int threadID;
273     struct rx_call *newcall = NULL;
274
275     rxi_MorePackets(rx_maxReceiveWindow + 2);   /* alloc more packets */
276     MUTEX_ENTER(&rx_stats_mutex);
277     rxi_dataQuota += rx_initSendWindow; /* Reserve some pkts for hard times */
278     /* threadID is used for making decisions in GetCall.  Get it by bumping
279      * number of threads handling incoming calls */
280     /* Unique thread ID: used for scheduling purposes *and* as index into
281      * the host hold table (fileserver). 
282      * The previously used rxi_availProcs is unsuitable as it
283      * will already go up and down as packets arrive while the server
284      * threads are still initialising! The recently introduced
285      * rxi_pthread_hinum does not necessarily lead to a server
286      * thread with id 0, which is not allowed to hop through the
287      * incoming call queue.
288      * So either introduce yet another counter or flag the FCFS
289      * thread... chose the latter.
290      */
291     threadID = ++rxi_pthread_hinum;
292     if (rxi_fcfs_thread_num == 0 && rxi_fcfs_thread_num != threadID)
293         rxi_fcfs_thread_num = threadID;
294     ++rxi_availProcs;
295     MUTEX_EXIT(&rx_stats_mutex);
296
297     while (1) {
298         sock = OSI_NULLSOCKET;
299         assert(pthread_setspecific(rx_thread_id_key, (void *)threadID) == 0);
300         rxi_ServerProc(threadID, newcall, &sock);
301         /* assert(sock != OSI_NULLSOCKET); */
302         newcall = NULL;
303         rxi_ListenerProc(sock, &threadID, &newcall);
304         /* assert(threadID != -1); */
305         /* assert(newcall != NULL); */
306     }
307     /* not reached */
308 }
309
310 /*
311  * Historically used to start the listener process. We now have multiple
312  * listener processes (one for each socket); these are started by GetUdpSocket.
313  *
314  * The event handling process *is* started here (the old listener used
315  * to also handle events). The listener threads can't actually start 
316  * listening until rxi_StartListener is called because most of R may not
317  * be initialized when rxi_Listen is called.
318  */
319 void
320 rxi_StartListener(void)
321 {
322     pthread_attr_t tattr;
323     AFS_SIGSET_DECL;
324
325     if (pthread_attr_init(&tattr) != 0) {
326         dpf
327             (("Unable to create Rx event handling thread (pthread_attr_init)\n"));
328         exit(1);
329     }
330
331     if (pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) != 0) {
332         dpf
333             (("Unable to create Rx event handling thread (pthread_attr_setdetachstate)\n"));
334         exit(1);
335     }
336
337     AFS_SIGSET_CLEAR();
338     if (pthread_create(&event_handler_thread, &tattr, event_handler, NULL) !=
339         0) {
340         dpf(("Unable to create Rx event handling thread\n"));
341         exit(1);
342     }
343     MUTEX_ENTER(&rx_stats_mutex);
344     ++rxi_pthread_hinum;
345     MUTEX_EXIT(&rx_stats_mutex);
346     AFS_SIGSET_RESTORE();
347
348     assert(pthread_mutex_lock(&listener_mutex) == 0);
349     assert(pthread_cond_broadcast(&rx_listener_cond) == 0);
350     listeners_started = 1;
351     assert(pthread_mutex_unlock(&listener_mutex) == 0);
352
353 }
354
355 /*
356  * Listen on the specified socket.
357  */
358 int
359 rxi_Listen(osi_socket sock)
360 {
361     pthread_t thread;
362     pthread_attr_t tattr;
363     AFS_SIGSET_DECL;
364
365     if (pthread_attr_init(&tattr) != 0) {
366         dpf
367             (("Unable to create socket listener thread (pthread_attr_init)\n"));
368         exit(1);
369     }
370
371     if (pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) != 0) {
372         dpf
373             (("Unable to create socket listener thread (pthread_attr_setdetachstate)\n"));
374         exit(1);
375     }
376
377     AFS_SIGSET_CLEAR();
378     if (pthread_create(&thread, &tattr, rx_ListenerProc, (void *)sock) != 0) {
379         dpf(("Unable to create socket listener thread\n"));
380         exit(1);
381     }
382     MUTEX_ENTER(&rx_stats_mutex);
383     ++rxi_pthread_hinum;
384     MUTEX_EXIT(&rx_stats_mutex);
385     AFS_SIGSET_RESTORE();
386     return 0;
387 }
388
389
390 /*
391  * Recvmsg.
392  *
393  */
394 int
395 rxi_Recvmsg(int socket, struct msghdr *msg_p, int flags)
396 {
397     int ret;
398     ret = recvmsg(socket, msg_p, flags);
399     return ret;
400 }
401
402 /*
403  * Sendmsg.
404  */
405 int
406 rxi_Sendmsg(osi_socket socket, struct msghdr *msg_p, int flags)
407 {
408     int ret;
409     ret = sendmsg(socket, msg_p, flags);
410 #ifdef AFS_LINUX22_ENV
411     /* linux unfortunately returns ECONNREFUSED if the target port
412      * is no longer in use */
413     /* and EAGAIN if a UDP checksum is incorrect */
414     if (ret == -1 && errno != ECONNREFUSED && errno != EAGAIN) {
415 #else
416     if (ret == -1) {
417 #endif
418         dpf(("rxi_sendmsg failed, error %d\n", errno));
419         fflush(stdout);
420         return -1;
421     }
422     return 0;
423 }
424
425 struct rx_ts_info_t * rx_ts_info_init() {
426     register struct rx_ts_info_t * rx_ts_info;
427     rx_ts_info = (rx_ts_info_t *) malloc(sizeof(rx_ts_info_t));
428     assert(rx_ts_info != NULL && pthread_setspecific(rx_ts_info_key, rx_ts_info) == 0);
429     memset(rx_ts_info, 0, sizeof(rx_ts_info_t));
430 #ifdef RX_ENABLE_TSFPQ
431     queue_Init(&rx_ts_info->_FPQ);
432
433     MUTEX_ENTER(&rx_stats_mutex);
434     rx_TSFPQMaxProcs++;
435     RX_TS_FPQ_COMPUTE_LIMITS;
436     MUTEX_EXIT(&rx_stats_mutex);
437 #endif /* RX_ENABLE_TSFPQ */
438     return rx_ts_info;
439 }