aa1c5bbc3c72a7906fce6f8c0a6f4cca4284f781
[openafs.git] / src / rx / rx_lwp.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  *
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /* rx_user.c contains routines specific to the user space UNIX implementation of rx */
11
12 /* This controls the size of an fd_set; it must be defined early before
13  * the system headers define that type and the macros that operate on it.
14  * Its value should be as large as the maximum file descriptor limit we
15  * are likely to run into on any platform.  Right now, that is 65536
16  * which is the default hard fd limit on Solaris 9 */
17 #ifndef _WIN32
18 #define FD_SETSIZE 65536
19 #endif
20
21 #include <afsconfig.h>
22 #include <afs/param.h>
23
24 #include <roken.h>
25
26 #ifdef HAVE_SYS_FILE_H
27 # include <sys/file.h>
28 #endif
29
30 #include <lwp.h>
31
32 #include "rx.h"
33 #include "rx_atomic.h"
34 #include "rx_globals.h"
35 #include "rx_stats.h"
36
37 #define MAXTHREADNAMELENGTH 64
38
39 int debugSelectFailure;         /* # of times select failed */
40
41 /*
42  * Sleep on the unique wait channel provided.
43  */
44 void
45 rxi_Sleep(void *addr)
46 {
47     LWP_WaitProcess(addr);
48 }
49
50 /*
51  * Wakeup any threads on the channel provided.
52  * They may be woken up spuriously, and must check any conditions.
53  */
54 void
55 rxi_Wakeup(void *addr)
56 {
57     LWP_NoYieldSignal(addr);
58 }
59
60 PROCESS rx_listenerPid = 0;     /* LWP process id of socket listener process */
61 static void* rx_ListenerProc(void *dummy);
62
63 /*
64  * Delay the current thread the specified number of seconds.
65  */
66 void
67 rxi_Delay(int sec)
68 {
69     IOMGR_Sleep(sec);
70 }
71
72 static int quitListening = 0;
73
74 /* This routine will kill the listener thread, if it exists. */
75 void
76 rxi_StopListener(void)
77 {
78     quitListening = 1;
79     rxi_ReScheduleEvents();
80 }
81
82 /* This routine will get called by the event package whenever a new,
83    earlier than others, event is posted.  If the Listener process
84    is blocked in selects, this will unblock it.  It also can be called
85    to force a new trip through the rxi_Listener select loop when the set
86    of file descriptors it should be listening to changes... */
87 void
88 rxi_ReScheduleEvents(void)
89 {
90     if (rx_listenerPid)
91         IOMGR_Cancel(rx_listenerPid);
92 }
93
94 void
95 rxi_InitializeThreadSupport(void)
96 {
97     PROCESS junk;
98
99     LWP_InitializeProcessSupport(LWP_NORMAL_PRIORITY, &junk);
100     IOMGR_Initialize();
101     FD_ZERO(&rx_selectMask);
102 }
103
104 void
105 rxi_StartServerProc(void *(*proc) (void *), int stacksize)
106 {
107     PROCESS scratchPid;
108     static int number = 0;
109     char name[32];
110
111     sprintf(name, "srv_%d", ++number);
112     LWP_CreateProcess(proc, stacksize, RX_PROCESS_PRIORITY, NULL,
113                       "rx_ServerProc", &scratchPid);
114     if (registerProgram)
115         (*registerProgram) (scratchPid, name);
116 }
117
118 void
119 rxi_StartListener(void)
120 {
121     /* Priority of listener should be high, so it can keep conns alive */
122 #define RX_LIST_STACK   24000
123     LWP_CreateProcess(rx_ListenerProc, RX_LIST_STACK, LWP_MAX_PRIORITY,
124                       NULL, "rx_Listener", &rx_listenerPid);
125     if (registerProgram)
126         (*registerProgram) (rx_listenerPid, "listener");
127 }
128
129 /* The main loop which listens to the net for datagrams, and handles timeouts
130    and retransmissions, etc.  It also is responsible for scheduling the
131    execution of pending events (in conjunction with event.c).
132
133    Note interaction of nextPollTime and lastPollWorked.  The idea is that if rx is not
134    keeping up with the incoming stream of packets (because there are threads that
135    are interfering with its running sufficiently often), rx does a polling select
136    before doing a real IOMGR_Select system call.  Doing a real select means that
137    we don't have to let other processes run before processing more packets.
138
139    So, our algorithm is that if the last poll on the file descriptor found useful data, or
140    we're at the time nextPollTime (which is advanced so that it occurs every 3 or 4 seconds),
141    then we try the polling select before the IOMGR_Select.  If we eventually catch up
142    (which we can tell by the polling select returning no input packets ready), then we
143    don't do a polling select again until several seconds later (via nextPollTime mechanism).
144    */
145
146 static void
147 rxi_ListenerProc(fd_set * rfds, int *tnop, struct rx_call **newcallp)
148 {
149     afs_uint32 host;
150     u_short port;
151     struct rx_packet *p = (struct rx_packet *)0;
152     osi_socket socket;
153     struct clock cv;
154     afs_int32 nextPollTime;     /* time to next poll FD before sleeping */
155     int lastPollWorked, doingPoll;      /* true iff last poll was useful */
156     struct timeval tv, *tvp;
157     int code;
158 #ifdef AFS_NT40_ENV
159     int i;
160 #endif
161     PROCESS pid;
162     char name[MAXTHREADNAMELENGTH] = "srv_0";
163
164     clock_NewTime();
165     lastPollWorked = 0;
166     nextPollTime = 0;
167     code = LWP_CurrentProcess(&pid);
168     if (code) {
169         osi_Panic("rxi_Listener: Can't get my pid.\n");
170     }
171     rx_listenerPid = pid;
172     if (swapNameProgram)
173         (*swapNameProgram) (pid, "listener", &name[0]);
174
175     for (;;) {
176         /* See if a check for additional packets was issued */
177         rx_CheckPackets();
178
179         /* Grab a new packet only if necessary (otherwise re-use the old one) */
180         if (p) {
181             rxi_RestoreDataBufs(p);
182         } else {
183             if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_RECEIVE)))
184                 osi_Panic("rxi_ListenerProc: no packets!");     /* Shouldn't happen */
185         }
186         /* Wait for the next event time or a packet to arrive. */
187         /* event_RaiseEvents schedules any events whose time has come and
188          * then atomically computes the time to the next event, guaranteeing
189          * that this is positive.  If there is no next event, it returns 0 */
190         clock_NewTime();
191         if (!rxevent_RaiseEvents(&cv))
192             tvp = NULL;
193         else {
194             /* It's important to copy cv to tv, because the 4.3 documentation
195              * for select threatens that *tv may be updated after a select, in
196              * future editions of the system, to indicate how much of the time
197              * period has elapsed.  So we shouldn't rely on tv not being altered. */
198             tv.tv_sec = cv.sec; /* Time to next event */
199             tv.tv_usec = cv.usec;
200             tvp = &tv;
201         }
202         if (rx_stats_active)
203             rx_atomic_inc(&rx_stats.selects);
204
205         *rfds = rx_selectMask;
206
207         if (lastPollWorked || nextPollTime < clock_Sec()) {
208             /* we're catching up, or haven't tried to for a few seconds */
209             doingPoll = 1;
210             nextPollTime = clock_Sec() + 4;     /* try again in 4 seconds no matter what */
211             tv.tv_sec = tv.tv_usec = 0; /* make sure we poll */
212             tvp = &tv;
213             code = select((int)(rx_maxSocketNumber + 1), rfds, 0, 0, tvp);
214         } else {
215             doingPoll = 0;
216             code = IOMGR_Select((int)(rx_maxSocketNumber + 1), rfds, 0, 0, tvp);
217         }
218         lastPollWorked = 0;     /* default is that it didn't find anything */
219
220         if (quitListening) {
221             quitListening = 0;
222             LWP_DestroyProcess(pid);
223         }
224
225         switch (code) {
226         case 0:
227             /* Timer interrupt:
228              * If it was a timer interrupt then we can assume that
229              * the time has advanced by roughly the value of the
230              * previous timeout, and that there is now at least
231              * one pending event.
232              */
233             clock_NewTime();
234             break;
235         case -1:
236             /* select or IOMGR_Select returned failure */
237             debugSelectFailure++;       /* update debugging counter */
238             clock_NewTime();
239             break;
240         case -2:
241             /* IOMGR_Cancel:
242              * IOMGR_Cancel is invoked whenever a new event is
243              * posted that is earlier than any existing events.
244              * So we re-evaluate the time, and then go back to
245              * reschedule events
246              */
247             clock_NewTime();
248             break;
249
250         default:
251             /* Packets have arrived, presumably:
252              * If it wasn't a timer interrupt, then no event should have
253              * timed out yet (well some event may have, but only just...), so
254              * we don't bother looking to see if any have timed out, but just
255              * go directly to reading the data packets
256              */
257             clock_NewTime();
258             if (doingPoll)
259                 lastPollWorked = 1;
260 #ifdef AFS_NT40_ENV
261             for (i = 0; p && i < rfds->fd_count; i++) {
262                 socket = rfds->fd_array[i];
263                 if (rxi_ReadPacket(socket, p, &host, &port)) {
264                     *newcallp = NULL;
265                     p = rxi_ReceivePacket(p, socket, host, port, tnop,
266                                           newcallp);
267                     if (newcallp && *newcallp) {
268                         if (p) {
269                             rxi_FreePacket(p);
270                         }
271                         if (swapNameProgram) {
272                             (*swapNameProgram) (rx_listenerPid, name, 0);
273                             rx_listenerPid = 0;
274                         }
275                         return;
276                     }
277                 }
278             }
279 #else
280             for (socket = rx_minSocketNumber;
281                  p && socket <= rx_maxSocketNumber; socket++) {
282                 if (!FD_ISSET(socket, rfds))
283                     continue;
284                 if (rxi_ReadPacket(socket, p, &host, &port)) {
285                     p = rxi_ReceivePacket(p, socket, host, port, tnop,
286                                           newcallp);
287                     if (newcallp && *newcallp) {
288                         if (p) {
289                             rxi_FreePacket(p);
290                         }
291                         if (swapNameProgram) {
292                             (*swapNameProgram) (rx_listenerPid, name, 0);
293                             rx_listenerPid = 0;
294                         }
295                         return;
296                     }
297                 }
298             }
299 #endif
300             break;
301         }
302     }
303     /* NOTREACHED */
304 }
305
306 /* This is the listener process request loop. The listener process loop
307  * becomes a server thread when rxi_ListenerProc returns, and stays
308  * server thread until rxi_ServerProc returns. */
309 static void *
310 rx_ListenerProc(void *dummy)
311 {
312     int threadID;
313     osi_socket sock;
314     struct rx_call *newcall;
315     fd_set *rfds;
316
317     if (!(rfds = IOMGR_AllocFDSet())) {
318         osi_Panic("rx_ListenerProc: no fd_sets!\n");
319     }
320
321     while (1) {
322         newcall = NULL;
323         threadID = -1;
324         rxi_ListenerProc(rfds, &threadID, &newcall);
325         /* osi_Assert(threadID != -1); */
326         /* osi_Assert(newcall != NULL); */
327         sock = OSI_NULLSOCKET;
328         rxi_ServerProc(threadID, newcall, &sock);
329         /* osi_Assert(sock != OSI_NULLSOCKET); */
330     }
331     /* not reached */
332     return NULL;
333 }
334
335 /* This is the server process request loop. The server process loop
336  * becomes a listener thread when rxi_ServerProc returns, and stays
337  * listener thread until rxi_ListenerProc returns. */
338 void *
339 rx_ServerProc(void * unused)
340 {
341     osi_socket sock;
342     int threadID;
343     struct rx_call *newcall = NULL;
344     fd_set *rfds;
345
346     if (!(rfds = IOMGR_AllocFDSet())) {
347         osi_Panic("rxi_ListenerProc: no fd_sets!\n");
348     }
349
350     rxi_MorePackets(rx_maxReceiveWindow + 2);   /* alloc more packets */
351     rxi_dataQuota += rx_initSendWindow; /* Reserve some pkts for hard times */
352     /* threadID is used for making decisions in GetCall.  Get it by bumping
353      * number of threads handling incoming calls */
354     threadID = rxi_availProcs++;
355
356     while (1) {
357         sock = OSI_NULLSOCKET;
358         rxi_ServerProc(threadID, newcall, &sock);
359         /* osi_Assert(sock != OSI_NULLSOCKET); */
360         newcall = NULL;
361         rxi_ListenerProc(rfds, &threadID, &newcall);
362         /* osi_Assert(threadID != -1); */
363         /* osi_Assert(newcall != NULL); */
364     }
365     /* not reached */
366     return NULL;
367 }
368
369 /*
370  * Called from GetUDPSocket.
371  * Called from a single thread at startup.
372  * Returns 0 on success; -1 on failure.
373  */
374 int
375 rxi_Listen(osi_socket sock)
376 {
377 #ifndef AFS_NT40_ENV
378     /*
379      * Put the socket into non-blocking mode so that rx_Listener
380      * can do a polling read before entering select
381      */
382     if (fcntl(sock, F_SETFL, FNDELAY) == -1) {
383         perror("fcntl");
384         (osi_Msg "rxi_Listen: unable to set non-blocking mode on socket\n");
385         return -1;
386     }
387
388     if (sock > FD_SETSIZE - 1) {
389         (osi_Msg "rxi_Listen: socket descriptor > (FD_SETSIZE-1) = %d\n",
390          FD_SETSIZE - 1);
391         return -1;
392     }
393 #endif
394
395     FD_SET(sock, &rx_selectMask);
396     if (sock > rx_maxSocketNumber)
397         rx_maxSocketNumber = sock;
398     if (sock < rx_minSocketNumber)
399         rx_minSocketNumber = sock;
400     return 0;
401 }
402
403 /*
404  * Recvmsg
405  */
406 int
407 rxi_Recvmsg(osi_socket socket, struct msghdr *msg_p, int flags)
408 {
409 #if defined(HAVE_LINUX_ERRQUEUE_H) && defined(ADAPT_PMTU)
410     while((rxi_HandleSocketError(socket)) > 0)
411         ;
412 #endif
413     return recvmsg(socket, msg_p, flags);
414 }
415
416 /*
417  * Simulate a blocking sendmsg on the non-blocking socket.
418  * It's non blocking because it was set that way for recvmsg.
419  */
420 int
421 rxi_Sendmsg(osi_socket socket, struct msghdr *msg_p, int flags)
422 {
423     fd_set *sfds = (fd_set *) 0;
424     while (sendmsg(socket, msg_p, flags) == -1) {
425         int err;
426         if (rx_stats_active)
427             rx_atomic_inc(&rx_stats.sendSelects);
428
429         if (!sfds) {
430             if (!(sfds = IOMGR_AllocFDSet())) {
431                 (osi_Msg "rx failed to alloc fd_set: ");
432                 perror("rx_sendmsg");
433                 return -1;
434             }
435             FD_SET(socket, sfds);
436         }
437 #if defined(HAVE_LINUX_ERRQUEUE_H) && defined(ADAPT_PMTU)
438         while((rxi_HandleSocketError(socket)) > 0)
439           ;
440 #endif
441 #ifdef AFS_NT40_ENV
442         if (WSAGetLastError())
443 #elif defined(AFS_LINUX22_ENV)
444         /* linux unfortunately returns ECONNREFUSED if the target port
445          * is no longer in use */
446         /* and EAGAIN if a UDP checksum is incorrect */
447         if (errno != EWOULDBLOCK && errno != ENOBUFS && errno != ECONNREFUSED
448             && errno != EAGAIN)
449 #else
450         if (errno != EWOULDBLOCK && errno != ENOBUFS)
451 #endif
452         {
453             (osi_Msg "rx failed to send packet: ");
454             perror("rx_sendmsg");
455 #ifndef AFS_NT40_ENV
456             if (errno > 0)
457               return -errno;
458 #else
459             if (WSAGetLastError() > 0)
460               return -WSAGetLastError();
461 #endif
462             return -1;
463         }
464         while ((err = select(socket + 1, 0, sfds, 0, 0)) != 1) {
465             if (err >= 0 || errno != EINTR)
466                 osi_Panic("rxi_sendmsg: select error %d.%d", err, errno);
467             FD_ZERO(sfds);
468             FD_SET(socket, sfds);
469         }
470     }
471     if (sfds)
472         IOMGR_FreeFDSet(sfds);
473     return 0;
474 }