d0fe8f62cbf05c7dab8bbf34d6b23a835dfa8def
[openafs.git] / src / rx / rx_lwp.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /* rx_user.c contains routines specific to the user space UNIX implementation of rx */
11
12 #include <afsconfig.h>
13 #include <afs/param.h>
14
15 RCSID
16     ("$Header$");
17
18 # include <sys/types.h>         /* fd_set on older platforms */
19 # include <errno.h>
20 # include <signal.h>
21 #ifdef AFS_NT40_ENV
22 # include <winsock2.h>
23 #else
24 # include <unistd.h>            /* select() prototype */
25 # include <sys/time.h>          /* struct timeval, select() prototype */
26 # ifndef FD_SET
27 #  include <sys/select.h>       /* fd_set on newer platforms */
28 # endif
29 # include <sys/socket.h>
30 # include <sys/file.h>
31 # include <netdb.h>
32 # include <sys/stat.h>
33 # include <netinet/in.h>
34 # include <net/if.h>
35 # include <sys/ioctl.h>
36 # include <sys/time.h>
37 #endif
38 # include "rx.h"
39 # include "rx_globals.h"
40 # include <lwp.h>
41
42 #define MAXTHREADNAMELENGTH 64
43
44 extern int (*registerProgram) ();
45 extern int (*swapNameProgram) ();
46
47 int debugSelectFailure;         /* # of times select failed */
48
49 /*
50  * Sleep on the unique wait channel provided.
51  */
52 void
53 rxi_Sleep(void *addr)
54 {
55     LWP_WaitProcess(addr);
56 }
57
58 /*
59  * Wakeup any threads on the channel provided.
60  * They may be woken up spuriously, and must check any conditions.
61  */
62 void
63 rxi_Wakeup(void *addr)
64 {
65     LWP_NoYieldSignal(addr);
66 }
67
68 PROCESS rx_listenerPid = 0;     /* LWP process id of socket listener process */
69 static void rx_ListenerProc(void *dummy);
70
71 /*
72  * Delay the current thread the specified number of seconds.
73  */
74 void
75 rxi_Delay(int sec)
76 {
77     IOMGR_Sleep(sec);
78 }
79
80 static int quitListening = 0;
81
82 /* This routine will kill the listener thread, if it exists. */
83 void
84 rxi_StopListener(void)
85 {
86     quitListening = 1;
87     rxi_ReScheduleEvents();
88 }
89
90 /* This routine will get called by the event package whenever a new,
91    earlier than others, event is posted.  If the Listener process
92    is blocked in selects, this will unblock it.  It also can be called
93    to force a new trip through the rxi_Listener select loop when the set
94    of file descriptors it should be listening to changes... */
95 void
96 rxi_ReScheduleEvents(void)
97 {
98     if (rx_listenerPid)
99         IOMGR_Cancel(rx_listenerPid);
100 }
101
102 void
103 rxi_InitializeThreadSupport(void)
104 {
105     PROCESS junk;
106
107     LWP_InitializeProcessSupport(LWP_NORMAL_PRIORITY, &junk);
108     IOMGR_Initialize();
109     FD_ZERO(&rx_selectMask);
110 }
111
112 void
113 rxi_StartServerProc(void (*proc) (void), int stacksize)
114 {
115     PROCESS scratchPid;
116     static int number = 0;
117     char name[32];
118
119     sprintf(name, "srv_%d", ++number);
120     LWP_CreateProcess(proc, stacksize, RX_PROCESS_PRIORITY, (void *)0,
121                       "rx_ServerProc", &scratchPid);
122     if (registerProgram)
123         (*registerProgram) (scratchPid, name);
124 }
125
126 void
127 rxi_StartListener(void)
128 {
129     /* Priority of listener should be high, so it can keep conns alive */
130 #define RX_LIST_STACK   24000
131     LWP_CreateProcess(rx_ListenerProc, RX_LIST_STACK, LWP_MAX_PRIORITY,
132                       (void *)0, "rx_Listener", &rx_listenerPid);
133     if (registerProgram)
134         (*registerProgram) (rx_listenerPid, "listener");
135 }
136
137 /* The main loop which listens to the net for datagrams, and handles timeouts
138    and retransmissions, etc.  It also is responsible for scheduling the
139    execution of pending events (in conjunction with event.c).
140
141    Note interaction of nextPollTime and lastPollWorked.  The idea is that if rx is not
142    keeping up with the incoming stream of packets (because there are threads that
143    are interfering with its running sufficiently often), rx does a polling select
144    before doing a real IOMGR_Select system call.  Doing a real select means that
145    we don't have to let other processes run before processing more packets.
146
147    So, our algorithm is that if the last poll on the file descriptor found useful data, or
148    we're at the time nextPollTime (which is advanced so that it occurs every 3 or 4 seconds),
149    then we try the polling select before the IOMGR_Select.  If we eventually catch up
150    (which we can tell by the polling select returning no input packets ready), then we
151    don't do a polling select again until several seconds later (via nextPollTime mechanism).
152    */
153
154 static void
155 rxi_ListenerProc(fd_set * rfds, int *tnop, struct rx_call **newcallp)
156 {
157     afs_uint32 host;
158     u_short port;
159     register struct rx_packet *p = (struct rx_packet *)0;
160     int socket;
161     struct clock cv;
162     afs_int32 nextPollTime;     /* time to next poll FD before sleeping */
163     int lastPollWorked, doingPoll;      /* true iff last poll was useful */
164     struct timeval tv, *tvp;
165     int code;
166 #ifdef AFS_NT40_ENV
167     int i;
168 #endif
169     PROCESS pid;
170     char name[MAXTHREADNAMELENGTH] = "srv_0";
171
172     clock_NewTime();
173     lastPollWorked = 0;
174     nextPollTime = 0;
175     code = LWP_CurrentProcess(&pid);
176     if (code) {
177         fprintf(stderr, "rxi_Listener: Can't get my pid.\n");
178         exit(1);
179     }
180     rx_listenerPid = pid;
181     if (swapNameProgram)
182         (*swapNameProgram) (pid, "listener", &name);
183
184     for (;;) {
185         /* Grab a new packet only if necessary (otherwise re-use the old one) */
186         if (p) {
187             rxi_RestoreDataBufs(p);
188         } else {
189             if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_RECEIVE)))
190                 osi_Panic("rxi_ListenerProc: no packets!");     /* Shouldn't happen */
191         }
192         /* Wait for the next event time or a packet to arrive. */
193         /* event_RaiseEvents schedules any events whose time has come and
194          * then atomically computes the time to the next event, guaranteeing
195          * that this is positive.  If there is no next event, it returns 0 */
196         clock_NewTime();
197         if (!rxevent_RaiseEvents(&cv))
198             tvp = NULL;
199         else {
200             /* It's important to copy cv to tv, because the 4.3 documentation
201              * for select threatens that *tv may be updated after a select, in
202              * future editions of the system, to indicate how much of the time
203              * period has elapsed.  So we shouldn't rely on tv not being altered. */
204             tv.tv_sec = cv.sec; /* Time to next event */
205             tv.tv_usec = cv.usec;
206             tvp = &tv;
207         }
208         rx_stats.selects++;
209
210         *rfds = rx_selectMask;
211
212         if (lastPollWorked || nextPollTime < clock_Sec()) {
213             /* we're catching up, or haven't tried to for a few seconds */
214             doingPoll = 1;
215             nextPollTime = clock_Sec() + 4;     /* try again in 4 seconds no matter what */
216             tv.tv_sec = tv.tv_usec = 0; /* make sure we poll */
217             tvp = &tv;
218             code = select(rx_maxSocketNumber + 1, rfds, 0, 0, tvp);
219         } else {
220             doingPoll = 0;
221             code = IOMGR_Select(rx_maxSocketNumber + 1, rfds, 0, 0, tvp);
222         }
223         lastPollWorked = 0;     /* default is that it didn't find anything */
224
225         if (quitListening) {
226             quitListening = 0;
227             LWP_DestroyProcess(pid);
228         }
229
230         switch (code) {
231         case 0:
232             /* Timer interrupt:
233              * If it was a timer interrupt then we can assume that
234              * the time has advanced by roughly the value of the
235              * previous timeout, and that there is now at least
236              * one pending event.
237              */
238             clock_NewTime();
239             break;
240         case -1:
241             /* select or IOMGR_Select returned failure */
242             debugSelectFailure++;       /* update debugging counter */
243             clock_NewTime();
244             break;
245         case -2:
246             /* IOMGR_Cancel:
247              * IOMGR_Cancel is invoked whenever a new event is
248              * posted that is earlier than any existing events.
249              * So we re-evaluate the time, and then go back to
250              * reschedule events
251              */
252             clock_NewTime();
253             break;
254
255         default:
256             /* Packets have arrived, presumably:
257              * If it wasn't a timer interrupt, then no event should have
258              * timed out yet (well some event may have, but only just...), so
259              * we don't bother looking to see if any have timed out, but just
260              * go directly to reading the data packets
261              */
262             clock_NewTime();
263             if (doingPoll)
264                 lastPollWorked = 1;
265 #ifdef AFS_NT40_ENV
266             for (i = 0; p && i < rfds->fd_count; i++) {
267                 socket = rfds->fd_array[i];
268                 if (rxi_ReadPacket(socket, p, &host, &port)) {
269                     *newcallp = NULL;
270                     p = rxi_ReceivePacket(p, socket, host, port, tnop,
271                                           newcallp);
272                     if (newcallp && *newcallp) {
273                         if (p) {
274                             rxi_FreePacket(p);
275                         }
276                         if (swapNameProgram) {
277                             (*swapNameProgram) (rx_listenerPid, &name, 0);
278                             rx_listenerPid = 0;
279                         }
280                         return;
281                     }
282                 }
283             }
284 #else
285             for (socket = rx_minSocketNumber;
286                  p && socket <= rx_maxSocketNumber; socket++) {
287                 if (!FD_ISSET(socket, rfds))
288                     continue;
289                 if (rxi_ReadPacket(socket, p, &host, &port)) {
290                     p = rxi_ReceivePacket(p, socket, host, port, tnop,
291                                           newcallp);
292                     if (newcallp && *newcallp) {
293                         if (p) {
294                             rxi_FreePacket(p);
295                         }
296                         if (swapNameProgram) {
297                             (*swapNameProgram) (rx_listenerPid, &name, 0);
298                             rx_listenerPid = 0;
299                         }
300                         return;
301                     }
302                 }
303             }
304 #endif
305             break;
306         }
307     }
308     /* NOTREACHED */
309 }
310
311 /* This is the listener process request loop. The listener process loop
312  * becomes a server thread when rxi_ListenerProc returns, and stays
313  * server thread until rxi_ServerProc returns. */
314 static void
315 rx_ListenerProc(void *dummy)
316 {
317     int threadID;
318     int sock;
319     struct rx_call *newcall;
320     fd_set *rfds;
321
322     if (!(rfds = IOMGR_AllocFDSet())) {
323         osi_Panic("rx_ListenerProc: no fd_sets!\n");
324     }
325
326     while (1) {
327         newcall = NULL;
328         threadID = -1;
329         rxi_ListenerProc(rfds, &threadID, &newcall);
330         /* assert(threadID != -1); */
331         /* assert(newcall != NULL); */
332         sock = OSI_NULLSOCKET;
333         rxi_ServerProc(threadID, newcall, &sock);
334         /* assert(sock != OSI_NULLSOCKET); */
335     }
336     /* not reached */
337 }
338
339 /* This is the server process request loop. The server process loop
340  * becomes a listener thread when rxi_ServerProc returns, and stays
341  * listener thread until rxi_ListenerProc returns. */
342 void
343 rx_ServerProc(void)
344 {
345     int sock;
346     int threadID;
347     struct rx_call *newcall = NULL;
348     fd_set *rfds;
349
350     if (!(rfds = IOMGR_AllocFDSet())) {
351         osi_Panic("rxi_ListenerProc: no fd_sets!\n");
352     }
353
354     rxi_MorePackets(rx_maxReceiveWindow + 2);   /* alloc more packets */
355     rxi_dataQuota += rx_initSendWindow; /* Reserve some pkts for hard times */
356     /* threadID is used for making decisions in GetCall.  Get it by bumping
357      * number of threads handling incoming calls */
358     threadID = rxi_availProcs++;
359
360     while (1) {
361         sock = OSI_NULLSOCKET;
362         rxi_ServerProc(threadID, newcall, &sock);
363         /* assert(sock != OSI_NULLSOCKET); */
364         newcall = NULL;
365         rxi_ListenerProc(rfds, &threadID, &newcall);
366         /* assert(threadID != -1); */
367         /* assert(newcall != NULL); */
368     }
369     /* not reached */
370 }
371
372 /*
373  * Called from GetUDPSocket.
374  * Called from a single thread at startup.
375  * Returns 0 on success; -1 on failure.
376  */
377 int
378 rxi_Listen(osi_socket sock)
379 {
380 #ifndef AFS_NT40_ENV
381     /*
382      * Put the socket into non-blocking mode so that rx_Listener
383      * can do a polling read before entering select
384      */
385 #ifndef AFS_DJGPP_ENV
386     if (fcntl(sock, F_SETFL, FNDELAY) == -1) {
387         perror("fcntl");
388         (osi_Msg "rxi_Listen: unable to set non-blocking mode on socket\n");
389         return -1;
390     }
391 #else
392     if (__djgpp_set_socket_blocking_mode(sock, 1) < 0) {
393         perror("__djgpp_set_socket_blocking_mode");
394         (osi_Msg "rxi_Listen: unable to set non-blocking mode on socket\n");
395         return -1;
396     }
397 #endif /* AFS_DJGPP_ENV */
398
399     if (sock > FD_SETSIZE - 1) {
400         (osi_Msg "rxi_Listen: socket descriptor > (FD_SETSIZE-1) = %d\n",
401          FD_SETSIZE - 1);
402         return -1;
403     }
404 #endif
405
406     FD_SET(sock, &rx_selectMask);
407     if (sock > rx_maxSocketNumber)
408         rx_maxSocketNumber = sock;
409     if (sock < rx_minSocketNumber)
410         rx_minSocketNumber = sock;
411     return 0;
412 }
413
414 /*
415  * Recvmsg
416  */
417 int
418 rxi_Recvmsg(int socket, struct msghdr *msg_p, int flags)
419 {
420     return recvmsg((int)socket, msg_p, flags);
421 }
422
423 /*
424  * Simulate a blocking sendmsg on the non-blocking socket.
425  * It's non blocking because it was set that way for recvmsg.
426  */
427 int
428 rxi_Sendmsg(osi_socket socket, struct msghdr *msg_p, int flags)
429 {
430     fd_set *sfds = (fd_set *) 0;
431     while (sendmsg(socket, msg_p, flags) == -1) {
432         int err;
433         rx_stats.sendSelects++;
434
435         if (!sfds) {
436             if (!(sfds = IOMGR_AllocFDSet())) {
437                 (osi_Msg "rx failed to alloc fd_set: ");
438                 perror("rx_sendmsg");
439                 return 3;
440             }
441             FD_SET(socket, sfds);
442         }
443 #ifdef AFS_NT40_ENV
444         if (errno)
445 #elif defined(AFS_LINUX22_ENV)
446         /* linux unfortunately returns ECONNREFUSED if the target port
447          * is no longer in use */
448         /* and EAGAIN if a UDP checksum is incorrect */
449         if (errno != EWOULDBLOCK && errno != ENOBUFS && errno != ECONNREFUSED
450             && errno != EAGAIN)
451 #else
452         if (errno != EWOULDBLOCK && errno != ENOBUFS)
453 #endif
454         {
455             (osi_Msg "rx failed to send packet: ");
456             perror("rx_sendmsg");
457             return 3;
458         }
459         while ((err = select(socket + 1, 0, sfds, 0, 0)) != 1) {
460             if (err >= 0 || errno != EINTR)
461                 osi_Panic("rxi_sendmsg: select error %d.%d", err, errno);
462         }
463     }
464     if (sfds)
465         IOMGR_FreeFDSet(sfds);
466     return 0;
467 }