rx: Tidy up function scope in rx.c
[openafs.git] / src / rx / rx_lwp.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  *
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /* rx_user.c contains routines specific to the user space UNIX implementation of rx */
11
12 /* This controls the size of an fd_set; it must be defined early before
13  * the system headers define that type and the macros that operate on it.
14  * Its value should be as large as the maximum file descriptor limit we
15  * are likely to run into on any platform.  Right now, that is 65536
16  * which is the default hard fd limit on Solaris 9 */
17 #ifndef _WIN32
18 #define FD_SETSIZE 65536
19 #endif
20
21 #include <afsconfig.h>
22 #include <afs/param.h>
23
24 #include <roken.h>
25
26 #ifdef HAVE_SYS_FILE_H
27 # include <sys/file.h>
28 #endif
29
30 #include <lwp.h>
31
32 #include "rx.h"
33 #include "rx_atomic.h"
34 #include "rx_globals.h"
35 #include "rx_internal.h"
36 #include "rx_stats.h"
37 #ifdef AFS_NT40_ENV
38 #include "rx_xmit_nt.h"
39 #endif
40
41 #define MAXTHREADNAMELENGTH 64
42
43 int debugSelectFailure;         /* # of times select failed */
44
45 /*
46  * Sleep on the unique wait channel provided.
47  */
48 void
49 rxi_Sleep(void *addr)
50 {
51     LWP_WaitProcess(addr);
52 }
53
54 /*
55  * Wakeup any threads on the channel provided.
56  * They may be woken up spuriously, and must check any conditions.
57  */
58 void
59 rxi_Wakeup(void *addr)
60 {
61     LWP_NoYieldSignal(addr);
62 }
63
64 PROCESS rx_listenerPid = 0;     /* LWP process id of socket listener process */
65 static void* rx_ListenerProc(void *dummy);
66
67 /*
68  * Delay the current thread the specified number of seconds.
69  */
70 void
71 rxi_Delay(int sec)
72 {
73     IOMGR_Sleep(sec);
74 }
75
76 static int quitListening = 0;
77
78 /* This routine will kill the listener thread, if it exists. */
79 void
80 rxi_StopListener(void)
81 {
82     quitListening = 1;
83     rxi_ReScheduleEvents();
84 }
85
86 /* This routine will get called by the event package whenever a new,
87    earlier than others, event is posted.  If the Listener process
88    is blocked in selects, this will unblock it.  It also can be called
89    to force a new trip through the rxi_Listener select loop when the set
90    of file descriptors it should be listening to changes... */
91 void
92 rxi_ReScheduleEvents(void)
93 {
94     if (rx_listenerPid)
95         IOMGR_Cancel(rx_listenerPid);
96 }
97
98 void
99 rxi_InitializeThreadSupport(void)
100 {
101     PROCESS junk;
102
103     LWP_InitializeProcessSupport(LWP_NORMAL_PRIORITY, &junk);
104     IOMGR_Initialize();
105     FD_ZERO(&rx_selectMask);
106 }
107
108 void
109 rxi_StartServerProc(void *(*proc) (void *), int stacksize)
110 {
111     PROCESS scratchPid;
112     static int number = 0;
113     char name[32];
114
115     sprintf(name, "srv_%d", ++number);
116     LWP_CreateProcess(proc, stacksize, RX_PROCESS_PRIORITY, NULL,
117                       "rx_ServerProc", &scratchPid);
118     if (registerProgram)
119         (*registerProgram) (scratchPid, name);
120 }
121
122 void
123 rxi_StartListener(void)
124 {
125     /* Priority of listener should be high, so it can keep conns alive */
126 #define RX_LIST_STACK   24000
127     LWP_CreateProcess(rx_ListenerProc, RX_LIST_STACK, LWP_MAX_PRIORITY,
128                       NULL, "rx_Listener", &rx_listenerPid);
129     if (registerProgram)
130         (*registerProgram) (rx_listenerPid, "listener");
131 }
132
133 /* The main loop which listens to the net for datagrams, and handles timeouts
134    and retransmissions, etc.  It also is responsible for scheduling the
135    execution of pending events (in conjunction with event.c).
136
137    Note interaction of nextPollTime and lastPollWorked.  The idea is that if rx is not
138    keeping up with the incoming stream of packets (because there are threads that
139    are interfering with its running sufficiently often), rx does a polling select
140    before doing a real IOMGR_Select system call.  Doing a real select means that
141    we don't have to let other processes run before processing more packets.
142
143    So, our algorithm is that if the last poll on the file descriptor found useful data, or
144    we're at the time nextPollTime (which is advanced so that it occurs every 3 or 4 seconds),
145    then we try the polling select before the IOMGR_Select.  If we eventually catch up
146    (which we can tell by the polling select returning no input packets ready), then we
147    don't do a polling select again until several seconds later (via nextPollTime mechanism).
148    */
149
150 static void
151 rxi_ListenerProc(fd_set * rfds, int *tnop, struct rx_call **newcallp)
152 {
153     afs_uint32 host;
154     u_short port;
155     struct rx_packet *p = (struct rx_packet *)0;
156     osi_socket socket;
157     struct clock cv;
158     afs_int32 nextPollTime;     /* time to next poll FD before sleeping */
159     int lastPollWorked, doingPoll;      /* true iff last poll was useful */
160     struct timeval tv, *tvp;
161     int code;
162 #ifdef AFS_NT40_ENV
163     int i;
164 #endif
165     PROCESS pid;
166     char name[MAXTHREADNAMELENGTH] = "srv_0";
167
168     clock_NewTime();
169     lastPollWorked = 0;
170     nextPollTime = 0;
171     code = LWP_CurrentProcess(&pid);
172     if (code) {
173         osi_Panic("rxi_Listener: Can't get my pid.\n");
174     }
175     rx_listenerPid = pid;
176     if (swapNameProgram)
177         (*swapNameProgram) (pid, "listener", &name[0]);
178
179     for (;;) {
180         /* See if a check for additional packets was issued */
181         rx_CheckPackets();
182
183         /* Grab a new packet only if necessary (otherwise re-use the old one) */
184         if (p) {
185             rxi_RestoreDataBufs(p);
186         } else {
187             if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_RECEIVE)))
188                 osi_Panic("rxi_ListenerProc: no packets!");     /* Shouldn't happen */
189         }
190         /* Wait for the next event time or a packet to arrive. */
191         /* event_RaiseEvents schedules any events whose time has come and
192          * then atomically computes the time to the next event, guaranteeing
193          * that this is positive.  If there is no next event, it returns 0 */
194         clock_NewTime();
195         if (!rxevent_RaiseEvents(&cv))
196             tvp = NULL;
197         else {
198             /* It's important to copy cv to tv, because the 4.3 documentation
199              * for select threatens that *tv may be updated after a select, in
200              * future editions of the system, to indicate how much of the time
201              * period has elapsed.  So we shouldn't rely on tv not being altered. */
202             tv.tv_sec = cv.sec; /* Time to next event */
203             tv.tv_usec = cv.usec;
204             tvp = &tv;
205         }
206         if (rx_stats_active)
207             rx_atomic_inc(&rx_stats.selects);
208
209         *rfds = rx_selectMask;
210
211         if (lastPollWorked || nextPollTime < clock_Sec()) {
212             /* we're catching up, or haven't tried to for a few seconds */
213             doingPoll = 1;
214             nextPollTime = clock_Sec() + 4;     /* try again in 4 seconds no matter what */
215             tv.tv_sec = tv.tv_usec = 0; /* make sure we poll */
216             tvp = &tv;
217             code = select((int)(rx_maxSocketNumber + 1), rfds, 0, 0, tvp);
218         } else {
219             doingPoll = 0;
220             code = IOMGR_Select((int)(rx_maxSocketNumber + 1), rfds, 0, 0, tvp);
221         }
222         lastPollWorked = 0;     /* default is that it didn't find anything */
223
224         if (quitListening) {
225             quitListening = 0;
226             LWP_DestroyProcess(pid);
227         }
228
229         switch (code) {
230         case 0:
231             /* Timer interrupt:
232              * If it was a timer interrupt then we can assume that
233              * the time has advanced by roughly the value of the
234              * previous timeout, and that there is now at least
235              * one pending event.
236              */
237             clock_NewTime();
238             break;
239         case -1:
240             /* select or IOMGR_Select returned failure */
241             debugSelectFailure++;       /* update debugging counter */
242             clock_NewTime();
243             break;
244         case -2:
245             /* IOMGR_Cancel:
246              * IOMGR_Cancel is invoked whenever a new event is
247              * posted that is earlier than any existing events.
248              * So we re-evaluate the time, and then go back to
249              * reschedule events
250              */
251             clock_NewTime();
252             break;
253
254         default:
255             /* Packets have arrived, presumably:
256              * If it wasn't a timer interrupt, then no event should have
257              * timed out yet (well some event may have, but only just...), so
258              * we don't bother looking to see if any have timed out, but just
259              * go directly to reading the data packets
260              */
261             clock_NewTime();
262             if (doingPoll)
263                 lastPollWorked = 1;
264 #ifdef AFS_NT40_ENV
265             for (i = 0; p && i < rfds->fd_count; i++) {
266                 socket = rfds->fd_array[i];
267                 if (rxi_ReadPacket(socket, p, &host, &port)) {
268                     *newcallp = NULL;
269                     p = rxi_ReceivePacket(p, socket, host, port, tnop,
270                                           newcallp);
271                     if (newcallp && *newcallp) {
272                         if (p) {
273                             rxi_FreePacket(p);
274                         }
275                         if (swapNameProgram) {
276                             (*swapNameProgram) (rx_listenerPid, name, 0);
277                             rx_listenerPid = 0;
278                         }
279                         return;
280                     }
281                 }
282             }
283 #else
284             for (socket = rx_minSocketNumber;
285                  p && socket <= rx_maxSocketNumber; socket++) {
286                 if (!FD_ISSET(socket, rfds))
287                     continue;
288                 if (rxi_ReadPacket(socket, p, &host, &port)) {
289                     p = rxi_ReceivePacket(p, socket, host, port, tnop,
290                                           newcallp);
291                     if (newcallp && *newcallp) {
292                         if (p) {
293                             rxi_FreePacket(p);
294                         }
295                         if (swapNameProgram) {
296                             (*swapNameProgram) (rx_listenerPid, name, 0);
297                             rx_listenerPid = 0;
298                         }
299                         return;
300                     }
301                 }
302             }
303 #endif
304             break;
305         }
306     }
307     /* NOTREACHED */
308 }
309
310 /* This is the listener process request loop. The listener process loop
311  * becomes a server thread when rxi_ListenerProc returns, and stays
312  * server thread until rxi_ServerProc returns. */
313 static void *
314 rx_ListenerProc(void *dummy)
315 {
316     int threadID;
317     osi_socket sock;
318     struct rx_call *newcall;
319     fd_set *rfds;
320
321     if (!(rfds = IOMGR_AllocFDSet())) {
322         osi_Panic("rx_ListenerProc: no fd_sets!\n");
323     }
324
325     while (1) {
326         newcall = NULL;
327         threadID = -1;
328         rxi_ListenerProc(rfds, &threadID, &newcall);
329         /* osi_Assert(threadID != -1); */
330         /* osi_Assert(newcall != NULL); */
331         sock = OSI_NULLSOCKET;
332         rxi_ServerProc(threadID, newcall, &sock);
333         /* osi_Assert(sock != OSI_NULLSOCKET); */
334     }
335     /* not reached */
336     return NULL;
337 }
338
339 /* This is the server process request loop. The server process loop
340  * becomes a listener thread when rxi_ServerProc returns, and stays
341  * listener thread until rxi_ListenerProc returns. */
342 void *
343 rx_ServerProc(void * unused)
344 {
345     osi_socket sock;
346     int threadID;
347     struct rx_call *newcall = NULL;
348     fd_set *rfds;
349
350     if (!(rfds = IOMGR_AllocFDSet())) {
351         osi_Panic("rxi_ListenerProc: no fd_sets!\n");
352     }
353
354     rxi_MorePackets(rx_maxReceiveWindow + 2);   /* alloc more packets */
355     rxi_dataQuota += rx_initSendWindow; /* Reserve some pkts for hard times */
356     /* threadID is used for making decisions in GetCall.  Get it by bumping
357      * number of threads handling incoming calls */
358     threadID = rxi_availProcs++;
359
360     while (1) {
361         sock = OSI_NULLSOCKET;
362         rxi_ServerProc(threadID, newcall, &sock);
363         /* osi_Assert(sock != OSI_NULLSOCKET); */
364         newcall = NULL;
365         rxi_ListenerProc(rfds, &threadID, &newcall);
366         /* osi_Assert(threadID != -1); */
367         /* osi_Assert(newcall != NULL); */
368     }
369     /* not reached */
370     return NULL;
371 }
372
373 /*
374  * Called from GetUDPSocket.
375  * Called from a single thread at startup.
376  * Returns 0 on success; -1 on failure.
377  */
378 int
379 rxi_Listen(osi_socket sock)
380 {
381 #ifndef AFS_NT40_ENV
382     /*
383      * Put the socket into non-blocking mode so that rx_Listener
384      * can do a polling read before entering select
385      */
386     if (fcntl(sock, F_SETFL, FNDELAY) == -1) {
387         perror("fcntl");
388         (osi_Msg "rxi_Listen: unable to set non-blocking mode on socket\n");
389         return -1;
390     }
391
392     if (sock > FD_SETSIZE - 1) {
393         (osi_Msg "rxi_Listen: socket descriptor > (FD_SETSIZE-1) = %d\n",
394          FD_SETSIZE - 1);
395         return -1;
396     }
397 #endif
398
399     FD_SET(sock, &rx_selectMask);
400     if (sock > rx_maxSocketNumber)
401         rx_maxSocketNumber = sock;
402     if (sock < rx_minSocketNumber)
403         rx_minSocketNumber = sock;
404     return 0;
405 }
406
407 /*
408  * Recvmsg
409  */
410 int
411 rxi_Recvmsg(osi_socket socket, struct msghdr *msg_p, int flags)
412 {
413 #if defined(HAVE_LINUX_ERRQUEUE_H) && defined(ADAPT_PMTU)
414     while((rxi_HandleSocketError(socket)) > 0)
415         ;
416 #endif
417     return recvmsg(socket, msg_p, flags);
418 }
419
420 /*
421  * Simulate a blocking sendmsg on the non-blocking socket.
422  * It's non blocking because it was set that way for recvmsg.
423  */
424 int
425 rxi_Sendmsg(osi_socket socket, struct msghdr *msg_p, int flags)
426 {
427     fd_set *sfds = (fd_set *) 0;
428     while (sendmsg(socket, msg_p, flags) == -1) {
429         int err;
430         if (rx_stats_active)
431             rx_atomic_inc(&rx_stats.sendSelects);
432
433         if (!sfds) {
434             if (!(sfds = IOMGR_AllocFDSet())) {
435                 (osi_Msg "rx failed to alloc fd_set: ");
436                 perror("rx_sendmsg");
437                 return -1;
438             }
439             FD_SET(socket, sfds);
440         }
441 #if defined(HAVE_LINUX_ERRQUEUE_H) && defined(ADAPT_PMTU)
442         while((rxi_HandleSocketError(socket)) > 0)
443           ;
444 #endif
445 #ifdef AFS_NT40_ENV
446         if (WSAGetLastError())
447 #elif defined(AFS_LINUX22_ENV)
448         /* linux unfortunately returns ECONNREFUSED if the target port
449          * is no longer in use */
450         /* and EAGAIN if a UDP checksum is incorrect */
451         if (errno != EWOULDBLOCK && errno != ENOBUFS && errno != ECONNREFUSED
452             && errno != EAGAIN)
453 #else
454         if (errno != EWOULDBLOCK && errno != ENOBUFS)
455 #endif
456         {
457             (osi_Msg "rx failed to send packet: ");
458             perror("rx_sendmsg");
459 #ifndef AFS_NT40_ENV
460             if (errno > 0)
461               return -errno;
462 #else
463             if (WSAGetLastError() > 0)
464               return -WSAGetLastError();
465 #endif
466             return -1;
467         }
468         while ((err = select(
469 #ifdef AFS_NT40_ENV
470                              0,
471 #else
472                              socket + 1,
473 #endif
474                              0, sfds, 0, 0)) != 1) {
475             if (err >= 0 || errno != EINTR)
476                 osi_Panic("rxi_sendmsg: select error %d.%d", err, errno);
477             FD_ZERO(sfds);
478             FD_SET(socket, sfds);
479         }
480     }
481     if (sfds)
482         IOMGR_FreeFDSet(sfds);
483     return 0;
484 }