rx: Hide the rx_packet.h
[openafs.git] / src / rx / rx_lwp.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  *
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /* rx_user.c contains routines specific to the user space UNIX implementation of rx */
11
12 /* This controls the size of an fd_set; it must be defined early before
13  * the system headers define that type and the macros that operate on it.
14  * Its value should be as large as the maximum file descriptor limit we
15  * are likely to run into on any platform.  Right now, that is 65536
16  * which is the default hard fd limit on Solaris 9 */
17 #ifndef _WIN32
18 #define FD_SETSIZE 65536
19 #endif
20
21 #include <afsconfig.h>
22 #include <afs/param.h>
23
24 #include <roken.h>
25
26 #ifdef HAVE_SYS_FILE_H
27 # include <sys/file.h>
28 #endif
29
30 #include <lwp.h>
31
32 #include "rx.h"
33 #include "rx_atomic.h"
34 #include "rx_globals.h"
35 #include "rx_stats.h"
36 #ifdef AFS_NT40_ENV
37 #include "rx_xmit_nt.h"
38 #endif
39
40 #define MAXTHREADNAMELENGTH 64
41
42 int debugSelectFailure;         /* # of times select failed */
43
44 /*
45  * Sleep on the unique wait channel provided.
46  */
47 void
48 rxi_Sleep(void *addr)
49 {
50     LWP_WaitProcess(addr);
51 }
52
53 /*
54  * Wakeup any threads on the channel provided.
55  * They may be woken up spuriously, and must check any conditions.
56  */
57 void
58 rxi_Wakeup(void *addr)
59 {
60     LWP_NoYieldSignal(addr);
61 }
62
63 PROCESS rx_listenerPid = 0;     /* LWP process id of socket listener process */
64 static void* rx_ListenerProc(void *dummy);
65
66 /*
67  * Delay the current thread the specified number of seconds.
68  */
69 void
70 rxi_Delay(int sec)
71 {
72     IOMGR_Sleep(sec);
73 }
74
75 static int quitListening = 0;
76
77 /* This routine will kill the listener thread, if it exists. */
78 void
79 rxi_StopListener(void)
80 {
81     quitListening = 1;
82     rxi_ReScheduleEvents();
83 }
84
85 /* This routine will get called by the event package whenever a new,
86    earlier than others, event is posted.  If the Listener process
87    is blocked in selects, this will unblock it.  It also can be called
88    to force a new trip through the rxi_Listener select loop when the set
89    of file descriptors it should be listening to changes... */
90 void
91 rxi_ReScheduleEvents(void)
92 {
93     if (rx_listenerPid)
94         IOMGR_Cancel(rx_listenerPid);
95 }
96
97 void
98 rxi_InitializeThreadSupport(void)
99 {
100     PROCESS junk;
101
102     LWP_InitializeProcessSupport(LWP_NORMAL_PRIORITY, &junk);
103     IOMGR_Initialize();
104     FD_ZERO(&rx_selectMask);
105 }
106
107 void
108 rxi_StartServerProc(void *(*proc) (void *), int stacksize)
109 {
110     PROCESS scratchPid;
111     static int number = 0;
112     char name[32];
113
114     sprintf(name, "srv_%d", ++number);
115     LWP_CreateProcess(proc, stacksize, RX_PROCESS_PRIORITY, NULL,
116                       "rx_ServerProc", &scratchPid);
117     if (registerProgram)
118         (*registerProgram) (scratchPid, name);
119 }
120
121 void
122 rxi_StartListener(void)
123 {
124     /* Priority of listener should be high, so it can keep conns alive */
125 #define RX_LIST_STACK   24000
126     LWP_CreateProcess(rx_ListenerProc, RX_LIST_STACK, LWP_MAX_PRIORITY,
127                       NULL, "rx_Listener", &rx_listenerPid);
128     if (registerProgram)
129         (*registerProgram) (rx_listenerPid, "listener");
130 }
131
132 /* The main loop which listens to the net for datagrams, and handles timeouts
133    and retransmissions, etc.  It also is responsible for scheduling the
134    execution of pending events (in conjunction with event.c).
135
136    Note interaction of nextPollTime and lastPollWorked.  The idea is that if rx is not
137    keeping up with the incoming stream of packets (because there are threads that
138    are interfering with its running sufficiently often), rx does a polling select
139    before doing a real IOMGR_Select system call.  Doing a real select means that
140    we don't have to let other processes run before processing more packets.
141
142    So, our algorithm is that if the last poll on the file descriptor found useful data, or
143    we're at the time nextPollTime (which is advanced so that it occurs every 3 or 4 seconds),
144    then we try the polling select before the IOMGR_Select.  If we eventually catch up
145    (which we can tell by the polling select returning no input packets ready), then we
146    don't do a polling select again until several seconds later (via nextPollTime mechanism).
147    */
148
149 static void
150 rxi_ListenerProc(fd_set * rfds, int *tnop, struct rx_call **newcallp)
151 {
152     afs_uint32 host;
153     u_short port;
154     struct rx_packet *p = (struct rx_packet *)0;
155     osi_socket socket;
156     struct clock cv;
157     afs_int32 nextPollTime;     /* time to next poll FD before sleeping */
158     int lastPollWorked, doingPoll;      /* true iff last poll was useful */
159     struct timeval tv, *tvp;
160     int code;
161 #ifdef AFS_NT40_ENV
162     int i;
163 #endif
164     PROCESS pid;
165     char name[MAXTHREADNAMELENGTH] = "srv_0";
166
167     clock_NewTime();
168     lastPollWorked = 0;
169     nextPollTime = 0;
170     code = LWP_CurrentProcess(&pid);
171     if (code) {
172         osi_Panic("rxi_Listener: Can't get my pid.\n");
173     }
174     rx_listenerPid = pid;
175     if (swapNameProgram)
176         (*swapNameProgram) (pid, "listener", &name[0]);
177
178     for (;;) {
179         /* See if a check for additional packets was issued */
180         rx_CheckPackets();
181
182         /* Grab a new packet only if necessary (otherwise re-use the old one) */
183         if (p) {
184             rxi_RestoreDataBufs(p);
185         } else {
186             if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_RECEIVE)))
187                 osi_Panic("rxi_ListenerProc: no packets!");     /* Shouldn't happen */
188         }
189         /* Wait for the next event time or a packet to arrive. */
190         /* event_RaiseEvents schedules any events whose time has come and
191          * then atomically computes the time to the next event, guaranteeing
192          * that this is positive.  If there is no next event, it returns 0 */
193         clock_NewTime();
194         if (!rxevent_RaiseEvents(&cv))
195             tvp = NULL;
196         else {
197             /* It's important to copy cv to tv, because the 4.3 documentation
198              * for select threatens that *tv may be updated after a select, in
199              * future editions of the system, to indicate how much of the time
200              * period has elapsed.  So we shouldn't rely on tv not being altered. */
201             tv.tv_sec = cv.sec; /* Time to next event */
202             tv.tv_usec = cv.usec;
203             tvp = &tv;
204         }
205         if (rx_stats_active)
206             rx_atomic_inc(&rx_stats.selects);
207
208         *rfds = rx_selectMask;
209
210         if (lastPollWorked || nextPollTime < clock_Sec()) {
211             /* we're catching up, or haven't tried to for a few seconds */
212             doingPoll = 1;
213             nextPollTime = clock_Sec() + 4;     /* try again in 4 seconds no matter what */
214             tv.tv_sec = tv.tv_usec = 0; /* make sure we poll */
215             tvp = &tv;
216             code = select((int)(rx_maxSocketNumber + 1), rfds, 0, 0, tvp);
217         } else {
218             doingPoll = 0;
219             code = IOMGR_Select((int)(rx_maxSocketNumber + 1), rfds, 0, 0, tvp);
220         }
221         lastPollWorked = 0;     /* default is that it didn't find anything */
222
223         if (quitListening) {
224             quitListening = 0;
225             LWP_DestroyProcess(pid);
226         }
227
228         switch (code) {
229         case 0:
230             /* Timer interrupt:
231              * If it was a timer interrupt then we can assume that
232              * the time has advanced by roughly the value of the
233              * previous timeout, and that there is now at least
234              * one pending event.
235              */
236             clock_NewTime();
237             break;
238         case -1:
239             /* select or IOMGR_Select returned failure */
240             debugSelectFailure++;       /* update debugging counter */
241             clock_NewTime();
242             break;
243         case -2:
244             /* IOMGR_Cancel:
245              * IOMGR_Cancel is invoked whenever a new event is
246              * posted that is earlier than any existing events.
247              * So we re-evaluate the time, and then go back to
248              * reschedule events
249              */
250             clock_NewTime();
251             break;
252
253         default:
254             /* Packets have arrived, presumably:
255              * If it wasn't a timer interrupt, then no event should have
256              * timed out yet (well some event may have, but only just...), so
257              * we don't bother looking to see if any have timed out, but just
258              * go directly to reading the data packets
259              */
260             clock_NewTime();
261             if (doingPoll)
262                 lastPollWorked = 1;
263 #ifdef AFS_NT40_ENV
264             for (i = 0; p && i < rfds->fd_count; i++) {
265                 socket = rfds->fd_array[i];
266                 if (rxi_ReadPacket(socket, p, &host, &port)) {
267                     *newcallp = NULL;
268                     p = rxi_ReceivePacket(p, socket, host, port, tnop,
269                                           newcallp);
270                     if (newcallp && *newcallp) {
271                         if (p) {
272                             rxi_FreePacket(p);
273                         }
274                         if (swapNameProgram) {
275                             (*swapNameProgram) (rx_listenerPid, name, 0);
276                             rx_listenerPid = 0;
277                         }
278                         return;
279                     }
280                 }
281             }
282 #else
283             for (socket = rx_minSocketNumber;
284                  p && socket <= rx_maxSocketNumber; socket++) {
285                 if (!FD_ISSET(socket, rfds))
286                     continue;
287                 if (rxi_ReadPacket(socket, p, &host, &port)) {
288                     p = rxi_ReceivePacket(p, socket, host, port, tnop,
289                                           newcallp);
290                     if (newcallp && *newcallp) {
291                         if (p) {
292                             rxi_FreePacket(p);
293                         }
294                         if (swapNameProgram) {
295                             (*swapNameProgram) (rx_listenerPid, name, 0);
296                             rx_listenerPid = 0;
297                         }
298                         return;
299                     }
300                 }
301             }
302 #endif
303             break;
304         }
305     }
306     /* NOTREACHED */
307 }
308
309 /* This is the listener process request loop. The listener process loop
310  * becomes a server thread when rxi_ListenerProc returns, and stays
311  * server thread until rxi_ServerProc returns. */
312 static void *
313 rx_ListenerProc(void *dummy)
314 {
315     int threadID;
316     osi_socket sock;
317     struct rx_call *newcall;
318     fd_set *rfds;
319
320     if (!(rfds = IOMGR_AllocFDSet())) {
321         osi_Panic("rx_ListenerProc: no fd_sets!\n");
322     }
323
324     while (1) {
325         newcall = NULL;
326         threadID = -1;
327         rxi_ListenerProc(rfds, &threadID, &newcall);
328         /* osi_Assert(threadID != -1); */
329         /* osi_Assert(newcall != NULL); */
330         sock = OSI_NULLSOCKET;
331         rxi_ServerProc(threadID, newcall, &sock);
332         /* osi_Assert(sock != OSI_NULLSOCKET); */
333     }
334     /* not reached */
335     return NULL;
336 }
337
338 /* This is the server process request loop. The server process loop
339  * becomes a listener thread when rxi_ServerProc returns, and stays
340  * listener thread until rxi_ListenerProc returns. */
341 void *
342 rx_ServerProc(void * unused)
343 {
344     osi_socket sock;
345     int threadID;
346     struct rx_call *newcall = NULL;
347     fd_set *rfds;
348
349     if (!(rfds = IOMGR_AllocFDSet())) {
350         osi_Panic("rxi_ListenerProc: no fd_sets!\n");
351     }
352
353     rxi_MorePackets(rx_maxReceiveWindow + 2);   /* alloc more packets */
354     rxi_dataQuota += rx_initSendWindow; /* Reserve some pkts for hard times */
355     /* threadID is used for making decisions in GetCall.  Get it by bumping
356      * number of threads handling incoming calls */
357     threadID = rxi_availProcs++;
358
359     while (1) {
360         sock = OSI_NULLSOCKET;
361         rxi_ServerProc(threadID, newcall, &sock);
362         /* osi_Assert(sock != OSI_NULLSOCKET); */
363         newcall = NULL;
364         rxi_ListenerProc(rfds, &threadID, &newcall);
365         /* osi_Assert(threadID != -1); */
366         /* osi_Assert(newcall != NULL); */
367     }
368     /* not reached */
369     return NULL;
370 }
371
372 /*
373  * Called from GetUDPSocket.
374  * Called from a single thread at startup.
375  * Returns 0 on success; -1 on failure.
376  */
377 int
378 rxi_Listen(osi_socket sock)
379 {
380 #ifndef AFS_NT40_ENV
381     /*
382      * Put the socket into non-blocking mode so that rx_Listener
383      * can do a polling read before entering select
384      */
385     if (fcntl(sock, F_SETFL, FNDELAY) == -1) {
386         perror("fcntl");
387         (osi_Msg "rxi_Listen: unable to set non-blocking mode on socket\n");
388         return -1;
389     }
390
391     if (sock > FD_SETSIZE - 1) {
392         (osi_Msg "rxi_Listen: socket descriptor > (FD_SETSIZE-1) = %d\n",
393          FD_SETSIZE - 1);
394         return -1;
395     }
396 #endif
397
398     FD_SET(sock, &rx_selectMask);
399     if (sock > rx_maxSocketNumber)
400         rx_maxSocketNumber = sock;
401     if (sock < rx_minSocketNumber)
402         rx_minSocketNumber = sock;
403     return 0;
404 }
405
406 /*
407  * Recvmsg
408  */
409 int
410 rxi_Recvmsg(osi_socket socket, struct msghdr *msg_p, int flags)
411 {
412 #if defined(HAVE_LINUX_ERRQUEUE_H) && defined(ADAPT_PMTU)
413     while((rxi_HandleSocketError(socket)) > 0)
414         ;
415 #endif
416     return recvmsg(socket, msg_p, flags);
417 }
418
419 /*
420  * Simulate a blocking sendmsg on the non-blocking socket.
421  * It's non blocking because it was set that way for recvmsg.
422  */
423 int
424 rxi_Sendmsg(osi_socket socket, struct msghdr *msg_p, int flags)
425 {
426     fd_set *sfds = (fd_set *) 0;
427     while (sendmsg(socket, msg_p, flags) == -1) {
428         int err;
429         if (rx_stats_active)
430             rx_atomic_inc(&rx_stats.sendSelects);
431
432         if (!sfds) {
433             if (!(sfds = IOMGR_AllocFDSet())) {
434                 (osi_Msg "rx failed to alloc fd_set: ");
435                 perror("rx_sendmsg");
436                 return -1;
437             }
438             FD_SET(socket, sfds);
439         }
440 #if defined(HAVE_LINUX_ERRQUEUE_H) && defined(ADAPT_PMTU)
441         while((rxi_HandleSocketError(socket)) > 0)
442           ;
443 #endif
444 #ifdef AFS_NT40_ENV
445         if (WSAGetLastError())
446 #elif defined(AFS_LINUX22_ENV)
447         /* linux unfortunately returns ECONNREFUSED if the target port
448          * is no longer in use */
449         /* and EAGAIN if a UDP checksum is incorrect */
450         if (errno != EWOULDBLOCK && errno != ENOBUFS && errno != ECONNREFUSED
451             && errno != EAGAIN)
452 #else
453         if (errno != EWOULDBLOCK && errno != ENOBUFS)
454 #endif
455         {
456             (osi_Msg "rx failed to send packet: ");
457             perror("rx_sendmsg");
458 #ifndef AFS_NT40_ENV
459             if (errno > 0)
460               return -errno;
461 #else
462             if (WSAGetLastError() > 0)
463               return -WSAGetLastError();
464 #endif
465             return -1;
466         }
467         while ((err = select(
468 #ifdef AFS_NT40_ENV
469                              0,
470 #else
471                              socket + 1,
472 #endif
473                              0, sfds, 0, 0)) != 1) {
474             if (err >= 0 || errno != EINTR)
475                 osi_Panic("rxi_sendmsg: select error %d.%d", err, errno);
476             FD_ZERO(sfds);
477             FD_SET(socket, sfds);
478         }
479     }
480     if (sfds)
481         IOMGR_FreeFDSet(sfds);
482     return 0;
483 }