rx-prototypes-20080924
[openafs.git] / src / rx / rx_lwp.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /* rx_user.c contains routines specific to the user space UNIX implementation of rx */
11
12 /* This controls the size of an fd_set; it must be defined early before
13  * the system headers define that type and the macros that operate on it.
14  * Its value should be as large as the maximum file descriptor limit we
15  * are likely to run into on any platform.  Right now, that is 65536
16  * which is the default hard fd limit on Solaris 9 */
17 #ifndef _WIN32
18 #define FD_SETSIZE 65536
19 #endif
20
21 #include <afsconfig.h>
22 #include <afs/param.h>
23
24 RCSID
25     ("$Header$");
26
27 # include <sys/types.h>         /* fd_set on older platforms */
28 # include <errno.h>
29 # include <signal.h>
30 #ifdef AFS_NT40_ENV
31 # include <winsock2.h>
32 #else
33 # include <unistd.h>            /* select() prototype */
34 # include <sys/time.h>          /* struct timeval, select() prototype */
35 # ifndef FD_SET
36 #  include <sys/select.h>       /* fd_set on newer platforms */
37 # endif
38 # include <sys/socket.h>
39 # include <sys/file.h>
40 # include <netdb.h>
41 # include <sys/stat.h>
42 # include <netinet/in.h>
43 # include <net/if.h>
44 # include <sys/ioctl.h>
45 # include <sys/time.h>
46 #endif
47 # include "rx.h"
48 # include "rx_globals.h"
49 # include <lwp.h>
50
51 #define MAXTHREADNAMELENGTH 64
52
53 int debugSelectFailure;         /* # of times select failed */
54
55 /*
56  * Sleep on the unique wait channel provided.
57  */
58 void
59 rxi_Sleep(void *addr)
60 {
61     LWP_WaitProcess(addr);
62 }
63
64 /*
65  * Wakeup any threads on the channel provided.
66  * They may be woken up spuriously, and must check any conditions.
67  */
68 void
69 rxi_Wakeup(void *addr)
70 {
71     LWP_NoYieldSignal(addr);
72 }
73
74 PROCESS rx_listenerPid = 0;     /* LWP process id of socket listener process */
75 static void* rx_ListenerProc(void *dummy);
76
77 /*
78  * Delay the current thread the specified number of seconds.
79  */
80 void
81 rxi_Delay(int sec)
82 {
83     IOMGR_Sleep(sec);
84 }
85
86 static int quitListening = 0;
87
88 /* This routine will kill the listener thread, if it exists. */
89 void
90 rxi_StopListener(void)
91 {
92     quitListening = 1;
93     rxi_ReScheduleEvents();
94 }
95
96 /* This routine will get called by the event package whenever a new,
97    earlier than others, event is posted.  If the Listener process
98    is blocked in selects, this will unblock it.  It also can be called
99    to force a new trip through the rxi_Listener select loop when the set
100    of file descriptors it should be listening to changes... */
101 void
102 rxi_ReScheduleEvents(void)
103 {
104     if (rx_listenerPid)
105         IOMGR_Cancel(rx_listenerPid);
106 }
107
108 void
109 rxi_InitializeThreadSupport(void)
110 {
111     PROCESS junk;
112
113     LWP_InitializeProcessSupport(LWP_NORMAL_PRIORITY, &junk);
114     IOMGR_Initialize();
115     FD_ZERO(&rx_selectMask);
116 }
117
118 void
119 rxi_StartServerProc(void *(*proc) (void *), int stacksize)
120 {
121     PROCESS scratchPid;
122     static int number = 0;
123     char name[32];
124
125     sprintf(name, "srv_%d", ++number);
126     LWP_CreateProcess(proc, stacksize, RX_PROCESS_PRIORITY, NULL,
127                       "rx_ServerProc", &scratchPid);
128     if (registerProgram)
129         (*registerProgram) (scratchPid, name);
130 }
131
132 void
133 rxi_StartListener(void)
134 {
135     /* Priority of listener should be high, so it can keep conns alive */
136 #define RX_LIST_STACK   24000
137     LWP_CreateProcess(rx_ListenerProc, RX_LIST_STACK, LWP_MAX_PRIORITY,
138                       NULL, "rx_Listener", &rx_listenerPid);
139     if (registerProgram)
140         (*registerProgram) (rx_listenerPid, "listener");
141 }
142
143 /* The main loop which listens to the net for datagrams, and handles timeouts
144    and retransmissions, etc.  It also is responsible for scheduling the
145    execution of pending events (in conjunction with event.c).
146
147    Note interaction of nextPollTime and lastPollWorked.  The idea is that if rx is not
148    keeping up with the incoming stream of packets (because there are threads that
149    are interfering with its running sufficiently often), rx does a polling select
150    before doing a real IOMGR_Select system call.  Doing a real select means that
151    we don't have to let other processes run before processing more packets.
152
153    So, our algorithm is that if the last poll on the file descriptor found useful data, or
154    we're at the time nextPollTime (which is advanced so that it occurs every 3 or 4 seconds),
155    then we try the polling select before the IOMGR_Select.  If we eventually catch up
156    (which we can tell by the polling select returning no input packets ready), then we
157    don't do a polling select again until several seconds later (via nextPollTime mechanism).
158    */
159
160 static void
161 rxi_ListenerProc(fd_set * rfds, int *tnop, struct rx_call **newcallp)
162 {
163     afs_uint32 host;
164     u_short port;
165     register struct rx_packet *p = (struct rx_packet *)0;
166     osi_socket socket;
167     struct clock cv;
168     afs_int32 nextPollTime;     /* time to next poll FD before sleeping */
169     int lastPollWorked, doingPoll;      /* true iff last poll was useful */
170     struct timeval tv, *tvp;
171     int code;
172 #ifdef AFS_NT40_ENV
173     int i;
174 #endif
175     PROCESS pid;
176     char name[MAXTHREADNAMELENGTH] = "srv_0";
177
178     clock_NewTime();
179     lastPollWorked = 0;
180     nextPollTime = 0;
181     code = LWP_CurrentProcess(&pid);
182     if (code) {
183         fprintf(stderr, "rxi_Listener: Can't get my pid.\n");
184         exit(1);
185     }
186     rx_listenerPid = pid;
187     if (swapNameProgram)
188         (*swapNameProgram) (pid, "listener", &name[0]);
189
190     for (;;) {
191         /* Grab a new packet only if necessary (otherwise re-use the old one) */
192         if (p) {
193             rxi_RestoreDataBufs(p);
194         } else {
195             if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_RECEIVE)))
196                 osi_Panic("rxi_ListenerProc: no packets!");     /* Shouldn't happen */
197         }
198         /* Wait for the next event time or a packet to arrive. */
199         /* event_RaiseEvents schedules any events whose time has come and
200          * then atomically computes the time to the next event, guaranteeing
201          * that this is positive.  If there is no next event, it returns 0 */
202         clock_NewTime();
203         if (!rxevent_RaiseEvents(&cv))
204             tvp = NULL;
205         else {
206             /* It's important to copy cv to tv, because the 4.3 documentation
207              * for select threatens that *tv may be updated after a select, in
208              * future editions of the system, to indicate how much of the time
209              * period has elapsed.  So we shouldn't rely on tv not being altered. */
210             tv.tv_sec = cv.sec; /* Time to next event */
211             tv.tv_usec = cv.usec;
212             tvp = &tv;
213         }
214         rx_stats.selects++;
215
216         *rfds = rx_selectMask;
217
218         if (lastPollWorked || nextPollTime < clock_Sec()) {
219             /* we're catching up, or haven't tried to for a few seconds */
220             doingPoll = 1;
221             nextPollTime = clock_Sec() + 4;     /* try again in 4 seconds no matter what */
222             tv.tv_sec = tv.tv_usec = 0; /* make sure we poll */
223             tvp = &tv;
224             code = select((int)(rx_maxSocketNumber + 1), rfds, 0, 0, tvp);
225         } else {
226             doingPoll = 0;
227             code = IOMGR_Select((int)(rx_maxSocketNumber + 1), rfds, 0, 0, tvp);
228         }
229         lastPollWorked = 0;     /* default is that it didn't find anything */
230
231         if (quitListening) {
232             quitListening = 0;
233             LWP_DestroyProcess(pid);
234         }
235
236         switch (code) {
237         case 0:
238             /* Timer interrupt:
239              * If it was a timer interrupt then we can assume that
240              * the time has advanced by roughly the value of the
241              * previous timeout, and that there is now at least
242              * one pending event.
243              */
244             clock_NewTime();
245             break;
246         case -1:
247             /* select or IOMGR_Select returned failure */
248             debugSelectFailure++;       /* update debugging counter */
249             clock_NewTime();
250             break;
251         case -2:
252             /* IOMGR_Cancel:
253              * IOMGR_Cancel is invoked whenever a new event is
254              * posted that is earlier than any existing events.
255              * So we re-evaluate the time, and then go back to
256              * reschedule events
257              */
258             clock_NewTime();
259             break;
260
261         default:
262             /* Packets have arrived, presumably:
263              * If it wasn't a timer interrupt, then no event should have
264              * timed out yet (well some event may have, but only just...), so
265              * we don't bother looking to see if any have timed out, but just
266              * go directly to reading the data packets
267              */
268             clock_NewTime();
269             if (doingPoll)
270                 lastPollWorked = 1;
271 #ifdef AFS_NT40_ENV
272             for (i = 0; p && i < rfds->fd_count; i++) {
273                 socket = rfds->fd_array[i];
274                 if (rxi_ReadPacket(socket, p, &host, &port)) {
275                     *newcallp = NULL;
276                     p = rxi_ReceivePacket(p, socket, host, port, tnop,
277                                           newcallp);
278                     if (newcallp && *newcallp) {
279                         if (p) {
280                             rxi_FreePacket(p);
281                         }
282                         if (swapNameProgram) {
283                             (*swapNameProgram) (rx_listenerPid, name, 0);
284                             rx_listenerPid = 0;
285                         }
286                         return;
287                     }
288                 }
289             }
290 #else
291             for (socket = rx_minSocketNumber;
292                  p && socket <= rx_maxSocketNumber; socket++) {
293                 if (!FD_ISSET(socket, rfds))
294                     continue;
295                 if (rxi_ReadPacket(socket, p, &host, &port)) {
296                     p = rxi_ReceivePacket(p, socket, host, port, tnop,
297                                           newcallp);
298                     if (newcallp && *newcallp) {
299                         if (p) {
300                             rxi_FreePacket(p);
301                         }
302                         if (swapNameProgram) {
303                             (*swapNameProgram) (rx_listenerPid, name, 0);
304                             rx_listenerPid = 0;
305                         }
306                         return;
307                     }
308                 }
309             }
310 #endif
311             break;
312         }
313     }
314     /* NOTREACHED */
315 }
316
317 /* This is the listener process request loop. The listener process loop
318  * becomes a server thread when rxi_ListenerProc returns, and stays
319  * server thread until rxi_ServerProc returns. */
320 static void *
321 rx_ListenerProc(void *dummy)
322 {
323     int threadID;
324     osi_socket sock;
325     struct rx_call *newcall;
326     fd_set *rfds;
327
328     if (!(rfds = IOMGR_AllocFDSet())) {
329         osi_Panic("rx_ListenerProc: no fd_sets!\n");
330     }
331
332     while (1) {
333         newcall = NULL;
334         threadID = -1;
335         rxi_ListenerProc(rfds, &threadID, &newcall);
336         /* assert(threadID != -1); */
337         /* assert(newcall != NULL); */
338         sock = OSI_NULLSOCKET;
339         rxi_ServerProc(threadID, newcall, &sock);
340         /* assert(sock != OSI_NULLSOCKET); */
341     }
342     /* not reached */
343     return NULL;
344 }
345
346 /* This is the server process request loop. The server process loop
347  * becomes a listener thread when rxi_ServerProc returns, and stays
348  * listener thread until rxi_ListenerProc returns. */
349 void *
350 rx_ServerProc(void * unused)
351 {
352     osi_socket sock;
353     int threadID;
354     struct rx_call *newcall = NULL;
355     fd_set *rfds;
356
357     if (!(rfds = IOMGR_AllocFDSet())) {
358         osi_Panic("rxi_ListenerProc: no fd_sets!\n");
359     }
360
361     rxi_MorePackets(rx_maxReceiveWindow + 2);   /* alloc more packets */
362     rxi_dataQuota += rx_initSendWindow; /* Reserve some pkts for hard times */
363     /* threadID is used for making decisions in GetCall.  Get it by bumping
364      * number of threads handling incoming calls */
365     threadID = rxi_availProcs++;
366
367     while (1) {
368         sock = OSI_NULLSOCKET;
369         rxi_ServerProc(threadID, newcall, &sock);
370         /* assert(sock != OSI_NULLSOCKET); */
371         newcall = NULL;
372         rxi_ListenerProc(rfds, &threadID, &newcall);
373         /* assert(threadID != -1); */
374         /* assert(newcall != NULL); */
375     }
376     /* not reached */
377     return NULL;
378 }
379
380 /*
381  * Called from GetUDPSocket.
382  * Called from a single thread at startup.
383  * Returns 0 on success; -1 on failure.
384  */
385 int
386 rxi_Listen(osi_socket sock)
387 {
388 #ifndef AFS_NT40_ENV
389     /*
390      * Put the socket into non-blocking mode so that rx_Listener
391      * can do a polling read before entering select
392      */
393     if (fcntl(sock, F_SETFL, FNDELAY) == -1) {
394         perror("fcntl");
395         (osi_Msg "rxi_Listen: unable to set non-blocking mode on socket\n");
396         return -1;
397     }
398
399     if (sock > FD_SETSIZE - 1) {
400         (osi_Msg "rxi_Listen: socket descriptor > (FD_SETSIZE-1) = %d\n",
401          FD_SETSIZE - 1);
402         return -1;
403     }
404 #endif
405
406     FD_SET(sock, &rx_selectMask);
407     if (sock > rx_maxSocketNumber)
408         rx_maxSocketNumber = sock;
409     if (sock < rx_minSocketNumber)
410         rx_minSocketNumber = sock;
411     return 0;
412 }
413
414 /*
415  * Recvmsg
416  */
417 int
418 rxi_Recvmsg(osi_socket socket, struct msghdr *msg_p, int flags)
419 {
420 #if defined(HAVE_LINUX_ERRQUEUE_H) && defined(ADAPT_PMTU)
421     while((rxi_HandleSocketError(socket)) > 0)
422         ;
423 #endif
424     return recvmsg(socket, msg_p, flags);
425 }
426
427 /*
428  * Simulate a blocking sendmsg on the non-blocking socket.
429  * It's non blocking because it was set that way for recvmsg.
430  */
431 int
432 rxi_Sendmsg(osi_socket socket, struct msghdr *msg_p, int flags)
433 {
434     fd_set *sfds = (fd_set *) 0;
435     while (sendmsg(socket, msg_p, flags) == -1) {
436         int err;
437         rx_stats.sendSelects++;
438
439         if (!sfds) {
440             if (!(sfds = IOMGR_AllocFDSet())) {
441                 (osi_Msg "rx failed to alloc fd_set: ");
442                 perror("rx_sendmsg");
443                 return -1;
444             }
445             FD_SET(socket, sfds);
446         }
447 #if defined(HAVE_LINUX_ERRQUEUE_H) && defined(ADAPT_PMTU)
448         while((rxi_HandleSocketError(socket)) > 0)
449           ;
450 #endif
451 #ifdef AFS_NT40_ENV
452         if (WSAGetLastError())
453 #elif defined(AFS_LINUX22_ENV)
454         /* linux unfortunately returns ECONNREFUSED if the target port
455          * is no longer in use */
456         /* and EAGAIN if a UDP checksum is incorrect */
457         if (errno != EWOULDBLOCK && errno != ENOBUFS && errno != ECONNREFUSED
458             && errno != EAGAIN)
459 #else
460         if (errno != EWOULDBLOCK && errno != ENOBUFS)
461 #endif
462         {
463             (osi_Msg "rx failed to send packet: ");
464             perror("rx_sendmsg");
465             return -1;
466         }
467         while ((err = select(socket + 1, 0, sfds, 0, 0)) != 1) {
468             if (err >= 0 || errno != EINTR)
469                 osi_Panic("rxi_sendmsg: select error %d.%d", err, errno);
470         }
471     }
472     if (sfds)
473         IOMGR_FreeFDSet(sfds);
474     return 0;
475 }