fd15de580e9ce353b3034c4a9595c9ca33001403
[openafs.git] / src / rx / rx_lwp.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  *
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /* rx_user.c contains routines specific to the user space UNIX implementation of rx */
11
12 /* This controls the size of an fd_set; it must be defined early before
13  * the system headers define that type and the macros that operate on it.
14  * Its value should be as large as the maximum file descriptor limit we
15  * are likely to run into on any platform.  Right now, that is 65536
16  * which is the default hard fd limit on Solaris 9 */
17 #ifndef _WIN32
18 #define FD_SETSIZE 65536
19 #endif
20
21 #include <afsconfig.h>
22 #include <afs/param.h>
23
24
25 # include <sys/types.h>         /* fd_set on older platforms */
26 # include <errno.h>
27 # include <signal.h>
28 #ifdef AFS_NT40_ENV
29 # include <winsock2.h>
30 #else
31 # include <unistd.h>            /* select() prototype */
32 # include <sys/time.h>          /* struct timeval, select() prototype */
33 # ifndef FD_SET
34 #  include <sys/select.h>       /* fd_set on newer platforms */
35 # endif
36 # include <sys/socket.h>
37 # include <sys/file.h>
38 # include <netdb.h>
39 # include <sys/stat.h>
40 # include <netinet/in.h>
41 # include <net/if.h>
42 # include <sys/ioctl.h>
43 # include <sys/time.h>
44 #endif
45 # include <assert.h>
46 # include "rx.h"
47 # include "rx_globals.h"
48 # include <lwp.h>
49
50 #define MAXTHREADNAMELENGTH 64
51
52 int debugSelectFailure;         /* # of times select failed */
53
54 /*
55  * Sleep on the unique wait channel provided.
56  */
57 void
58 rxi_Sleep(void *addr)
59 {
60     LWP_WaitProcess(addr);
61 }
62
63 /*
64  * Wakeup any threads on the channel provided.
65  * They may be woken up spuriously, and must check any conditions.
66  */
67 void
68 rxi_Wakeup(void *addr)
69 {
70     LWP_NoYieldSignal(addr);
71 }
72
73 PROCESS rx_listenerPid = 0;     /* LWP process id of socket listener process */
74 static void* rx_ListenerProc(void *dummy);
75
76 /*
77  * Delay the current thread the specified number of seconds.
78  */
79 void
80 rxi_Delay(int sec)
81 {
82     IOMGR_Sleep(sec);
83 }
84
85 static int quitListening = 0;
86
87 /* This routine will kill the listener thread, if it exists. */
88 void
89 rxi_StopListener(void)
90 {
91     quitListening = 1;
92     rxi_ReScheduleEvents();
93 }
94
95 /* This routine will get called by the event package whenever a new,
96    earlier than others, event is posted.  If the Listener process
97    is blocked in selects, this will unblock it.  It also can be called
98    to force a new trip through the rxi_Listener select loop when the set
99    of file descriptors it should be listening to changes... */
100 void
101 rxi_ReScheduleEvents(void)
102 {
103     if (rx_listenerPid)
104         IOMGR_Cancel(rx_listenerPid);
105 }
106
107 void
108 rxi_InitializeThreadSupport(void)
109 {
110     PROCESS junk;
111
112     LWP_InitializeProcessSupport(LWP_NORMAL_PRIORITY, &junk);
113     IOMGR_Initialize();
114     FD_ZERO(&rx_selectMask);
115 }
116
117 void
118 rxi_StartServerProc(void *(*proc) (void *), int stacksize)
119 {
120     PROCESS scratchPid;
121     static int number = 0;
122     char name[32];
123
124     sprintf(name, "srv_%d", ++number);
125     LWP_CreateProcess(proc, stacksize, RX_PROCESS_PRIORITY, NULL,
126                       "rx_ServerProc", &scratchPid);
127     if (registerProgram)
128         (*registerProgram) (scratchPid, name);
129 }
130
131 void
132 rxi_StartListener(void)
133 {
134     /* Priority of listener should be high, so it can keep conns alive */
135 #define RX_LIST_STACK   24000
136     LWP_CreateProcess(rx_ListenerProc, RX_LIST_STACK, LWP_MAX_PRIORITY,
137                       NULL, "rx_Listener", &rx_listenerPid);
138     if (registerProgram)
139         (*registerProgram) (rx_listenerPid, "listener");
140 }
141
142 /* The main loop which listens to the net for datagrams, and handles timeouts
143    and retransmissions, etc.  It also is responsible for scheduling the
144    execution of pending events (in conjunction with event.c).
145
146    Note interaction of nextPollTime and lastPollWorked.  The idea is that if rx is not
147    keeping up with the incoming stream of packets (because there are threads that
148    are interfering with its running sufficiently often), rx does a polling select
149    before doing a real IOMGR_Select system call.  Doing a real select means that
150    we don't have to let other processes run before processing more packets.
151
152    So, our algorithm is that if the last poll on the file descriptor found useful data, or
153    we're at the time nextPollTime (which is advanced so that it occurs every 3 or 4 seconds),
154    then we try the polling select before the IOMGR_Select.  If we eventually catch up
155    (which we can tell by the polling select returning no input packets ready), then we
156    don't do a polling select again until several seconds later (via nextPollTime mechanism).
157    */
158
159 static void
160 rxi_ListenerProc(fd_set * rfds, int *tnop, struct rx_call **newcallp)
161 {
162     afs_uint32 host;
163     u_short port;
164     struct rx_packet *p = (struct rx_packet *)0;
165     osi_socket socket;
166     struct clock cv;
167     afs_int32 nextPollTime;     /* time to next poll FD before sleeping */
168     int lastPollWorked, doingPoll;      /* true iff last poll was useful */
169     struct timeval tv, *tvp;
170     int code;
171 #ifdef AFS_NT40_ENV
172     int i;
173 #endif
174     PROCESS pid;
175     char name[MAXTHREADNAMELENGTH] = "srv_0";
176
177     clock_NewTime();
178     lastPollWorked = 0;
179     nextPollTime = 0;
180     code = LWP_CurrentProcess(&pid);
181     if (code) {
182         fprintf(stderr, "rxi_Listener: Can't get my pid.\n");
183         assert(0);
184     }
185     rx_listenerPid = pid;
186     if (swapNameProgram)
187         (*swapNameProgram) (pid, "listener", &name[0]);
188
189     for (;;) {
190         /* See if a check for additional packets was issued */
191         rx_CheckPackets();
192
193         /* Grab a new packet only if necessary (otherwise re-use the old one) */
194         if (p) {
195             rxi_RestoreDataBufs(p);
196         } else {
197             if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_RECEIVE)))
198                 osi_Panic("rxi_ListenerProc: no packets!");     /* Shouldn't happen */
199         }
200         /* Wait for the next event time or a packet to arrive. */
201         /* event_RaiseEvents schedules any events whose time has come and
202          * then atomically computes the time to the next event, guaranteeing
203          * that this is positive.  If there is no next event, it returns 0 */
204         clock_NewTime();
205         if (!rxevent_RaiseEvents(&cv))
206             tvp = NULL;
207         else {
208             /* It's important to copy cv to tv, because the 4.3 documentation
209              * for select threatens that *tv may be updated after a select, in
210              * future editions of the system, to indicate how much of the time
211              * period has elapsed.  So we shouldn't rely on tv not being altered. */
212             tv.tv_sec = cv.sec; /* Time to next event */
213             tv.tv_usec = cv.usec;
214             tvp = &tv;
215         }
216         rx_stats.selects++;
217
218         *rfds = rx_selectMask;
219
220         if (lastPollWorked || nextPollTime < clock_Sec()) {
221             /* we're catching up, or haven't tried to for a few seconds */
222             doingPoll = 1;
223             nextPollTime = clock_Sec() + 4;     /* try again in 4 seconds no matter what */
224             tv.tv_sec = tv.tv_usec = 0; /* make sure we poll */
225             tvp = &tv;
226             code = select((int)(rx_maxSocketNumber + 1), rfds, 0, 0, tvp);
227         } else {
228             doingPoll = 0;
229             code = IOMGR_Select((int)(rx_maxSocketNumber + 1), rfds, 0, 0, tvp);
230         }
231         lastPollWorked = 0;     /* default is that it didn't find anything */
232
233         if (quitListening) {
234             quitListening = 0;
235             LWP_DestroyProcess(pid);
236         }
237
238         switch (code) {
239         case 0:
240             /* Timer interrupt:
241              * If it was a timer interrupt then we can assume that
242              * the time has advanced by roughly the value of the
243              * previous timeout, and that there is now at least
244              * one pending event.
245              */
246             clock_NewTime();
247             break;
248         case -1:
249             /* select or IOMGR_Select returned failure */
250             debugSelectFailure++;       /* update debugging counter */
251             clock_NewTime();
252             break;
253         case -2:
254             /* IOMGR_Cancel:
255              * IOMGR_Cancel is invoked whenever a new event is
256              * posted that is earlier than any existing events.
257              * So we re-evaluate the time, and then go back to
258              * reschedule events
259              */
260             clock_NewTime();
261             break;
262
263         default:
264             /* Packets have arrived, presumably:
265              * If it wasn't a timer interrupt, then no event should have
266              * timed out yet (well some event may have, but only just...), so
267              * we don't bother looking to see if any have timed out, but just
268              * go directly to reading the data packets
269              */
270             clock_NewTime();
271             if (doingPoll)
272                 lastPollWorked = 1;
273 #ifdef AFS_NT40_ENV
274             for (i = 0; p && i < rfds->fd_count; i++) {
275                 socket = rfds->fd_array[i];
276                 if (rxi_ReadPacket(socket, p, &host, &port)) {
277                     *newcallp = NULL;
278                     p = rxi_ReceivePacket(p, socket, host, port, tnop,
279                                           newcallp);
280                     if (newcallp && *newcallp) {
281                         if (p) {
282                             rxi_FreePacket(p);
283                         }
284                         if (swapNameProgram) {
285                             (*swapNameProgram) (rx_listenerPid, name, 0);
286                             rx_listenerPid = 0;
287                         }
288                         return;
289                     }
290                 }
291             }
292 #else
293             for (socket = rx_minSocketNumber;
294                  p && socket <= rx_maxSocketNumber; socket++) {
295                 if (!FD_ISSET(socket, rfds))
296                     continue;
297                 if (rxi_ReadPacket(socket, p, &host, &port)) {
298                     p = rxi_ReceivePacket(p, socket, host, port, tnop,
299                                           newcallp);
300                     if (newcallp && *newcallp) {
301                         if (p) {
302                             rxi_FreePacket(p);
303                         }
304                         if (swapNameProgram) {
305                             (*swapNameProgram) (rx_listenerPid, name, 0);
306                             rx_listenerPid = 0;
307                         }
308                         return;
309                     }
310                 }
311             }
312 #endif
313             break;
314         }
315     }
316     /* NOTREACHED */
317 }
318
319 /* This is the listener process request loop. The listener process loop
320  * becomes a server thread when rxi_ListenerProc returns, and stays
321  * server thread until rxi_ServerProc returns. */
322 static void *
323 rx_ListenerProc(void *dummy)
324 {
325     int threadID;
326     osi_socket sock;
327     struct rx_call *newcall;
328     fd_set *rfds;
329
330     if (!(rfds = IOMGR_AllocFDSet())) {
331         osi_Panic("rx_ListenerProc: no fd_sets!\n");
332     }
333
334     while (1) {
335         newcall = NULL;
336         threadID = -1;
337         rxi_ListenerProc(rfds, &threadID, &newcall);
338         /* assert(threadID != -1); */
339         /* assert(newcall != NULL); */
340         sock = OSI_NULLSOCKET;
341         rxi_ServerProc(threadID, newcall, &sock);
342         /* assert(sock != OSI_NULLSOCKET); */
343     }
344     /* not reached */
345     return NULL;
346 }
347
348 /* This is the server process request loop. The server process loop
349  * becomes a listener thread when rxi_ServerProc returns, and stays
350  * listener thread until rxi_ListenerProc returns. */
351 void *
352 rx_ServerProc(void * unused)
353 {
354     osi_socket sock;
355     int threadID;
356     struct rx_call *newcall = NULL;
357     fd_set *rfds;
358
359     if (!(rfds = IOMGR_AllocFDSet())) {
360         osi_Panic("rxi_ListenerProc: no fd_sets!\n");
361     }
362
363     rxi_MorePackets(rx_maxReceiveWindow + 2);   /* alloc more packets */
364     rxi_dataQuota += rx_initSendWindow; /* Reserve some pkts for hard times */
365     /* threadID is used for making decisions in GetCall.  Get it by bumping
366      * number of threads handling incoming calls */
367     threadID = rxi_availProcs++;
368
369     while (1) {
370         sock = OSI_NULLSOCKET;
371         rxi_ServerProc(threadID, newcall, &sock);
372         /* assert(sock != OSI_NULLSOCKET); */
373         newcall = NULL;
374         rxi_ListenerProc(rfds, &threadID, &newcall);
375         /* assert(threadID != -1); */
376         /* assert(newcall != NULL); */
377     }
378     /* not reached */
379     return NULL;
380 }
381
382 /*
383  * Called from GetUDPSocket.
384  * Called from a single thread at startup.
385  * Returns 0 on success; -1 on failure.
386  */
387 int
388 rxi_Listen(osi_socket sock)
389 {
390 #ifndef AFS_NT40_ENV
391     /*
392      * Put the socket into non-blocking mode so that rx_Listener
393      * can do a polling read before entering select
394      */
395     if (fcntl(sock, F_SETFL, FNDELAY) == -1) {
396         perror("fcntl");
397         (osi_Msg "rxi_Listen: unable to set non-blocking mode on socket\n");
398         return -1;
399     }
400
401     if (sock > FD_SETSIZE - 1) {
402         (osi_Msg "rxi_Listen: socket descriptor > (FD_SETSIZE-1) = %d\n",
403          FD_SETSIZE - 1);
404         return -1;
405     }
406 #endif
407
408     FD_SET(sock, &rx_selectMask);
409     if (sock > rx_maxSocketNumber)
410         rx_maxSocketNumber = sock;
411     if (sock < rx_minSocketNumber)
412         rx_minSocketNumber = sock;
413     return 0;
414 }
415
416 /*
417  * Recvmsg
418  */
419 int
420 rxi_Recvmsg(osi_socket socket, struct msghdr *msg_p, int flags)
421 {
422 #if defined(HAVE_LINUX_ERRQUEUE_H) && defined(ADAPT_PMTU)
423     while((rxi_HandleSocketError(socket)) > 0)
424         ;
425 #endif
426     return recvmsg(socket, msg_p, flags);
427 }
428
429 /*
430  * Simulate a blocking sendmsg on the non-blocking socket.
431  * It's non blocking because it was set that way for recvmsg.
432  */
433 int
434 rxi_Sendmsg(osi_socket socket, struct msghdr *msg_p, int flags)
435 {
436     fd_set *sfds = (fd_set *) 0;
437     while (sendmsg(socket, msg_p, flags) == -1) {
438         int err;
439         rx_stats.sendSelects++;
440
441         if (!sfds) {
442             if (!(sfds = IOMGR_AllocFDSet())) {
443                 (osi_Msg "rx failed to alloc fd_set: ");
444                 perror("rx_sendmsg");
445                 return -1;
446             }
447             FD_SET(socket, sfds);
448         }
449 #if defined(HAVE_LINUX_ERRQUEUE_H) && defined(ADAPT_PMTU)
450         while((rxi_HandleSocketError(socket)) > 0)
451           ;
452 #endif
453 #ifdef AFS_NT40_ENV
454         if (WSAGetLastError())
455 #elif defined(AFS_LINUX22_ENV)
456         /* linux unfortunately returns ECONNREFUSED if the target port
457          * is no longer in use */
458         /* and EAGAIN if a UDP checksum is incorrect */
459         if (errno != EWOULDBLOCK && errno != ENOBUFS && errno != ECONNREFUSED
460             && errno != EAGAIN)
461 #else
462         if (errno != EWOULDBLOCK && errno != ENOBUFS)
463 #endif
464         {
465             (osi_Msg "rx failed to send packet: ");
466             perror("rx_sendmsg");
467 #ifndef AFS_NT40_ENV
468             if (errno > 0)
469               return -errno;
470 #else
471             if (WSAGetLastError() > 0)
472               return -WSAGetLastError();
473 #endif
474             return -1;
475         }
476         while ((err = select(socket + 1, 0, sfds, 0, 0)) != 1) {
477             if (err >= 0 || errno != EINTR)
478                 osi_Panic("rxi_sendmsg: select error %d.%d", err, errno);
479         }
480     }
481     if (sfds)
482         IOMGR_FreeFDSet(sfds);
483     return 0;
484 }