djgpp-killer-20060801
[openafs.git] / src / rx / rx_lwp.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /* rx_user.c contains routines specific to the user space UNIX implementation of rx */
11
12 /* This controls the size of an fd_set; it must be defined early before
13  * the system headers define that type and the macros that operate on it.
14  * Its value should be as large as the maximum file descriptor limit we
15  * are likely to run into on any platform.  Right now, that is 65536
16  * which is the default hard fd limit on Solaris 9 */
17 #ifndef _WIN32
18 #define FD_SETSIZE 65536
19 #endif
20
21 #include <afsconfig.h>
22 #include <afs/param.h>
23
24 RCSID
25     ("$Header$");
26
27 # include <sys/types.h>         /* fd_set on older platforms */
28 # include <errno.h>
29 # include <signal.h>
30 #ifdef AFS_NT40_ENV
31 # include <winsock2.h>
32 #else
33 # include <unistd.h>            /* select() prototype */
34 # include <sys/time.h>          /* struct timeval, select() prototype */
35 # ifndef FD_SET
36 #  include <sys/select.h>       /* fd_set on newer platforms */
37 # endif
38 # include <sys/socket.h>
39 # include <sys/file.h>
40 # include <netdb.h>
41 # include <sys/stat.h>
42 # include <netinet/in.h>
43 # include <net/if.h>
44 # include <sys/ioctl.h>
45 # include <sys/time.h>
46 #endif
47 # include "rx.h"
48 # include "rx_globals.h"
49 # include <lwp.h>
50
51 #define MAXTHREADNAMELENGTH 64
52
53 extern int (*registerProgram) ();
54 extern int (*swapNameProgram) ();
55
56 int debugSelectFailure;         /* # of times select failed */
57
58 /*
59  * Sleep on the unique wait channel provided.
60  */
61 void
62 rxi_Sleep(void *addr)
63 {
64     LWP_WaitProcess(addr);
65 }
66
67 /*
68  * Wakeup any threads on the channel provided.
69  * They may be woken up spuriously, and must check any conditions.
70  */
71 void
72 rxi_Wakeup(void *addr)
73 {
74     LWP_NoYieldSignal(addr);
75 }
76
77 PROCESS rx_listenerPid = 0;     /* LWP process id of socket listener process */
78 static int rx_ListenerProc(void *dummy);
79
80 /*
81  * Delay the current thread the specified number of seconds.
82  */
83 void
84 rxi_Delay(int sec)
85 {
86     IOMGR_Sleep(sec);
87 }
88
89 static int quitListening = 0;
90
91 /* This routine will kill the listener thread, if it exists. */
92 void
93 rxi_StopListener(void)
94 {
95     quitListening = 1;
96     rxi_ReScheduleEvents();
97 }
98
99 /* This routine will get called by the event package whenever a new,
100    earlier than others, event is posted.  If the Listener process
101    is blocked in selects, this will unblock it.  It also can be called
102    to force a new trip through the rxi_Listener select loop when the set
103    of file descriptors it should be listening to changes... */
104 void
105 rxi_ReScheduleEvents(void)
106 {
107     if (rx_listenerPid)
108         IOMGR_Cancel(rx_listenerPid);
109 }
110
111 void
112 rxi_InitializeThreadSupport(void)
113 {
114     PROCESS junk;
115
116     LWP_InitializeProcessSupport(LWP_NORMAL_PRIORITY, &junk);
117     IOMGR_Initialize();
118     FD_ZERO(&rx_selectMask);
119 }
120
121 void
122 rxi_StartServerProc(void (*proc) (void), int stacksize)
123 {
124     PROCESS scratchPid;
125     static int number = 0;
126     char name[32];
127
128     sprintf(name, "srv_%d", ++number);
129     LWP_CreateProcess((int (*)(void *))proc, stacksize, RX_PROCESS_PRIORITY, (void *)0,
130                       "rx_ServerProc", &scratchPid);
131     if (registerProgram)
132         (*registerProgram) (scratchPid, name);
133 }
134
135 void
136 rxi_StartListener(void)
137 {
138     /* Priority of listener should be high, so it can keep conns alive */
139 #define RX_LIST_STACK   24000
140     LWP_CreateProcess(rx_ListenerProc, RX_LIST_STACK, LWP_MAX_PRIORITY,
141                       (void *)0, "rx_Listener", &rx_listenerPid);
142     if (registerProgram)
143         (*registerProgram) (rx_listenerPid, "listener");
144 }
145
146 /* The main loop which listens to the net for datagrams, and handles timeouts
147    and retransmissions, etc.  It also is responsible for scheduling the
148    execution of pending events (in conjunction with event.c).
149
150    Note interaction of nextPollTime and lastPollWorked.  The idea is that if rx is not
151    keeping up with the incoming stream of packets (because there are threads that
152    are interfering with its running sufficiently often), rx does a polling select
153    before doing a real IOMGR_Select system call.  Doing a real select means that
154    we don't have to let other processes run before processing more packets.
155
156    So, our algorithm is that if the last poll on the file descriptor found useful data, or
157    we're at the time nextPollTime (which is advanced so that it occurs every 3 or 4 seconds),
158    then we try the polling select before the IOMGR_Select.  If we eventually catch up
159    (which we can tell by the polling select returning no input packets ready), then we
160    don't do a polling select again until several seconds later (via nextPollTime mechanism).
161    */
162
163 static void
164 rxi_ListenerProc(fd_set * rfds, int *tnop, struct rx_call **newcallp)
165 {
166     struct sockaddr_storage saddr;
167     int slen;
168     register struct rx_packet *p = (struct rx_packet *)0;
169     osi_socket socket;
170     struct clock cv;
171     afs_int32 nextPollTime;     /* time to next poll FD before sleeping */
172     int lastPollWorked, doingPoll;      /* true iff last poll was useful */
173     struct timeval tv, *tvp;
174     int code;
175 #ifdef AFS_NT40_ENV
176     int i;
177 #endif
178     PROCESS pid;
179     char name[MAXTHREADNAMELENGTH] = "srv_0";
180
181     clock_NewTime();
182     lastPollWorked = 0;
183     nextPollTime = 0;
184     code = LWP_CurrentProcess(&pid);
185     if (code) {
186         fprintf(stderr, "rxi_Listener: Can't get my pid.\n");
187         exit(1);
188     }
189     rx_listenerPid = pid;
190     if (swapNameProgram)
191         (*swapNameProgram) (pid, "listener", &name[0]);
192
193     for (;;) {
194         /* Grab a new packet only if necessary (otherwise re-use the old one) */
195         if (p) {
196             rxi_RestoreDataBufs(p);
197         } else {
198             if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_RECEIVE)))
199                 osi_Panic("rxi_ListenerProc: no packets!");     /* Shouldn't happen */
200         }
201         /* Wait for the next event time or a packet to arrive. */
202         /* event_RaiseEvents schedules any events whose time has come and
203          * then atomically computes the time to the next event, guaranteeing
204          * that this is positive.  If there is no next event, it returns 0 */
205         clock_NewTime();
206         if (!rxevent_RaiseEvents(&cv))
207             tvp = NULL;
208         else {
209             /* It's important to copy cv to tv, because the 4.3 documentation
210              * for select threatens that *tv may be updated after a select, in
211              * future editions of the system, to indicate how much of the time
212              * period has elapsed.  So we shouldn't rely on tv not being altered. */
213             tv.tv_sec = cv.sec; /* Time to next event */
214             tv.tv_usec = cv.usec;
215             tvp = &tv;
216         }
217         rx_stats.selects++;
218
219         *rfds = rx_selectMask;
220
221         if (lastPollWorked || nextPollTime < clock_Sec()) {
222             /* we're catching up, or haven't tried to for a few seconds */
223             doingPoll = 1;
224             nextPollTime = clock_Sec() + 4;     /* try again in 4 seconds no matter what */
225             tv.tv_sec = tv.tv_usec = 0; /* make sure we poll */
226             tvp = &tv;
227             code = select((int)(rx_maxSocketNumber + 1), rfds, 0, 0, tvp);
228         } else {
229             doingPoll = 0;
230             code = IOMGR_Select((int)(rx_maxSocketNumber + 1), rfds, 0, 0, tvp);
231         }
232         lastPollWorked = 0;     /* default is that it didn't find anything */
233
234         if (quitListening) {
235             quitListening = 0;
236             LWP_DestroyProcess(pid);
237         }
238
239         switch (code) {
240         case 0:
241             /* Timer interrupt:
242              * If it was a timer interrupt then we can assume that
243              * the time has advanced by roughly the value of the
244              * previous timeout, and that there is now at least
245              * one pending event.
246              */
247             clock_NewTime();
248             break;
249         case -1:
250             /* select or IOMGR_Select returned failure */
251             debugSelectFailure++;       /* update debugging counter */
252             clock_NewTime();
253             break;
254         case -2:
255             /* IOMGR_Cancel:
256              * IOMGR_Cancel is invoked whenever a new event is
257              * posted that is earlier than any existing events.
258              * So we re-evaluate the time, and then go back to
259              * reschedule events
260              */
261             clock_NewTime();
262             break;
263
264         default:
265             /* Packets have arrived, presumably:
266              * If it wasn't a timer interrupt, then no event should have
267              * timed out yet (well some event may have, but only just...), so
268              * we don't bother looking to see if any have timed out, but just
269              * go directly to reading the data packets
270              */
271             clock_NewTime();
272             if (doingPoll)
273                 lastPollWorked = 1;
274 #ifdef AFS_NT40_ENV
275             for (i = 0; p && i < rfds->fd_count; i++) {
276                 socket = rfds->fd_array[i];
277                 slen = sizeof(saddr);
278                 if (rxi_ReadPacket(socket, p, &saddr, &slen)) {
279                     *newcallp = NULL;
280                     p = rxi_ReceivePacket(p, socket, &saddr, slen, tnop,
281                                           newcallp);
282                     if (newcallp && *newcallp) {
283                         if (p) {
284                             rxi_FreePacket(p);
285                         }
286                         if (swapNameProgram) {
287                             (*swapNameProgram) (rx_listenerPid, &name, 0);
288                             rx_listenerPid = 0;
289                         }
290                         return;
291                     }
292                 }
293             }
294 #else
295             for (socket = rx_minSocketNumber;
296                  p && socket <= rx_maxSocketNumber; socket++) {
297                 if (!FD_ISSET(socket, rfds))
298                     continue;
299                 slen = sizeof(saddr);
300                 if (rxi_ReadPacket(socket, p, &saddr, &slen)) {
301                     p = rxi_ReceivePacket(p, socket, &saddr, slen, tnop,
302                                           newcallp);
303                     if (newcallp && *newcallp) {
304                         if (p) {
305                             rxi_FreePacket(p);
306                         }
307                         if (swapNameProgram) {
308                             (*swapNameProgram) (rx_listenerPid, &name, 0);
309                             rx_listenerPid = 0;
310                         }
311                         return;
312                     }
313                 }
314             }
315 #endif
316             break;
317         }
318     }
319     /* NOTREACHED */
320 }
321
322 /* This is the listener process request loop. The listener process loop
323  * becomes a server thread when rxi_ListenerProc returns, and stays
324  * server thread until rxi_ServerProc returns. */
325 static int
326 rx_ListenerProc(void *dummy)
327 {
328     int threadID;
329     osi_socket sock;
330     struct rx_call *newcall;
331     fd_set *rfds;
332
333     if (!(rfds = IOMGR_AllocFDSet())) {
334         osi_Panic("rx_ListenerProc: no fd_sets!\n");
335     }
336
337     while (1) {
338         newcall = NULL;
339         threadID = -1;
340         rxi_ListenerProc(rfds, &threadID, &newcall);
341         /* assert(threadID != -1); */
342         /* assert(newcall != NULL); */
343         sock = OSI_NULLSOCKET;
344         rxi_ServerProc(threadID, newcall, &sock);
345         /* assert(sock != OSI_NULLSOCKET); */
346     }
347     /* not reached */
348 }
349
350 /* This is the server process request loop. The server process loop
351  * becomes a listener thread when rxi_ServerProc returns, and stays
352  * listener thread until rxi_ListenerProc returns. */
353 void
354 rx_ServerProc(void)
355 {
356     osi_socket sock;
357     int threadID;
358     struct rx_call *newcall = NULL;
359     fd_set *rfds;
360
361     if (!(rfds = IOMGR_AllocFDSet())) {
362         osi_Panic("rxi_ListenerProc: no fd_sets!\n");
363     }
364
365     rxi_MorePackets(rx_maxReceiveWindow + 2);   /* alloc more packets */
366     rxi_dataQuota += rx_initSendWindow; /* Reserve some pkts for hard times */
367     /* threadID is used for making decisions in GetCall.  Get it by bumping
368      * number of threads handling incoming calls */
369     threadID = rxi_availProcs++;
370
371     while (1) {
372         sock = OSI_NULLSOCKET;
373         rxi_ServerProc(threadID, newcall, &sock);
374         /* assert(sock != OSI_NULLSOCKET); */
375         newcall = NULL;
376         rxi_ListenerProc(rfds, &threadID, &newcall);
377         /* assert(threadID != -1); */
378         /* assert(newcall != NULL); */
379     }
380     /* not reached */
381 }
382
383 /*
384  * Called from GetUDPSocket.
385  * Called from a single thread at startup.
386  * Returns 0 on success; -1 on failure.
387  */
388 int
389 rxi_Listen(osi_socket sock)
390 {
391 #ifndef AFS_NT40_ENV
392     /*
393      * Put the socket into non-blocking mode so that rx_Listener
394      * can do a polling read before entering select
395      */
396     if (fcntl(sock, F_SETFL, FNDELAY) == -1) {
397         perror("fcntl");
398         (osi_Msg "rxi_Listen: unable to set non-blocking mode on socket\n");
399         return -1;
400     }
401
402     if (sock > FD_SETSIZE - 1) {
403         (osi_Msg "rxi_Listen: socket descriptor > (FD_SETSIZE-1) = %d\n",
404          FD_SETSIZE - 1);
405         return -1;
406     }
407 #endif
408
409     FD_SET(sock, &rx_selectMask);
410     if (sock > rx_maxSocketNumber)
411         rx_maxSocketNumber = sock;
412     if (sock < rx_minSocketNumber)
413         rx_minSocketNumber = sock;
414     return 0;
415 }
416
417 /*
418  * Recvmsg
419  */
420 int
421 rxi_Recvmsg(osi_socket socket, struct msghdr *msg_p, int flags)
422 {
423     return recvmsg(socket, msg_p, flags);
424 }
425
426 /*
427  * Simulate a blocking sendmsg on the non-blocking socket.
428  * It's non blocking because it was set that way for recvmsg.
429  */
430 int
431 rxi_Sendmsg(osi_socket socket, struct msghdr *msg_p, int flags)
432 {
433     fd_set *sfds = (fd_set *) 0;
434     while (sendmsg(socket, msg_p, flags) == -1) {
435         int err;
436         rx_stats.sendSelects++;
437
438         if (!sfds) {
439             if (!(sfds = IOMGR_AllocFDSet())) {
440                 (osi_Msg "rx failed to alloc fd_set: ");
441                 perror("rx_sendmsg");
442                 return 3;
443             }
444             FD_SET(socket, sfds);
445         }
446 #ifdef AFS_NT40_ENV
447         if (errno)
448 #elif defined(AFS_LINUX22_ENV)
449         /* linux unfortunately returns ECONNREFUSED if the target port
450          * is no longer in use */
451         /* and EAGAIN if a UDP checksum is incorrect */
452         if (errno != EWOULDBLOCK && errno != ENOBUFS && errno != ECONNREFUSED
453             && errno != EAGAIN)
454 #else
455         if (errno != EWOULDBLOCK && errno != ENOBUFS)
456 #endif
457         {
458             (osi_Msg "rx failed to send packet: ");
459             perror("rx_sendmsg");
460             return 3;
461         }
462         while ((err = select(socket + 1, 0, sfds, 0, 0)) != 1) {
463             if (err >= 0 || errno != EINTR)
464                 osi_Panic("rxi_sendmsg: select error %d.%d", err, errno);
465         }
466     }
467     if (sfds)
468         IOMGR_FreeFDSet(sfds);
469     return 0;
470 }