1de085731221aee9a1d6af53c314120778f129bc
[openafs.git] / src / rx / rx_lwp.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  *
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /* rx_user.c contains routines specific to the user space UNIX implementation of rx */
11
12 /* This controls the size of an fd_set; it must be defined early before
13  * the system headers define that type and the macros that operate on it.
14  * Its value should be as large as the maximum file descriptor limit we
15  * are likely to run into on any platform.  Right now, that is 65536
16  * which is the default hard fd limit on Solaris 9 */
17 #ifndef _WIN32
18 #define FD_SETSIZE 65536
19 #endif
20
21 #include <afsconfig.h>
22 #include <afs/param.h>
23
24 #include <roken.h>
25
26 #include <lwp.h>
27
28 #include "rx.h"
29 #include "rx_atomic.h"
30 #include "rx_globals.h"
31 #include "rx_stats.h"
32
33 #define MAXTHREADNAMELENGTH 64
34
35 int debugSelectFailure;         /* # of times select failed */
36
37 /*
38  * Sleep on the unique wait channel provided.
39  */
40 void
41 rxi_Sleep(void *addr)
42 {
43     LWP_WaitProcess(addr);
44 }
45
46 /*
47  * Wakeup any threads on the channel provided.
48  * They may be woken up spuriously, and must check any conditions.
49  */
50 void
51 rxi_Wakeup(void *addr)
52 {
53     LWP_NoYieldSignal(addr);
54 }
55
56 PROCESS rx_listenerPid = 0;     /* LWP process id of socket listener process */
57 static void* rx_ListenerProc(void *dummy);
58
59 /*
60  * Delay the current thread the specified number of seconds.
61  */
62 void
63 rxi_Delay(int sec)
64 {
65     IOMGR_Sleep(sec);
66 }
67
68 static int quitListening = 0;
69
70 /* This routine will kill the listener thread, if it exists. */
71 void
72 rxi_StopListener(void)
73 {
74     quitListening = 1;
75     rxi_ReScheduleEvents();
76 }
77
78 /* This routine will get called by the event package whenever a new,
79    earlier than others, event is posted.  If the Listener process
80    is blocked in selects, this will unblock it.  It also can be called
81    to force a new trip through the rxi_Listener select loop when the set
82    of file descriptors it should be listening to changes... */
83 void
84 rxi_ReScheduleEvents(void)
85 {
86     if (rx_listenerPid)
87         IOMGR_Cancel(rx_listenerPid);
88 }
89
90 void
91 rxi_InitializeThreadSupport(void)
92 {
93     PROCESS junk;
94
95     LWP_InitializeProcessSupport(LWP_NORMAL_PRIORITY, &junk);
96     IOMGR_Initialize();
97     FD_ZERO(&rx_selectMask);
98 }
99
100 void
101 rxi_StartServerProc(void *(*proc) (void *), int stacksize)
102 {
103     PROCESS scratchPid;
104     static int number = 0;
105     char name[32];
106
107     sprintf(name, "srv_%d", ++number);
108     LWP_CreateProcess(proc, stacksize, RX_PROCESS_PRIORITY, NULL,
109                       "rx_ServerProc", &scratchPid);
110     if (registerProgram)
111         (*registerProgram) (scratchPid, name);
112 }
113
114 void
115 rxi_StartListener(void)
116 {
117     /* Priority of listener should be high, so it can keep conns alive */
118 #define RX_LIST_STACK   24000
119     LWP_CreateProcess(rx_ListenerProc, RX_LIST_STACK, LWP_MAX_PRIORITY,
120                       NULL, "rx_Listener", &rx_listenerPid);
121     if (registerProgram)
122         (*registerProgram) (rx_listenerPid, "listener");
123 }
124
125 /* The main loop which listens to the net for datagrams, and handles timeouts
126    and retransmissions, etc.  It also is responsible for scheduling the
127    execution of pending events (in conjunction with event.c).
128
129    Note interaction of nextPollTime and lastPollWorked.  The idea is that if rx is not
130    keeping up with the incoming stream of packets (because there are threads that
131    are interfering with its running sufficiently often), rx does a polling select
132    before doing a real IOMGR_Select system call.  Doing a real select means that
133    we don't have to let other processes run before processing more packets.
134
135    So, our algorithm is that if the last poll on the file descriptor found useful data, or
136    we're at the time nextPollTime (which is advanced so that it occurs every 3 or 4 seconds),
137    then we try the polling select before the IOMGR_Select.  If we eventually catch up
138    (which we can tell by the polling select returning no input packets ready), then we
139    don't do a polling select again until several seconds later (via nextPollTime mechanism).
140    */
141
142 static void
143 rxi_ListenerProc(fd_set * rfds, int *tnop, struct rx_call **newcallp)
144 {
145     afs_uint32 host;
146     u_short port;
147     struct rx_packet *p = (struct rx_packet *)0;
148     osi_socket socket;
149     struct clock cv;
150     afs_int32 nextPollTime;     /* time to next poll FD before sleeping */
151     int lastPollWorked, doingPoll;      /* true iff last poll was useful */
152     struct timeval tv, *tvp;
153     int code;
154 #ifdef AFS_NT40_ENV
155     int i;
156 #endif
157     PROCESS pid;
158     char name[MAXTHREADNAMELENGTH] = "srv_0";
159
160     clock_NewTime();
161     lastPollWorked = 0;
162     nextPollTime = 0;
163     code = LWP_CurrentProcess(&pid);
164     if (code) {
165         osi_Panic("rxi_Listener: Can't get my pid.\n");
166     }
167     rx_listenerPid = pid;
168     if (swapNameProgram)
169         (*swapNameProgram) (pid, "listener", &name[0]);
170
171     for (;;) {
172         /* See if a check for additional packets was issued */
173         rx_CheckPackets();
174
175         /* Grab a new packet only if necessary (otherwise re-use the old one) */
176         if (p) {
177             rxi_RestoreDataBufs(p);
178         } else {
179             if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_RECEIVE)))
180                 osi_Panic("rxi_ListenerProc: no packets!");     /* Shouldn't happen */
181         }
182         /* Wait for the next event time or a packet to arrive. */
183         /* event_RaiseEvents schedules any events whose time has come and
184          * then atomically computes the time to the next event, guaranteeing
185          * that this is positive.  If there is no next event, it returns 0 */
186         clock_NewTime();
187         if (!rxevent_RaiseEvents(&cv))
188             tvp = NULL;
189         else {
190             /* It's important to copy cv to tv, because the 4.3 documentation
191              * for select threatens that *tv may be updated after a select, in
192              * future editions of the system, to indicate how much of the time
193              * period has elapsed.  So we shouldn't rely on tv not being altered. */
194             tv.tv_sec = cv.sec; /* Time to next event */
195             tv.tv_usec = cv.usec;
196             tvp = &tv;
197         }
198         if (rx_stats_active)
199             rx_atomic_inc(&rx_stats.selects);
200
201         *rfds = rx_selectMask;
202
203         if (lastPollWorked || nextPollTime < clock_Sec()) {
204             /* we're catching up, or haven't tried to for a few seconds */
205             doingPoll = 1;
206             nextPollTime = clock_Sec() + 4;     /* try again in 4 seconds no matter what */
207             tv.tv_sec = tv.tv_usec = 0; /* make sure we poll */
208             tvp = &tv;
209             code = select((int)(rx_maxSocketNumber + 1), rfds, 0, 0, tvp);
210         } else {
211             doingPoll = 0;
212             code = IOMGR_Select((int)(rx_maxSocketNumber + 1), rfds, 0, 0, tvp);
213         }
214         lastPollWorked = 0;     /* default is that it didn't find anything */
215
216         if (quitListening) {
217             quitListening = 0;
218             LWP_DestroyProcess(pid);
219         }
220
221         switch (code) {
222         case 0:
223             /* Timer interrupt:
224              * If it was a timer interrupt then we can assume that
225              * the time has advanced by roughly the value of the
226              * previous timeout, and that there is now at least
227              * one pending event.
228              */
229             clock_NewTime();
230             break;
231         case -1:
232             /* select or IOMGR_Select returned failure */
233             debugSelectFailure++;       /* update debugging counter */
234             clock_NewTime();
235             break;
236         case -2:
237             /* IOMGR_Cancel:
238              * IOMGR_Cancel is invoked whenever a new event is
239              * posted that is earlier than any existing events.
240              * So we re-evaluate the time, and then go back to
241              * reschedule events
242              */
243             clock_NewTime();
244             break;
245
246         default:
247             /* Packets have arrived, presumably:
248              * If it wasn't a timer interrupt, then no event should have
249              * timed out yet (well some event may have, but only just...), so
250              * we don't bother looking to see if any have timed out, but just
251              * go directly to reading the data packets
252              */
253             clock_NewTime();
254             if (doingPoll)
255                 lastPollWorked = 1;
256 #ifdef AFS_NT40_ENV
257             for (i = 0; p && i < rfds->fd_count; i++) {
258                 socket = rfds->fd_array[i];
259                 if (rxi_ReadPacket(socket, p, &host, &port)) {
260                     *newcallp = NULL;
261                     p = rxi_ReceivePacket(p, socket, host, port, tnop,
262                                           newcallp);
263                     if (newcallp && *newcallp) {
264                         if (p) {
265                             rxi_FreePacket(p);
266                         }
267                         if (swapNameProgram) {
268                             (*swapNameProgram) (rx_listenerPid, name, 0);
269                             rx_listenerPid = 0;
270                         }
271                         return;
272                     }
273                 }
274             }
275 #else
276             for (socket = rx_minSocketNumber;
277                  p && socket <= rx_maxSocketNumber; socket++) {
278                 if (!FD_ISSET(socket, rfds))
279                     continue;
280                 if (rxi_ReadPacket(socket, p, &host, &port)) {
281                     p = rxi_ReceivePacket(p, socket, host, port, tnop,
282                                           newcallp);
283                     if (newcallp && *newcallp) {
284                         if (p) {
285                             rxi_FreePacket(p);
286                         }
287                         if (swapNameProgram) {
288                             (*swapNameProgram) (rx_listenerPid, name, 0);
289                             rx_listenerPid = 0;
290                         }
291                         return;
292                     }
293                 }
294             }
295 #endif
296             break;
297         }
298     }
299     /* NOTREACHED */
300 }
301
302 /* This is the listener process request loop. The listener process loop
303  * becomes a server thread when rxi_ListenerProc returns, and stays
304  * server thread until rxi_ServerProc returns. */
305 static void *
306 rx_ListenerProc(void *dummy)
307 {
308     int threadID;
309     osi_socket sock;
310     struct rx_call *newcall;
311     fd_set *rfds;
312
313     if (!(rfds = IOMGR_AllocFDSet())) {
314         osi_Panic("rx_ListenerProc: no fd_sets!\n");
315     }
316
317     while (1) {
318         newcall = NULL;
319         threadID = -1;
320         rxi_ListenerProc(rfds, &threadID, &newcall);
321         /* osi_Assert(threadID != -1); */
322         /* osi_Assert(newcall != NULL); */
323         sock = OSI_NULLSOCKET;
324         rxi_ServerProc(threadID, newcall, &sock);
325         /* osi_Assert(sock != OSI_NULLSOCKET); */
326     }
327     /* not reached */
328     return NULL;
329 }
330
331 /* This is the server process request loop. The server process loop
332  * becomes a listener thread when rxi_ServerProc returns, and stays
333  * listener thread until rxi_ListenerProc returns. */
334 void *
335 rx_ServerProc(void * unused)
336 {
337     osi_socket sock;
338     int threadID;
339     struct rx_call *newcall = NULL;
340     fd_set *rfds;
341
342     if (!(rfds = IOMGR_AllocFDSet())) {
343         osi_Panic("rxi_ListenerProc: no fd_sets!\n");
344     }
345
346     rxi_MorePackets(rx_maxReceiveWindow + 2);   /* alloc more packets */
347     rxi_dataQuota += rx_initSendWindow; /* Reserve some pkts for hard times */
348     /* threadID is used for making decisions in GetCall.  Get it by bumping
349      * number of threads handling incoming calls */
350     threadID = rxi_availProcs++;
351
352     while (1) {
353         sock = OSI_NULLSOCKET;
354         rxi_ServerProc(threadID, newcall, &sock);
355         /* osi_Assert(sock != OSI_NULLSOCKET); */
356         newcall = NULL;
357         rxi_ListenerProc(rfds, &threadID, &newcall);
358         /* osi_Assert(threadID != -1); */
359         /* osi_Assert(newcall != NULL); */
360     }
361     /* not reached */
362     return NULL;
363 }
364
365 /*
366  * Called from GetUDPSocket.
367  * Called from a single thread at startup.
368  * Returns 0 on success; -1 on failure.
369  */
370 int
371 rxi_Listen(osi_socket sock)
372 {
373 #ifndef AFS_NT40_ENV
374     /*
375      * Put the socket into non-blocking mode so that rx_Listener
376      * can do a polling read before entering select
377      */
378     if (fcntl(sock, F_SETFL, FNDELAY) == -1) {
379         perror("fcntl");
380         (osi_Msg "rxi_Listen: unable to set non-blocking mode on socket\n");
381         return -1;
382     }
383
384     if (sock > FD_SETSIZE - 1) {
385         (osi_Msg "rxi_Listen: socket descriptor > (FD_SETSIZE-1) = %d\n",
386          FD_SETSIZE - 1);
387         return -1;
388     }
389 #endif
390
391     FD_SET(sock, &rx_selectMask);
392     if (sock > rx_maxSocketNumber)
393         rx_maxSocketNumber = sock;
394     if (sock < rx_minSocketNumber)
395         rx_minSocketNumber = sock;
396     return 0;
397 }
398
399 /*
400  * Recvmsg
401  */
402 int
403 rxi_Recvmsg(osi_socket socket, struct msghdr *msg_p, int flags)
404 {
405 #if defined(HAVE_LINUX_ERRQUEUE_H) && defined(ADAPT_PMTU)
406     while((rxi_HandleSocketError(socket)) > 0)
407         ;
408 #endif
409     return recvmsg(socket, msg_p, flags);
410 }
411
412 /*
413  * Simulate a blocking sendmsg on the non-blocking socket.
414  * It's non blocking because it was set that way for recvmsg.
415  */
416 int
417 rxi_Sendmsg(osi_socket socket, struct msghdr *msg_p, int flags)
418 {
419     fd_set *sfds = (fd_set *) 0;
420     while (sendmsg(socket, msg_p, flags) == -1) {
421         int err;
422         if (rx_stats_active)
423             rx_atomic_inc(&rx_stats.sendSelects);
424
425         if (!sfds) {
426             if (!(sfds = IOMGR_AllocFDSet())) {
427                 (osi_Msg "rx failed to alloc fd_set: ");
428                 perror("rx_sendmsg");
429                 return -1;
430             }
431             FD_SET(socket, sfds);
432         }
433 #if defined(HAVE_LINUX_ERRQUEUE_H) && defined(ADAPT_PMTU)
434         while((rxi_HandleSocketError(socket)) > 0)
435           ;
436 #endif
437 #ifdef AFS_NT40_ENV
438         if (WSAGetLastError())
439 #elif defined(AFS_LINUX22_ENV)
440         /* linux unfortunately returns ECONNREFUSED if the target port
441          * is no longer in use */
442         /* and EAGAIN if a UDP checksum is incorrect */
443         if (errno != EWOULDBLOCK && errno != ENOBUFS && errno != ECONNREFUSED
444             && errno != EAGAIN)
445 #else
446         if (errno != EWOULDBLOCK && errno != ENOBUFS)
447 #endif
448         {
449             (osi_Msg "rx failed to send packet: ");
450             perror("rx_sendmsg");
451 #ifndef AFS_NT40_ENV
452             if (errno > 0)
453               return -errno;
454 #else
455             if (WSAGetLastError() > 0)
456               return -WSAGetLastError();
457 #endif
458             return -1;
459         }
460         while ((err = select(socket + 1, 0, sfds, 0, 0)) != 1) {
461             if (err >= 0 || errno != EINTR)
462                 osi_Panic("rxi_sendmsg: select error %d.%d", err, errno);
463             FD_ZERO(sfds);
464             FD_SET(socket, sfds);
465         }
466     }
467     if (sfds)
468         IOMGR_FreeFDSet(sfds);
469     return 0;
470 }