acf267770b7cc8c397f3a923cc6a8b3611fa9735
[openafs.git] / src / rx / rx_lwp.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  *
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /* rx_user.c contains routines specific to the user space UNIX implementation of rx */
11
12 /* This controls the size of an fd_set; it must be defined early before
13  * the system headers define that type and the macros that operate on it.
14  * Its value should be as large as the maximum file descriptor limit we
15  * are likely to run into on any platform.  Right now, that is 65536
16  * which is the default hard fd limit on Solaris 9 */
17 #ifndef _WIN32
18 #define FD_SETSIZE 65536
19 #endif
20
21 #include <afsconfig.h>
22 #include <afs/param.h>
23
24 #ifndef KERNEL
25 # include <roken.h>
26 #endif /* KERNEL */
27
28 #include <sys/types.h>          /* fd_set on older platforms */
29 #include <errno.h>
30 #include <signal.h>
31 #ifdef AFS_NT40_ENV
32 # include <winsock2.h>
33 #else
34 # include <unistd.h>            /* select() prototype */
35 # include <sys/time.h>          /* struct timeval, select() prototype */
36 # ifndef FD_SET
37 #  include <sys/select.h>       /* fd_set on newer platforms */
38 # endif
39 # include <sys/socket.h>
40 # include <sys/file.h>
41 # include <netdb.h>
42 # include <sys/stat.h>
43 # include <netinet/in.h>
44 # include <net/if.h>
45 # include <sys/ioctl.h>
46 # include <sys/time.h>
47 #endif
48
49 #include "rx.h"
50 #include "rx_atomic.h"
51 #include "rx_globals.h"
52 #include "rx_stats.h"
53 #include <lwp.h>
54
55 #define MAXTHREADNAMELENGTH 64
56
57 int debugSelectFailure;         /* # of times select failed */
58
59 /*
60  * Sleep on the unique wait channel provided.
61  */
62 void
63 rxi_Sleep(void *addr)
64 {
65     LWP_WaitProcess(addr);
66 }
67
68 /*
69  * Wakeup any threads on the channel provided.
70  * They may be woken up spuriously, and must check any conditions.
71  */
72 void
73 rxi_Wakeup(void *addr)
74 {
75     LWP_NoYieldSignal(addr);
76 }
77
78 PROCESS rx_listenerPid = 0;     /* LWP process id of socket listener process */
79 static void* rx_ListenerProc(void *dummy);
80
81 /*
82  * Delay the current thread the specified number of seconds.
83  */
84 void
85 rxi_Delay(int sec)
86 {
87     IOMGR_Sleep(sec);
88 }
89
90 static int quitListening = 0;
91
92 /* This routine will kill the listener thread, if it exists. */
93 void
94 rxi_StopListener(void)
95 {
96     quitListening = 1;
97     rxi_ReScheduleEvents();
98 }
99
100 /* This routine will get called by the event package whenever a new,
101    earlier than others, event is posted.  If the Listener process
102    is blocked in selects, this will unblock it.  It also can be called
103    to force a new trip through the rxi_Listener select loop when the set
104    of file descriptors it should be listening to changes... */
105 void
106 rxi_ReScheduleEvents(void)
107 {
108     if (rx_listenerPid)
109         IOMGR_Cancel(rx_listenerPid);
110 }
111
112 void
113 rxi_InitializeThreadSupport(void)
114 {
115     PROCESS junk;
116
117     LWP_InitializeProcessSupport(LWP_NORMAL_PRIORITY, &junk);
118     IOMGR_Initialize();
119     FD_ZERO(&rx_selectMask);
120 }
121
122 void
123 rxi_StartServerProc(void *(*proc) (void *), int stacksize)
124 {
125     PROCESS scratchPid;
126     static int number = 0;
127     char name[32];
128
129     sprintf(name, "srv_%d", ++number);
130     LWP_CreateProcess(proc, stacksize, RX_PROCESS_PRIORITY, NULL,
131                       "rx_ServerProc", &scratchPid);
132     if (registerProgram)
133         (*registerProgram) (scratchPid, name);
134 }
135
136 void
137 rxi_StartListener(void)
138 {
139     /* Priority of listener should be high, so it can keep conns alive */
140 #define RX_LIST_STACK   24000
141     LWP_CreateProcess(rx_ListenerProc, RX_LIST_STACK, LWP_MAX_PRIORITY,
142                       NULL, "rx_Listener", &rx_listenerPid);
143     if (registerProgram)
144         (*registerProgram) (rx_listenerPid, "listener");
145 }
146
147 /* The main loop which listens to the net for datagrams, and handles timeouts
148    and retransmissions, etc.  It also is responsible for scheduling the
149    execution of pending events (in conjunction with event.c).
150
151    Note interaction of nextPollTime and lastPollWorked.  The idea is that if rx is not
152    keeping up with the incoming stream of packets (because there are threads that
153    are interfering with its running sufficiently often), rx does a polling select
154    before doing a real IOMGR_Select system call.  Doing a real select means that
155    we don't have to let other processes run before processing more packets.
156
157    So, our algorithm is that if the last poll on the file descriptor found useful data, or
158    we're at the time nextPollTime (which is advanced so that it occurs every 3 or 4 seconds),
159    then we try the polling select before the IOMGR_Select.  If we eventually catch up
160    (which we can tell by the polling select returning no input packets ready), then we
161    don't do a polling select again until several seconds later (via nextPollTime mechanism).
162    */
163
164 static void
165 rxi_ListenerProc(fd_set * rfds, int *tnop, struct rx_call **newcallp)
166 {
167     afs_uint32 host;
168     u_short port;
169     struct rx_packet *p = (struct rx_packet *)0;
170     osi_socket socket;
171     struct clock cv;
172     afs_int32 nextPollTime;     /* time to next poll FD before sleeping */
173     int lastPollWorked, doingPoll;      /* true iff last poll was useful */
174     struct timeval tv, *tvp;
175     int code;
176 #ifdef AFS_NT40_ENV
177     int i;
178 #endif
179     PROCESS pid;
180     char name[MAXTHREADNAMELENGTH] = "srv_0";
181
182     clock_NewTime();
183     lastPollWorked = 0;
184     nextPollTime = 0;
185     code = LWP_CurrentProcess(&pid);
186     if (code) {
187         osi_Panic("rxi_Listener: Can't get my pid.\n");
188     }
189     rx_listenerPid = pid;
190     if (swapNameProgram)
191         (*swapNameProgram) (pid, "listener", &name[0]);
192
193     for (;;) {
194         /* See if a check for additional packets was issued */
195         rx_CheckPackets();
196
197         /* Grab a new packet only if necessary (otherwise re-use the old one) */
198         if (p) {
199             rxi_RestoreDataBufs(p);
200         } else {
201             if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_RECEIVE)))
202                 osi_Panic("rxi_ListenerProc: no packets!");     /* Shouldn't happen */
203         }
204         /* Wait for the next event time or a packet to arrive. */
205         /* event_RaiseEvents schedules any events whose time has come and
206          * then atomically computes the time to the next event, guaranteeing
207          * that this is positive.  If there is no next event, it returns 0 */
208         clock_NewTime();
209         if (!rxevent_RaiseEvents(&cv))
210             tvp = NULL;
211         else {
212             /* It's important to copy cv to tv, because the 4.3 documentation
213              * for select threatens that *tv may be updated after a select, in
214              * future editions of the system, to indicate how much of the time
215              * period has elapsed.  So we shouldn't rely on tv not being altered. */
216             tv.tv_sec = cv.sec; /* Time to next event */
217             tv.tv_usec = cv.usec;
218             tvp = &tv;
219         }
220         if (rx_stats_active)
221             rx_atomic_inc(&rx_stats.selects);
222
223         *rfds = rx_selectMask;
224
225         if (lastPollWorked || nextPollTime < clock_Sec()) {
226             /* we're catching up, or haven't tried to for a few seconds */
227             doingPoll = 1;
228             nextPollTime = clock_Sec() + 4;     /* try again in 4 seconds no matter what */
229             tv.tv_sec = tv.tv_usec = 0; /* make sure we poll */
230             tvp = &tv;
231             code = select((int)(rx_maxSocketNumber + 1), rfds, 0, 0, tvp);
232         } else {
233             doingPoll = 0;
234             code = IOMGR_Select((int)(rx_maxSocketNumber + 1), rfds, 0, 0, tvp);
235         }
236         lastPollWorked = 0;     /* default is that it didn't find anything */
237
238         if (quitListening) {
239             quitListening = 0;
240             LWP_DestroyProcess(pid);
241         }
242
243         switch (code) {
244         case 0:
245             /* Timer interrupt:
246              * If it was a timer interrupt then we can assume that
247              * the time has advanced by roughly the value of the
248              * previous timeout, and that there is now at least
249              * one pending event.
250              */
251             clock_NewTime();
252             break;
253         case -1:
254             /* select or IOMGR_Select returned failure */
255             debugSelectFailure++;       /* update debugging counter */
256             clock_NewTime();
257             break;
258         case -2:
259             /* IOMGR_Cancel:
260              * IOMGR_Cancel is invoked whenever a new event is
261              * posted that is earlier than any existing events.
262              * So we re-evaluate the time, and then go back to
263              * reschedule events
264              */
265             clock_NewTime();
266             break;
267
268         default:
269             /* Packets have arrived, presumably:
270              * If it wasn't a timer interrupt, then no event should have
271              * timed out yet (well some event may have, but only just...), so
272              * we don't bother looking to see if any have timed out, but just
273              * go directly to reading the data packets
274              */
275             clock_NewTime();
276             if (doingPoll)
277                 lastPollWorked = 1;
278 #ifdef AFS_NT40_ENV
279             for (i = 0; p && i < rfds->fd_count; i++) {
280                 socket = rfds->fd_array[i];
281                 if (rxi_ReadPacket(socket, p, &host, &port)) {
282                     *newcallp = NULL;
283                     p = rxi_ReceivePacket(p, socket, host, port, tnop,
284                                           newcallp);
285                     if (newcallp && *newcallp) {
286                         if (p) {
287                             rxi_FreePacket(p);
288                         }
289                         if (swapNameProgram) {
290                             (*swapNameProgram) (rx_listenerPid, name, 0);
291                             rx_listenerPid = 0;
292                         }
293                         return;
294                     }
295                 }
296             }
297 #else
298             for (socket = rx_minSocketNumber;
299                  p && socket <= rx_maxSocketNumber; socket++) {
300                 if (!FD_ISSET(socket, rfds))
301                     continue;
302                 if (rxi_ReadPacket(socket, p, &host, &port)) {
303                     p = rxi_ReceivePacket(p, socket, host, port, tnop,
304                                           newcallp);
305                     if (newcallp && *newcallp) {
306                         if (p) {
307                             rxi_FreePacket(p);
308                         }
309                         if (swapNameProgram) {
310                             (*swapNameProgram) (rx_listenerPid, name, 0);
311                             rx_listenerPid = 0;
312                         }
313                         return;
314                     }
315                 }
316             }
317 #endif
318             break;
319         }
320     }
321     /* NOTREACHED */
322 }
323
324 /* This is the listener process request loop. The listener process loop
325  * becomes a server thread when rxi_ListenerProc returns, and stays
326  * server thread until rxi_ServerProc returns. */
327 static void *
328 rx_ListenerProc(void *dummy)
329 {
330     int threadID;
331     osi_socket sock;
332     struct rx_call *newcall;
333     fd_set *rfds;
334
335     if (!(rfds = IOMGR_AllocFDSet())) {
336         osi_Panic("rx_ListenerProc: no fd_sets!\n");
337     }
338
339     while (1) {
340         newcall = NULL;
341         threadID = -1;
342         rxi_ListenerProc(rfds, &threadID, &newcall);
343         /* osi_Assert(threadID != -1); */
344         /* osi_Assert(newcall != NULL); */
345         sock = OSI_NULLSOCKET;
346         rxi_ServerProc(threadID, newcall, &sock);
347         /* osi_Assert(sock != OSI_NULLSOCKET); */
348     }
349     /* not reached */
350     return NULL;
351 }
352
353 /* This is the server process request loop. The server process loop
354  * becomes a listener thread when rxi_ServerProc returns, and stays
355  * listener thread until rxi_ListenerProc returns. */
356 void *
357 rx_ServerProc(void * unused)
358 {
359     osi_socket sock;
360     int threadID;
361     struct rx_call *newcall = NULL;
362     fd_set *rfds;
363
364     if (!(rfds = IOMGR_AllocFDSet())) {
365         osi_Panic("rxi_ListenerProc: no fd_sets!\n");
366     }
367
368     rxi_MorePackets(rx_maxReceiveWindow + 2);   /* alloc more packets */
369     rxi_dataQuota += rx_initSendWindow; /* Reserve some pkts for hard times */
370     /* threadID is used for making decisions in GetCall.  Get it by bumping
371      * number of threads handling incoming calls */
372     threadID = rxi_availProcs++;
373
374     while (1) {
375         sock = OSI_NULLSOCKET;
376         rxi_ServerProc(threadID, newcall, &sock);
377         /* osi_Assert(sock != OSI_NULLSOCKET); */
378         newcall = NULL;
379         rxi_ListenerProc(rfds, &threadID, &newcall);
380         /* osi_Assert(threadID != -1); */
381         /* osi_Assert(newcall != NULL); */
382     }
383     /* not reached */
384     return NULL;
385 }
386
387 /*
388  * Called from GetUDPSocket.
389  * Called from a single thread at startup.
390  * Returns 0 on success; -1 on failure.
391  */
392 int
393 rxi_Listen(osi_socket sock)
394 {
395 #ifndef AFS_NT40_ENV
396     /*
397      * Put the socket into non-blocking mode so that rx_Listener
398      * can do a polling read before entering select
399      */
400     if (fcntl(sock, F_SETFL, FNDELAY) == -1) {
401         perror("fcntl");
402         (osi_Msg "rxi_Listen: unable to set non-blocking mode on socket\n");
403         return -1;
404     }
405
406     if (sock > FD_SETSIZE - 1) {
407         (osi_Msg "rxi_Listen: socket descriptor > (FD_SETSIZE-1) = %d\n",
408          FD_SETSIZE - 1);
409         return -1;
410     }
411 #endif
412
413     FD_SET(sock, &rx_selectMask);
414     if (sock > rx_maxSocketNumber)
415         rx_maxSocketNumber = sock;
416     if (sock < rx_minSocketNumber)
417         rx_minSocketNumber = sock;
418     return 0;
419 }
420
421 /*
422  * Recvmsg
423  */
424 int
425 rxi_Recvmsg(osi_socket socket, struct msghdr *msg_p, int flags)
426 {
427 #if defined(HAVE_LINUX_ERRQUEUE_H) && defined(ADAPT_PMTU)
428     while((rxi_HandleSocketError(socket)) > 0)
429         ;
430 #endif
431     return recvmsg(socket, msg_p, flags);
432 }
433
434 /*
435  * Simulate a blocking sendmsg on the non-blocking socket.
436  * It's non blocking because it was set that way for recvmsg.
437  */
438 int
439 rxi_Sendmsg(osi_socket socket, struct msghdr *msg_p, int flags)
440 {
441     fd_set *sfds = (fd_set *) 0;
442     while (sendmsg(socket, msg_p, flags) == -1) {
443         int err;
444         if (rx_stats_active)
445             rx_atomic_inc(&rx_stats.sendSelects);
446
447         if (!sfds) {
448             if (!(sfds = IOMGR_AllocFDSet())) {
449                 (osi_Msg "rx failed to alloc fd_set: ");
450                 perror("rx_sendmsg");
451                 return -1;
452             }
453             FD_SET(socket, sfds);
454         }
455 #if defined(HAVE_LINUX_ERRQUEUE_H) && defined(ADAPT_PMTU)
456         while((rxi_HandleSocketError(socket)) > 0)
457           ;
458 #endif
459 #ifdef AFS_NT40_ENV
460         if (WSAGetLastError())
461 #elif defined(AFS_LINUX22_ENV)
462         /* linux unfortunately returns ECONNREFUSED if the target port
463          * is no longer in use */
464         /* and EAGAIN if a UDP checksum is incorrect */
465         if (errno != EWOULDBLOCK && errno != ENOBUFS && errno != ECONNREFUSED
466             && errno != EAGAIN)
467 #else
468         if (errno != EWOULDBLOCK && errno != ENOBUFS)
469 #endif
470         {
471             (osi_Msg "rx failed to send packet: ");
472             perror("rx_sendmsg");
473 #ifndef AFS_NT40_ENV
474             if (errno > 0)
475               return -errno;
476 #else
477             if (WSAGetLastError() > 0)
478               return -WSAGetLastError();
479 #endif
480             return -1;
481         }
482         while ((err = select(socket + 1, 0, sfds, 0, 0)) != 1) {
483             if (err >= 0 || errno != EINTR)
484                 osi_Panic("rxi_sendmsg: select error %d.%d", err, errno);
485         }
486     }
487     if (sfds)
488         IOMGR_FreeFDSet(sfds);
489     return 0;
490 }