rx-lwp-fdsetsize-20040708
[openafs.git] / src / rx / rx_lwp.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /* rx_user.c contains routines specific to the user space UNIX implementation of rx */
11
12 /* This controls the size of an fd_set; it must be defined early before
13  * the system headers define that type and the macros that operate on it.
14  * Its value should be as large as the maximum file descriptor limit we
15  * are likely to run into on any platform.  Right now, that is 65536
16  * which is the default hard fd limit on Solaris 9 */
17 #define FD_SETSIZE 65536
18
19 #include <afsconfig.h>
20 #include <afs/param.h>
21
22 RCSID
23     ("$Header$");
24
25 # include <sys/types.h>         /* fd_set on older platforms */
26 # include <errno.h>
27 # include <signal.h>
28 #ifdef AFS_NT40_ENV
29 # include <winsock2.h>
30 #else
31 # include <unistd.h>            /* select() prototype */
32 # include <sys/time.h>          /* struct timeval, select() prototype */
33 # ifndef FD_SET
34 #  include <sys/select.h>       /* fd_set on newer platforms */
35 # endif
36 # include <sys/socket.h>
37 # include <sys/file.h>
38 # include <netdb.h>
39 # include <sys/stat.h>
40 # include <netinet/in.h>
41 # include <net/if.h>
42 # include <sys/ioctl.h>
43 # include <sys/time.h>
44 #endif
45 # include "rx.h"
46 # include "rx_globals.h"
47 # include <lwp.h>
48
49 #define MAXTHREADNAMELENGTH 64
50
51 extern int (*registerProgram) ();
52 extern int (*swapNameProgram) ();
53
54 int debugSelectFailure;         /* # of times select failed */
55
56 /*
57  * Sleep on the unique wait channel provided.
58  */
59 void
60 rxi_Sleep(void *addr)
61 {
62     LWP_WaitProcess(addr);
63 }
64
65 /*
66  * Wakeup any threads on the channel provided.
67  * They may be woken up spuriously, and must check any conditions.
68  */
69 void
70 rxi_Wakeup(void *addr)
71 {
72     LWP_NoYieldSignal(addr);
73 }
74
75 PROCESS rx_listenerPid = 0;     /* LWP process id of socket listener process */
76 static void rx_ListenerProc(void *dummy);
77
78 /*
79  * Delay the current thread the specified number of seconds.
80  */
81 void
82 rxi_Delay(int sec)
83 {
84     IOMGR_Sleep(sec);
85 }
86
87 static int quitListening = 0;
88
89 /* This routine will kill the listener thread, if it exists. */
90 void
91 rxi_StopListener(void)
92 {
93     quitListening = 1;
94     rxi_ReScheduleEvents();
95 }
96
97 /* This routine will get called by the event package whenever a new,
98    earlier than others, event is posted.  If the Listener process
99    is blocked in selects, this will unblock it.  It also can be called
100    to force a new trip through the rxi_Listener select loop when the set
101    of file descriptors it should be listening to changes... */
102 void
103 rxi_ReScheduleEvents(void)
104 {
105     if (rx_listenerPid)
106         IOMGR_Cancel(rx_listenerPid);
107 }
108
109 void
110 rxi_InitializeThreadSupport(void)
111 {
112     PROCESS junk;
113
114     LWP_InitializeProcessSupport(LWP_NORMAL_PRIORITY, &junk);
115     IOMGR_Initialize();
116     FD_ZERO(&rx_selectMask);
117 }
118
119 void
120 rxi_StartServerProc(void (*proc) (void), int stacksize)
121 {
122     PROCESS scratchPid;
123     static int number = 0;
124     char name[32];
125
126     sprintf(name, "srv_%d", ++number);
127     LWP_CreateProcess(proc, stacksize, RX_PROCESS_PRIORITY, (void *)0,
128                       "rx_ServerProc", &scratchPid);
129     if (registerProgram)
130         (*registerProgram) (scratchPid, name);
131 }
132
133 void
134 rxi_StartListener(void)
135 {
136     /* Priority of listener should be high, so it can keep conns alive */
137 #define RX_LIST_STACK   24000
138     LWP_CreateProcess(rx_ListenerProc, RX_LIST_STACK, LWP_MAX_PRIORITY,
139                       (void *)0, "rx_Listener", &rx_listenerPid);
140     if (registerProgram)
141         (*registerProgram) (rx_listenerPid, "listener");
142 }
143
144 /* The main loop which listens to the net for datagrams, and handles timeouts
145    and retransmissions, etc.  It also is responsible for scheduling the
146    execution of pending events (in conjunction with event.c).
147
148    Note interaction of nextPollTime and lastPollWorked.  The idea is that if rx is not
149    keeping up with the incoming stream of packets (because there are threads that
150    are interfering with its running sufficiently often), rx does a polling select
151    before doing a real IOMGR_Select system call.  Doing a real select means that
152    we don't have to let other processes run before processing more packets.
153
154    So, our algorithm is that if the last poll on the file descriptor found useful data, or
155    we're at the time nextPollTime (which is advanced so that it occurs every 3 or 4 seconds),
156    then we try the polling select before the IOMGR_Select.  If we eventually catch up
157    (which we can tell by the polling select returning no input packets ready), then we
158    don't do a polling select again until several seconds later (via nextPollTime mechanism).
159    */
160
161 static void
162 rxi_ListenerProc(fd_set * rfds, int *tnop, struct rx_call **newcallp)
163 {
164     afs_uint32 host;
165     u_short port;
166     register struct rx_packet *p = (struct rx_packet *)0;
167     int socket;
168     struct clock cv;
169     afs_int32 nextPollTime;     /* time to next poll FD before sleeping */
170     int lastPollWorked, doingPoll;      /* true iff last poll was useful */
171     struct timeval tv, *tvp;
172     int code;
173 #ifdef AFS_NT40_ENV
174     int i;
175 #endif
176     PROCESS pid;
177     char name[MAXTHREADNAMELENGTH] = "srv_0";
178
179     clock_NewTime();
180     lastPollWorked = 0;
181     nextPollTime = 0;
182     code = LWP_CurrentProcess(&pid);
183     if (code) {
184         fprintf(stderr, "rxi_Listener: Can't get my pid.\n");
185         exit(1);
186     }
187     rx_listenerPid = pid;
188     if (swapNameProgram)
189         (*swapNameProgram) (pid, "listener", &name[0]);
190
191     for (;;) {
192         /* Grab a new packet only if necessary (otherwise re-use the old one) */
193         if (p) {
194             rxi_RestoreDataBufs(p);
195         } else {
196             if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_RECEIVE)))
197                 osi_Panic("rxi_ListenerProc: no packets!");     /* Shouldn't happen */
198         }
199         /* Wait for the next event time or a packet to arrive. */
200         /* event_RaiseEvents schedules any events whose time has come and
201          * then atomically computes the time to the next event, guaranteeing
202          * that this is positive.  If there is no next event, it returns 0 */
203         clock_NewTime();
204         if (!rxevent_RaiseEvents(&cv))
205             tvp = NULL;
206         else {
207             /* It's important to copy cv to tv, because the 4.3 documentation
208              * for select threatens that *tv may be updated after a select, in
209              * future editions of the system, to indicate how much of the time
210              * period has elapsed.  So we shouldn't rely on tv not being altered. */
211             tv.tv_sec = cv.sec; /* Time to next event */
212             tv.tv_usec = cv.usec;
213             tvp = &tv;
214         }
215         rx_stats.selects++;
216
217         *rfds = rx_selectMask;
218
219         if (lastPollWorked || nextPollTime < clock_Sec()) {
220             /* we're catching up, or haven't tried to for a few seconds */
221             doingPoll = 1;
222             nextPollTime = clock_Sec() + 4;     /* try again in 4 seconds no matter what */
223             tv.tv_sec = tv.tv_usec = 0; /* make sure we poll */
224             tvp = &tv;
225             code = select(rx_maxSocketNumber + 1, rfds, 0, 0, tvp);
226         } else {
227             doingPoll = 0;
228             code = IOMGR_Select(rx_maxSocketNumber + 1, rfds, 0, 0, tvp);
229         }
230         lastPollWorked = 0;     /* default is that it didn't find anything */
231
232         if (quitListening) {
233             quitListening = 0;
234             LWP_DestroyProcess(pid);
235         }
236
237         switch (code) {
238         case 0:
239             /* Timer interrupt:
240              * If it was a timer interrupt then we can assume that
241              * the time has advanced by roughly the value of the
242              * previous timeout, and that there is now at least
243              * one pending event.
244              */
245             clock_NewTime();
246             break;
247         case -1:
248             /* select or IOMGR_Select returned failure */
249             debugSelectFailure++;       /* update debugging counter */
250             clock_NewTime();
251             break;
252         case -2:
253             /* IOMGR_Cancel:
254              * IOMGR_Cancel is invoked whenever a new event is
255              * posted that is earlier than any existing events.
256              * So we re-evaluate the time, and then go back to
257              * reschedule events
258              */
259             clock_NewTime();
260             break;
261
262         default:
263             /* Packets have arrived, presumably:
264              * If it wasn't a timer interrupt, then no event should have
265              * timed out yet (well some event may have, but only just...), so
266              * we don't bother looking to see if any have timed out, but just
267              * go directly to reading the data packets
268              */
269             clock_NewTime();
270             if (doingPoll)
271                 lastPollWorked = 1;
272 #ifdef AFS_NT40_ENV
273             for (i = 0; p && i < rfds->fd_count; i++) {
274                 socket = rfds->fd_array[i];
275                 if (rxi_ReadPacket(socket, p, &host, &port)) {
276                     *newcallp = NULL;
277                     p = rxi_ReceivePacket(p, socket, host, port, tnop,
278                                           newcallp);
279                     if (newcallp && *newcallp) {
280                         if (p) {
281                             rxi_FreePacket(p);
282                         }
283                         if (swapNameProgram) {
284                             (*swapNameProgram) (rx_listenerPid, &name, 0);
285                             rx_listenerPid = 0;
286                         }
287                         return;
288                     }
289                 }
290             }
291 #else
292             for (socket = rx_minSocketNumber;
293                  p && socket <= rx_maxSocketNumber; socket++) {
294                 if (!FD_ISSET(socket, rfds))
295                     continue;
296                 if (rxi_ReadPacket(socket, p, &host, &port)) {
297                     p = rxi_ReceivePacket(p, socket, host, port, tnop,
298                                           newcallp);
299                     if (newcallp && *newcallp) {
300                         if (p) {
301                             rxi_FreePacket(p);
302                         }
303                         if (swapNameProgram) {
304                             (*swapNameProgram) (rx_listenerPid, &name, 0);
305                             rx_listenerPid = 0;
306                         }
307                         return;
308                     }
309                 }
310             }
311 #endif
312             break;
313         }
314     }
315     /* NOTREACHED */
316 }
317
318 /* This is the listener process request loop. The listener process loop
319  * becomes a server thread when rxi_ListenerProc returns, and stays
320  * server thread until rxi_ServerProc returns. */
321 static void
322 rx_ListenerProc(void *dummy)
323 {
324     int threadID;
325     int sock;
326     struct rx_call *newcall;
327     fd_set *rfds;
328
329     if (!(rfds = IOMGR_AllocFDSet())) {
330         osi_Panic("rx_ListenerProc: no fd_sets!\n");
331     }
332
333     while (1) {
334         newcall = NULL;
335         threadID = -1;
336         rxi_ListenerProc(rfds, &threadID, &newcall);
337         /* assert(threadID != -1); */
338         /* assert(newcall != NULL); */
339         sock = OSI_NULLSOCKET;
340         rxi_ServerProc(threadID, newcall, &sock);
341         /* assert(sock != OSI_NULLSOCKET); */
342     }
343     /* not reached */
344 }
345
346 /* This is the server process request loop. The server process loop
347  * becomes a listener thread when rxi_ServerProc returns, and stays
348  * listener thread until rxi_ListenerProc returns. */
349 void
350 rx_ServerProc(void)
351 {
352     int sock;
353     int threadID;
354     struct rx_call *newcall = NULL;
355     fd_set *rfds;
356
357     if (!(rfds = IOMGR_AllocFDSet())) {
358         osi_Panic("rxi_ListenerProc: no fd_sets!\n");
359     }
360
361     rxi_MorePackets(rx_maxReceiveWindow + 2);   /* alloc more packets */
362     rxi_dataQuota += rx_initSendWindow; /* Reserve some pkts for hard times */
363     /* threadID is used for making decisions in GetCall.  Get it by bumping
364      * number of threads handling incoming calls */
365     threadID = rxi_availProcs++;
366
367     while (1) {
368         sock = OSI_NULLSOCKET;
369         rxi_ServerProc(threadID, newcall, &sock);
370         /* assert(sock != OSI_NULLSOCKET); */
371         newcall = NULL;
372         rxi_ListenerProc(rfds, &threadID, &newcall);
373         /* assert(threadID != -1); */
374         /* assert(newcall != NULL); */
375     }
376     /* not reached */
377 }
378
379 /*
380  * Called from GetUDPSocket.
381  * Called from a single thread at startup.
382  * Returns 0 on success; -1 on failure.
383  */
384 int
385 rxi_Listen(osi_socket sock)
386 {
387 #ifndef AFS_NT40_ENV
388     /*
389      * Put the socket into non-blocking mode so that rx_Listener
390      * can do a polling read before entering select
391      */
392 #ifndef AFS_DJGPP_ENV
393     if (fcntl(sock, F_SETFL, FNDELAY) == -1) {
394         perror("fcntl");
395         (osi_Msg "rxi_Listen: unable to set non-blocking mode on socket\n");
396         return -1;
397     }
398 #else
399     if (__djgpp_set_socket_blocking_mode(sock, 1) < 0) {
400         perror("__djgpp_set_socket_blocking_mode");
401         (osi_Msg "rxi_Listen: unable to set non-blocking mode on socket\n");
402         return -1;
403     }
404 #endif /* AFS_DJGPP_ENV */
405
406     if (sock > FD_SETSIZE - 1) {
407         (osi_Msg "rxi_Listen: socket descriptor > (FD_SETSIZE-1) = %d\n",
408          FD_SETSIZE - 1);
409         return -1;
410     }
411 #endif
412
413     FD_SET(sock, &rx_selectMask);
414     if (sock > rx_maxSocketNumber)
415         rx_maxSocketNumber = sock;
416     if (sock < rx_minSocketNumber)
417         rx_minSocketNumber = sock;
418     return 0;
419 }
420
421 /*
422  * Recvmsg
423  */
424 int
425 rxi_Recvmsg(int socket, struct msghdr *msg_p, int flags)
426 {
427     return recvmsg((int)socket, msg_p, flags);
428 }
429
430 /*
431  * Simulate a blocking sendmsg on the non-blocking socket.
432  * It's non blocking because it was set that way for recvmsg.
433  */
434 int
435 rxi_Sendmsg(osi_socket socket, struct msghdr *msg_p, int flags)
436 {
437     fd_set *sfds = (fd_set *) 0;
438     while (sendmsg(socket, msg_p, flags) == -1) {
439         int err;
440         rx_stats.sendSelects++;
441
442         if (!sfds) {
443             if (!(sfds = IOMGR_AllocFDSet())) {
444                 (osi_Msg "rx failed to alloc fd_set: ");
445                 perror("rx_sendmsg");
446                 return 3;
447             }
448             FD_SET(socket, sfds);
449         }
450 #ifdef AFS_NT40_ENV
451         if (errno)
452 #elif defined(AFS_LINUX22_ENV)
453         /* linux unfortunately returns ECONNREFUSED if the target port
454          * is no longer in use */
455         /* and EAGAIN if a UDP checksum is incorrect */
456         if (errno != EWOULDBLOCK && errno != ENOBUFS && errno != ECONNREFUSED
457             && errno != EAGAIN)
458 #else
459         if (errno != EWOULDBLOCK && errno != ENOBUFS)
460 #endif
461         {
462             (osi_Msg "rx failed to send packet: ");
463             perror("rx_sendmsg");
464             return 3;
465         }
466         while ((err = select(socket + 1, 0, sfds, 0, 0)) != 1) {
467             if (err >= 0 || errno != EINTR)
468                 osi_Panic("rxi_sendmsg: select error %d.%d", err, errno);
469         }
470     }
471     if (sfds)
472         IOMGR_FreeFDSet(sfds);
473     return 0;
474 }