win32-fd_set-20040719
[openafs.git] / src / rx / rx_lwp.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /* rx_user.c contains routines specific to the user space UNIX implementation of rx */
11
12 /* This controls the size of an fd_set; it must be defined early before
13  * the system headers define that type and the macros that operate on it.
14  * Its value should be as large as the maximum file descriptor limit we
15  * are likely to run into on any platform.  Right now, that is 65536
16  * which is the default hard fd limit on Solaris 9 */
17 #ifndef _WIN32
18 #define FD_SETSIZE 65536
19 #endif
20
21 #include <afsconfig.h>
22 #include <afs/param.h>
23
24 RCSID
25     ("$Header$");
26
27 # include <sys/types.h>         /* fd_set on older platforms */
28 # include <errno.h>
29 # include <signal.h>
30 #ifdef AFS_NT40_ENV
31 # include <winsock2.h>
32 #else
33 # include <unistd.h>            /* select() prototype */
34 # include <sys/time.h>          /* struct timeval, select() prototype */
35 # ifndef FD_SET
36 #  include <sys/select.h>       /* fd_set on newer platforms */
37 # endif
38 # include <sys/socket.h>
39 # include <sys/file.h>
40 # include <netdb.h>
41 # include <sys/stat.h>
42 # include <netinet/in.h>
43 # include <net/if.h>
44 # include <sys/ioctl.h>
45 # include <sys/time.h>
46 #endif
47 # include "rx.h"
48 # include "rx_globals.h"
49 # include <lwp.h>
50
51 #define MAXTHREADNAMELENGTH 64
52
53 extern int (*registerProgram) ();
54 extern int (*swapNameProgram) ();
55
56 int debugSelectFailure;         /* # of times select failed */
57
58 /*
59  * Sleep on the unique wait channel provided.
60  */
61 void
62 rxi_Sleep(void *addr)
63 {
64     LWP_WaitProcess(addr);
65 }
66
67 /*
68  * Wakeup any threads on the channel provided.
69  * They may be woken up spuriously, and must check any conditions.
70  */
71 void
72 rxi_Wakeup(void *addr)
73 {
74     LWP_NoYieldSignal(addr);
75 }
76
77 PROCESS rx_listenerPid = 0;     /* LWP process id of socket listener process */
78 static void rx_ListenerProc(void *dummy);
79
80 /*
81  * Delay the current thread the specified number of seconds.
82  */
83 void
84 rxi_Delay(int sec)
85 {
86     IOMGR_Sleep(sec);
87 }
88
89 static int quitListening = 0;
90
91 /* This routine will kill the listener thread, if it exists. */
92 void
93 rxi_StopListener(void)
94 {
95     quitListening = 1;
96     rxi_ReScheduleEvents();
97 }
98
99 /* This routine will get called by the event package whenever a new,
100    earlier than others, event is posted.  If the Listener process
101    is blocked in selects, this will unblock it.  It also can be called
102    to force a new trip through the rxi_Listener select loop when the set
103    of file descriptors it should be listening to changes... */
104 void
105 rxi_ReScheduleEvents(void)
106 {
107     if (rx_listenerPid)
108         IOMGR_Cancel(rx_listenerPid);
109 }
110
111 void
112 rxi_InitializeThreadSupport(void)
113 {
114     PROCESS junk;
115
116     LWP_InitializeProcessSupport(LWP_NORMAL_PRIORITY, &junk);
117     IOMGR_Initialize();
118     FD_ZERO(&rx_selectMask);
119 }
120
121 void
122 rxi_StartServerProc(void (*proc) (void), int stacksize)
123 {
124     PROCESS scratchPid;
125     static int number = 0;
126     char name[32];
127
128     sprintf(name, "srv_%d", ++number);
129     LWP_CreateProcess(proc, stacksize, RX_PROCESS_PRIORITY, (void *)0,
130                       "rx_ServerProc", &scratchPid);
131     if (registerProgram)
132         (*registerProgram) (scratchPid, name);
133 }
134
135 void
136 rxi_StartListener(void)
137 {
138     /* Priority of listener should be high, so it can keep conns alive */
139 #define RX_LIST_STACK   24000
140     LWP_CreateProcess(rx_ListenerProc, RX_LIST_STACK, LWP_MAX_PRIORITY,
141                       (void *)0, "rx_Listener", &rx_listenerPid);
142     if (registerProgram)
143         (*registerProgram) (rx_listenerPid, "listener");
144 }
145
146 /* The main loop which listens to the net for datagrams, and handles timeouts
147    and retransmissions, etc.  It also is responsible for scheduling the
148    execution of pending events (in conjunction with event.c).
149
150    Note interaction of nextPollTime and lastPollWorked.  The idea is that if rx is not
151    keeping up with the incoming stream of packets (because there are threads that
152    are interfering with its running sufficiently often), rx does a polling select
153    before doing a real IOMGR_Select system call.  Doing a real select means that
154    we don't have to let other processes run before processing more packets.
155
156    So, our algorithm is that if the last poll on the file descriptor found useful data, or
157    we're at the time nextPollTime (which is advanced so that it occurs every 3 or 4 seconds),
158    then we try the polling select before the IOMGR_Select.  If we eventually catch up
159    (which we can tell by the polling select returning no input packets ready), then we
160    don't do a polling select again until several seconds later (via nextPollTime mechanism).
161    */
162
163 static void
164 rxi_ListenerProc(fd_set * rfds, int *tnop, struct rx_call **newcallp)
165 {
166     afs_uint32 host;
167     u_short port;
168     register struct rx_packet *p = (struct rx_packet *)0;
169     int socket;
170     struct clock cv;
171     afs_int32 nextPollTime;     /* time to next poll FD before sleeping */
172     int lastPollWorked, doingPoll;      /* true iff last poll was useful */
173     struct timeval tv, *tvp;
174     int code;
175 #ifdef AFS_NT40_ENV
176     int i;
177 #endif
178     PROCESS pid;
179     char name[MAXTHREADNAMELENGTH] = "srv_0";
180
181     clock_NewTime();
182     lastPollWorked = 0;
183     nextPollTime = 0;
184     code = LWP_CurrentProcess(&pid);
185     if (code) {
186         fprintf(stderr, "rxi_Listener: Can't get my pid.\n");
187         exit(1);
188     }
189     rx_listenerPid = pid;
190     if (swapNameProgram)
191         (*swapNameProgram) (pid, "listener", &name[0]);
192
193     for (;;) {
194         /* Grab a new packet only if necessary (otherwise re-use the old one) */
195         if (p) {
196             rxi_RestoreDataBufs(p);
197         } else {
198             if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_RECEIVE)))
199                 osi_Panic("rxi_ListenerProc: no packets!");     /* Shouldn't happen */
200         }
201         /* Wait for the next event time or a packet to arrive. */
202         /* event_RaiseEvents schedules any events whose time has come and
203          * then atomically computes the time to the next event, guaranteeing
204          * that this is positive.  If there is no next event, it returns 0 */
205         clock_NewTime();
206         if (!rxevent_RaiseEvents(&cv))
207             tvp = NULL;
208         else {
209             /* It's important to copy cv to tv, because the 4.3 documentation
210              * for select threatens that *tv may be updated after a select, in
211              * future editions of the system, to indicate how much of the time
212              * period has elapsed.  So we shouldn't rely on tv not being altered. */
213             tv.tv_sec = cv.sec; /* Time to next event */
214             tv.tv_usec = cv.usec;
215             tvp = &tv;
216         }
217         rx_stats.selects++;
218
219         *rfds = rx_selectMask;
220
221         if (lastPollWorked || nextPollTime < clock_Sec()) {
222             /* we're catching up, or haven't tried to for a few seconds */
223             doingPoll = 1;
224             nextPollTime = clock_Sec() + 4;     /* try again in 4 seconds no matter what */
225             tv.tv_sec = tv.tv_usec = 0; /* make sure we poll */
226             tvp = &tv;
227             code = select(rx_maxSocketNumber + 1, rfds, 0, 0, tvp);
228         } else {
229             doingPoll = 0;
230             code = IOMGR_Select(rx_maxSocketNumber + 1, rfds, 0, 0, tvp);
231         }
232         lastPollWorked = 0;     /* default is that it didn't find anything */
233
234         if (quitListening) {
235             quitListening = 0;
236             LWP_DestroyProcess(pid);
237         }
238
239         switch (code) {
240         case 0:
241             /* Timer interrupt:
242              * If it was a timer interrupt then we can assume that
243              * the time has advanced by roughly the value of the
244              * previous timeout, and that there is now at least
245              * one pending event.
246              */
247             clock_NewTime();
248             break;
249         case -1:
250             /* select or IOMGR_Select returned failure */
251             debugSelectFailure++;       /* update debugging counter */
252             clock_NewTime();
253             break;
254         case -2:
255             /* IOMGR_Cancel:
256              * IOMGR_Cancel is invoked whenever a new event is
257              * posted that is earlier than any existing events.
258              * So we re-evaluate the time, and then go back to
259              * reschedule events
260              */
261             clock_NewTime();
262             break;
263
264         default:
265             /* Packets have arrived, presumably:
266              * If it wasn't a timer interrupt, then no event should have
267              * timed out yet (well some event may have, but only just...), so
268              * we don't bother looking to see if any have timed out, but just
269              * go directly to reading the data packets
270              */
271             clock_NewTime();
272             if (doingPoll)
273                 lastPollWorked = 1;
274 #ifdef AFS_NT40_ENV
275             for (i = 0; p && i < rfds->fd_count; i++) {
276                 socket = rfds->fd_array[i];
277                 if (rxi_ReadPacket(socket, p, &host, &port)) {
278                     *newcallp = NULL;
279                     p = rxi_ReceivePacket(p, socket, host, port, tnop,
280                                           newcallp);
281                     if (newcallp && *newcallp) {
282                         if (p) {
283                             rxi_FreePacket(p);
284                         }
285                         if (swapNameProgram) {
286                             (*swapNameProgram) (rx_listenerPid, &name, 0);
287                             rx_listenerPid = 0;
288                         }
289                         return;
290                     }
291                 }
292             }
293 #else
294             for (socket = rx_minSocketNumber;
295                  p && socket <= rx_maxSocketNumber; socket++) {
296                 if (!FD_ISSET(socket, rfds))
297                     continue;
298                 if (rxi_ReadPacket(socket, p, &host, &port)) {
299                     p = rxi_ReceivePacket(p, socket, host, port, tnop,
300                                           newcallp);
301                     if (newcallp && *newcallp) {
302                         if (p) {
303                             rxi_FreePacket(p);
304                         }
305                         if (swapNameProgram) {
306                             (*swapNameProgram) (rx_listenerPid, &name, 0);
307                             rx_listenerPid = 0;
308                         }
309                         return;
310                     }
311                 }
312             }
313 #endif
314             break;
315         }
316     }
317     /* NOTREACHED */
318 }
319
320 /* This is the listener process request loop. The listener process loop
321  * becomes a server thread when rxi_ListenerProc returns, and stays
322  * server thread until rxi_ServerProc returns. */
323 static void
324 rx_ListenerProc(void *dummy)
325 {
326     int threadID;
327     int sock;
328     struct rx_call *newcall;
329     fd_set *rfds;
330
331     if (!(rfds = IOMGR_AllocFDSet())) {
332         osi_Panic("rx_ListenerProc: no fd_sets!\n");
333     }
334
335     while (1) {
336         newcall = NULL;
337         threadID = -1;
338         rxi_ListenerProc(rfds, &threadID, &newcall);
339         /* assert(threadID != -1); */
340         /* assert(newcall != NULL); */
341         sock = OSI_NULLSOCKET;
342         rxi_ServerProc(threadID, newcall, &sock);
343         /* assert(sock != OSI_NULLSOCKET); */
344     }
345     /* not reached */
346 }
347
348 /* This is the server process request loop. The server process loop
349  * becomes a listener thread when rxi_ServerProc returns, and stays
350  * listener thread until rxi_ListenerProc returns. */
351 void
352 rx_ServerProc(void)
353 {
354     int sock;
355     int threadID;
356     struct rx_call *newcall = NULL;
357     fd_set *rfds;
358
359     if (!(rfds = IOMGR_AllocFDSet())) {
360         osi_Panic("rxi_ListenerProc: no fd_sets!\n");
361     }
362
363     rxi_MorePackets(rx_maxReceiveWindow + 2);   /* alloc more packets */
364     rxi_dataQuota += rx_initSendWindow; /* Reserve some pkts for hard times */
365     /* threadID is used for making decisions in GetCall.  Get it by bumping
366      * number of threads handling incoming calls */
367     threadID = rxi_availProcs++;
368
369     while (1) {
370         sock = OSI_NULLSOCKET;
371         rxi_ServerProc(threadID, newcall, &sock);
372         /* assert(sock != OSI_NULLSOCKET); */
373         newcall = NULL;
374         rxi_ListenerProc(rfds, &threadID, &newcall);
375         /* assert(threadID != -1); */
376         /* assert(newcall != NULL); */
377     }
378     /* not reached */
379 }
380
381 /*
382  * Called from GetUDPSocket.
383  * Called from a single thread at startup.
384  * Returns 0 on success; -1 on failure.
385  */
386 int
387 rxi_Listen(osi_socket sock)
388 {
389 #ifndef AFS_NT40_ENV
390     /*
391      * Put the socket into non-blocking mode so that rx_Listener
392      * can do a polling read before entering select
393      */
394 #ifndef AFS_DJGPP_ENV
395     if (fcntl(sock, F_SETFL, FNDELAY) == -1) {
396         perror("fcntl");
397         (osi_Msg "rxi_Listen: unable to set non-blocking mode on socket\n");
398         return -1;
399     }
400 #else
401     if (__djgpp_set_socket_blocking_mode(sock, 1) < 0) {
402         perror("__djgpp_set_socket_blocking_mode");
403         (osi_Msg "rxi_Listen: unable to set non-blocking mode on socket\n");
404         return -1;
405     }
406 #endif /* AFS_DJGPP_ENV */
407
408     if (sock > FD_SETSIZE - 1) {
409         (osi_Msg "rxi_Listen: socket descriptor > (FD_SETSIZE-1) = %d\n",
410          FD_SETSIZE - 1);
411         return -1;
412     }
413 #endif
414
415     FD_SET(sock, &rx_selectMask);
416     if (sock > rx_maxSocketNumber)
417         rx_maxSocketNumber = sock;
418     if (sock < rx_minSocketNumber)
419         rx_minSocketNumber = sock;
420     return 0;
421 }
422
423 /*
424  * Recvmsg
425  */
426 int
427 rxi_Recvmsg(int socket, struct msghdr *msg_p, int flags)
428 {
429     return recvmsg((int)socket, msg_p, flags);
430 }
431
432 /*
433  * Simulate a blocking sendmsg on the non-blocking socket.
434  * It's non blocking because it was set that way for recvmsg.
435  */
436 int
437 rxi_Sendmsg(osi_socket socket, struct msghdr *msg_p, int flags)
438 {
439     fd_set *sfds = (fd_set *) 0;
440     while (sendmsg(socket, msg_p, flags) == -1) {
441         int err;
442         rx_stats.sendSelects++;
443
444         if (!sfds) {
445             if (!(sfds = IOMGR_AllocFDSet())) {
446                 (osi_Msg "rx failed to alloc fd_set: ");
447                 perror("rx_sendmsg");
448                 return 3;
449             }
450             FD_SET(socket, sfds);
451         }
452 #ifdef AFS_NT40_ENV
453         if (errno)
454 #elif defined(AFS_LINUX22_ENV)
455         /* linux unfortunately returns ECONNREFUSED if the target port
456          * is no longer in use */
457         /* and EAGAIN if a UDP checksum is incorrect */
458         if (errno != EWOULDBLOCK && errno != ENOBUFS && errno != ECONNREFUSED
459             && errno != EAGAIN)
460 #else
461         if (errno != EWOULDBLOCK && errno != ENOBUFS)
462 #endif
463         {
464             (osi_Msg "rx failed to send packet: ");
465             perror("rx_sendmsg");
466             return 3;
467         }
468         while ((err = select(socket + 1, 0, sfds, 0, 0)) != 1) {
469             if (err >= 0 || errno != EINTR)
470                 osi_Panic("rxi_sendmsg: select error %d.%d", err, errno);
471         }
472     }
473     if (sfds)
474         IOMGR_FreeFDSet(sfds);
475     return 0;
476 }