rx: Reorganise includes
[openafs.git] / src / rx / rx_lwp.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  *
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /* rx_user.c contains routines specific to the user space UNIX implementation of rx */
11
12 /* This controls the size of an fd_set; it must be defined early before
13  * the system headers define that type and the macros that operate on it.
14  * Its value should be as large as the maximum file descriptor limit we
15  * are likely to run into on any platform.  Right now, that is 65536
16  * which is the default hard fd limit on Solaris 9 */
17 #ifndef _WIN32
18 #define FD_SETSIZE 65536
19 #endif
20
21 #include <afsconfig.h>
22 #include <afs/param.h>
23
24
25 #include <sys/types.h>          /* fd_set on older platforms */
26 #include <errno.h>
27 #include <signal.h>
28 #ifdef AFS_NT40_ENV
29 # include <winsock2.h>
30 #else
31 # include <unistd.h>            /* select() prototype */
32 # include <sys/time.h>          /* struct timeval, select() prototype */
33 # ifndef FD_SET
34 #  include <sys/select.h>       /* fd_set on newer platforms */
35 # endif
36 # include <sys/socket.h>
37 # include <sys/file.h>
38 # include <netdb.h>
39 # include <sys/stat.h>
40 # include <netinet/in.h>
41 # include <net/if.h>
42 # include <sys/ioctl.h>
43 # include <sys/time.h>
44 #endif
45
46 #include "rx.h"
47 #include "rx_atomic.h"
48 #include "rx_globals.h"
49 #include "rx_stats.h"
50 #include <lwp.h>
51
52 #define MAXTHREADNAMELENGTH 64
53
54 int debugSelectFailure;         /* # of times select failed */
55
56 /*
57  * Sleep on the unique wait channel provided.
58  */
59 void
60 rxi_Sleep(void *addr)
61 {
62     LWP_WaitProcess(addr);
63 }
64
65 /*
66  * Wakeup any threads on the channel provided.
67  * They may be woken up spuriously, and must check any conditions.
68  */
69 void
70 rxi_Wakeup(void *addr)
71 {
72     LWP_NoYieldSignal(addr);
73 }
74
75 PROCESS rx_listenerPid = 0;     /* LWP process id of socket listener process */
76 static void* rx_ListenerProc(void *dummy);
77
78 /*
79  * Delay the current thread the specified number of seconds.
80  */
81 void
82 rxi_Delay(int sec)
83 {
84     IOMGR_Sleep(sec);
85 }
86
87 static int quitListening = 0;
88
89 /* This routine will kill the listener thread, if it exists. */
90 void
91 rxi_StopListener(void)
92 {
93     quitListening = 1;
94     rxi_ReScheduleEvents();
95 }
96
97 /* This routine will get called by the event package whenever a new,
98    earlier than others, event is posted.  If the Listener process
99    is blocked in selects, this will unblock it.  It also can be called
100    to force a new trip through the rxi_Listener select loop when the set
101    of file descriptors it should be listening to changes... */
102 void
103 rxi_ReScheduleEvents(void)
104 {
105     if (rx_listenerPid)
106         IOMGR_Cancel(rx_listenerPid);
107 }
108
109 void
110 rxi_InitializeThreadSupport(void)
111 {
112     PROCESS junk;
113
114     LWP_InitializeProcessSupport(LWP_NORMAL_PRIORITY, &junk);
115     IOMGR_Initialize();
116     FD_ZERO(&rx_selectMask);
117 }
118
119 void
120 rxi_StartServerProc(void *(*proc) (void *), int stacksize)
121 {
122     PROCESS scratchPid;
123     static int number = 0;
124     char name[32];
125
126     sprintf(name, "srv_%d", ++number);
127     LWP_CreateProcess(proc, stacksize, RX_PROCESS_PRIORITY, NULL,
128                       "rx_ServerProc", &scratchPid);
129     if (registerProgram)
130         (*registerProgram) (scratchPid, name);
131 }
132
133 void
134 rxi_StartListener(void)
135 {
136     /* Priority of listener should be high, so it can keep conns alive */
137 #define RX_LIST_STACK   24000
138     LWP_CreateProcess(rx_ListenerProc, RX_LIST_STACK, LWP_MAX_PRIORITY,
139                       NULL, "rx_Listener", &rx_listenerPid);
140     if (registerProgram)
141         (*registerProgram) (rx_listenerPid, "listener");
142 }
143
144 /* The main loop which listens to the net for datagrams, and handles timeouts
145    and retransmissions, etc.  It also is responsible for scheduling the
146    execution of pending events (in conjunction with event.c).
147
148    Note interaction of nextPollTime and lastPollWorked.  The idea is that if rx is not
149    keeping up with the incoming stream of packets (because there are threads that
150    are interfering with its running sufficiently often), rx does a polling select
151    before doing a real IOMGR_Select system call.  Doing a real select means that
152    we don't have to let other processes run before processing more packets.
153
154    So, our algorithm is that if the last poll on the file descriptor found useful data, or
155    we're at the time nextPollTime (which is advanced so that it occurs every 3 or 4 seconds),
156    then we try the polling select before the IOMGR_Select.  If we eventually catch up
157    (which we can tell by the polling select returning no input packets ready), then we
158    don't do a polling select again until several seconds later (via nextPollTime mechanism).
159    */
160
161 static void
162 rxi_ListenerProc(fd_set * rfds, int *tnop, struct rx_call **newcallp)
163 {
164     afs_uint32 host;
165     u_short port;
166     struct rx_packet *p = (struct rx_packet *)0;
167     osi_socket socket;
168     struct clock cv;
169     afs_int32 nextPollTime;     /* time to next poll FD before sleeping */
170     int lastPollWorked, doingPoll;      /* true iff last poll was useful */
171     struct timeval tv, *tvp;
172     int code;
173 #ifdef AFS_NT40_ENV
174     int i;
175 #endif
176     PROCESS pid;
177     char name[MAXTHREADNAMELENGTH] = "srv_0";
178
179     clock_NewTime();
180     lastPollWorked = 0;
181     nextPollTime = 0;
182     code = LWP_CurrentProcess(&pid);
183     if (code) {
184         osi_Panic("rxi_Listener: Can't get my pid.\n");
185     }
186     rx_listenerPid = pid;
187     if (swapNameProgram)
188         (*swapNameProgram) (pid, "listener", &name[0]);
189
190     for (;;) {
191         /* See if a check for additional packets was issued */
192         rx_CheckPackets();
193
194         /* Grab a new packet only if necessary (otherwise re-use the old one) */
195         if (p) {
196             rxi_RestoreDataBufs(p);
197         } else {
198             if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_RECEIVE)))
199                 osi_Panic("rxi_ListenerProc: no packets!");     /* Shouldn't happen */
200         }
201         /* Wait for the next event time or a packet to arrive. */
202         /* event_RaiseEvents schedules any events whose time has come and
203          * then atomically computes the time to the next event, guaranteeing
204          * that this is positive.  If there is no next event, it returns 0 */
205         clock_NewTime();
206         if (!rxevent_RaiseEvents(&cv))
207             tvp = NULL;
208         else {
209             /* It's important to copy cv to tv, because the 4.3 documentation
210              * for select threatens that *tv may be updated after a select, in
211              * future editions of the system, to indicate how much of the time
212              * period has elapsed.  So we shouldn't rely on tv not being altered. */
213             tv.tv_sec = cv.sec; /* Time to next event */
214             tv.tv_usec = cv.usec;
215             tvp = &tv;
216         }
217         if (rx_stats_active)
218             rx_atomic_inc(&rx_stats.selects);
219
220         *rfds = rx_selectMask;
221
222         if (lastPollWorked || nextPollTime < clock_Sec()) {
223             /* we're catching up, or haven't tried to for a few seconds */
224             doingPoll = 1;
225             nextPollTime = clock_Sec() + 4;     /* try again in 4 seconds no matter what */
226             tv.tv_sec = tv.tv_usec = 0; /* make sure we poll */
227             tvp = &tv;
228             code = select((int)(rx_maxSocketNumber + 1), rfds, 0, 0, tvp);
229         } else {
230             doingPoll = 0;
231             code = IOMGR_Select((int)(rx_maxSocketNumber + 1), rfds, 0, 0, tvp);
232         }
233         lastPollWorked = 0;     /* default is that it didn't find anything */
234
235         if (quitListening) {
236             quitListening = 0;
237             LWP_DestroyProcess(pid);
238         }
239
240         switch (code) {
241         case 0:
242             /* Timer interrupt:
243              * If it was a timer interrupt then we can assume that
244              * the time has advanced by roughly the value of the
245              * previous timeout, and that there is now at least
246              * one pending event.
247              */
248             clock_NewTime();
249             break;
250         case -1:
251             /* select or IOMGR_Select returned failure */
252             debugSelectFailure++;       /* update debugging counter */
253             clock_NewTime();
254             break;
255         case -2:
256             /* IOMGR_Cancel:
257              * IOMGR_Cancel is invoked whenever a new event is
258              * posted that is earlier than any existing events.
259              * So we re-evaluate the time, and then go back to
260              * reschedule events
261              */
262             clock_NewTime();
263             break;
264
265         default:
266             /* Packets have arrived, presumably:
267              * If it wasn't a timer interrupt, then no event should have
268              * timed out yet (well some event may have, but only just...), so
269              * we don't bother looking to see if any have timed out, but just
270              * go directly to reading the data packets
271              */
272             clock_NewTime();
273             if (doingPoll)
274                 lastPollWorked = 1;
275 #ifdef AFS_NT40_ENV
276             for (i = 0; p && i < rfds->fd_count; i++) {
277                 socket = rfds->fd_array[i];
278                 if (rxi_ReadPacket(socket, p, &host, &port)) {
279                     *newcallp = NULL;
280                     p = rxi_ReceivePacket(p, socket, host, port, tnop,
281                                           newcallp);
282                     if (newcallp && *newcallp) {
283                         if (p) {
284                             rxi_FreePacket(p);
285                         }
286                         if (swapNameProgram) {
287                             (*swapNameProgram) (rx_listenerPid, name, 0);
288                             rx_listenerPid = 0;
289                         }
290                         return;
291                     }
292                 }
293             }
294 #else
295             for (socket = rx_minSocketNumber;
296                  p && socket <= rx_maxSocketNumber; socket++) {
297                 if (!FD_ISSET(socket, rfds))
298                     continue;
299                 if (rxi_ReadPacket(socket, p, &host, &port)) {
300                     p = rxi_ReceivePacket(p, socket, host, port, tnop,
301                                           newcallp);
302                     if (newcallp && *newcallp) {
303                         if (p) {
304                             rxi_FreePacket(p);
305                         }
306                         if (swapNameProgram) {
307                             (*swapNameProgram) (rx_listenerPid, name, 0);
308                             rx_listenerPid = 0;
309                         }
310                         return;
311                     }
312                 }
313             }
314 #endif
315             break;
316         }
317     }
318     /* NOTREACHED */
319 }
320
321 /* This is the listener process request loop. The listener process loop
322  * becomes a server thread when rxi_ListenerProc returns, and stays
323  * server thread until rxi_ServerProc returns. */
324 static void *
325 rx_ListenerProc(void *dummy)
326 {
327     int threadID;
328     osi_socket sock;
329     struct rx_call *newcall;
330     fd_set *rfds;
331
332     if (!(rfds = IOMGR_AllocFDSet())) {
333         osi_Panic("rx_ListenerProc: no fd_sets!\n");
334     }
335
336     while (1) {
337         newcall = NULL;
338         threadID = -1;
339         rxi_ListenerProc(rfds, &threadID, &newcall);
340         /* osi_Assert(threadID != -1); */
341         /* osi_Assert(newcall != NULL); */
342         sock = OSI_NULLSOCKET;
343         rxi_ServerProc(threadID, newcall, &sock);
344         /* osi_Assert(sock != OSI_NULLSOCKET); */
345     }
346     /* not reached */
347     return NULL;
348 }
349
350 /* This is the server process request loop. The server process loop
351  * becomes a listener thread when rxi_ServerProc returns, and stays
352  * listener thread until rxi_ListenerProc returns. */
353 void *
354 rx_ServerProc(void * unused)
355 {
356     osi_socket sock;
357     int threadID;
358     struct rx_call *newcall = NULL;
359     fd_set *rfds;
360
361     if (!(rfds = IOMGR_AllocFDSet())) {
362         osi_Panic("rxi_ListenerProc: no fd_sets!\n");
363     }
364
365     rxi_MorePackets(rx_maxReceiveWindow + 2);   /* alloc more packets */
366     rxi_dataQuota += rx_initSendWindow; /* Reserve some pkts for hard times */
367     /* threadID is used for making decisions in GetCall.  Get it by bumping
368      * number of threads handling incoming calls */
369     threadID = rxi_availProcs++;
370
371     while (1) {
372         sock = OSI_NULLSOCKET;
373         rxi_ServerProc(threadID, newcall, &sock);
374         /* osi_Assert(sock != OSI_NULLSOCKET); */
375         newcall = NULL;
376         rxi_ListenerProc(rfds, &threadID, &newcall);
377         /* osi_Assert(threadID != -1); */
378         /* osi_Assert(newcall != NULL); */
379     }
380     /* not reached */
381     return NULL;
382 }
383
384 /*
385  * Called from GetUDPSocket.
386  * Called from a single thread at startup.
387  * Returns 0 on success; -1 on failure.
388  */
389 int
390 rxi_Listen(osi_socket sock)
391 {
392 #ifndef AFS_NT40_ENV
393     /*
394      * Put the socket into non-blocking mode so that rx_Listener
395      * can do a polling read before entering select
396      */
397     if (fcntl(sock, F_SETFL, FNDELAY) == -1) {
398         perror("fcntl");
399         (osi_Msg "rxi_Listen: unable to set non-blocking mode on socket\n");
400         return -1;
401     }
402
403     if (sock > FD_SETSIZE - 1) {
404         (osi_Msg "rxi_Listen: socket descriptor > (FD_SETSIZE-1) = %d\n",
405          FD_SETSIZE - 1);
406         return -1;
407     }
408 #endif
409
410     FD_SET(sock, &rx_selectMask);
411     if (sock > rx_maxSocketNumber)
412         rx_maxSocketNumber = sock;
413     if (sock < rx_minSocketNumber)
414         rx_minSocketNumber = sock;
415     return 0;
416 }
417
418 /*
419  * Recvmsg
420  */
421 int
422 rxi_Recvmsg(osi_socket socket, struct msghdr *msg_p, int flags)
423 {
424 #if defined(HAVE_LINUX_ERRQUEUE_H) && defined(ADAPT_PMTU)
425     while((rxi_HandleSocketError(socket)) > 0)
426         ;
427 #endif
428     return recvmsg(socket, msg_p, flags);
429 }
430
431 /*
432  * Simulate a blocking sendmsg on the non-blocking socket.
433  * It's non blocking because it was set that way for recvmsg.
434  */
435 int
436 rxi_Sendmsg(osi_socket socket, struct msghdr *msg_p, int flags)
437 {
438     fd_set *sfds = (fd_set *) 0;
439     while (sendmsg(socket, msg_p, flags) == -1) {
440         int err;
441         if (rx_stats_active)
442             rx_atomic_inc(&rx_stats.sendSelects);
443
444         if (!sfds) {
445             if (!(sfds = IOMGR_AllocFDSet())) {
446                 (osi_Msg "rx failed to alloc fd_set: ");
447                 perror("rx_sendmsg");
448                 return -1;
449             }
450             FD_SET(socket, sfds);
451         }
452 #if defined(HAVE_LINUX_ERRQUEUE_H) && defined(ADAPT_PMTU)
453         while((rxi_HandleSocketError(socket)) > 0)
454           ;
455 #endif
456 #ifdef AFS_NT40_ENV
457         if (WSAGetLastError())
458 #elif defined(AFS_LINUX22_ENV)
459         /* linux unfortunately returns ECONNREFUSED if the target port
460          * is no longer in use */
461         /* and EAGAIN if a UDP checksum is incorrect */
462         if (errno != EWOULDBLOCK && errno != ENOBUFS && errno != ECONNREFUSED
463             && errno != EAGAIN)
464 #else
465         if (errno != EWOULDBLOCK && errno != ENOBUFS)
466 #endif
467         {
468             (osi_Msg "rx failed to send packet: ");
469             perror("rx_sendmsg");
470 #ifndef AFS_NT40_ENV
471             if (errno > 0)
472               return -errno;
473 #else
474             if (WSAGetLastError() > 0)
475               return -WSAGetLastError();
476 #endif
477             return -1;
478         }
479         while ((err = select(socket + 1, 0, sfds, 0, 0)) != 1) {
480             if (err >= 0 || errno != EINTR)
481                 osi_Panic("rxi_sendmsg: select error %d.%d", err, errno);
482         }
483     }
484     if (sfds)
485         IOMGR_FreeFDSet(sfds);
486     return 0;
487 }