prototypes-fixes-20020821
[openafs.git] / src / rx / rx_lwp.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /* rx_user.c contains routines specific to the user space UNIX implementation of rx */
11
12 #include <afsconfig.h>
13 #include <afs/param.h>
14
15 RCSID("$Header$");
16
17 # include <sys/types.h>         /* fd_set on older platforms */
18 # include <errno.h>
19 # include <signal.h>
20 #ifdef AFS_NT40_ENV
21 # include <winsock2.h>
22 #else
23 # include <unistd.h>            /* select() prototype */
24 # include <sys/time.h>          /* struct timeval, select() prototype */
25 # ifndef FD_SET
26 #  include <sys/select.h>       /* fd_set on newer platforms */
27 # endif
28 # include <sys/socket.h>
29 # include <sys/file.h>
30 # include <netdb.h>
31 # include <sys/stat.h>
32 # include <netinet/in.h>
33 # include <net/if.h>
34 # include <sys/ioctl.h>
35 # include <sys/time.h>
36 #endif
37 # include "rx.h"
38 # include "rx_globals.h"
39 # include <lwp.h>
40
41 #define MAXTHREADNAMELENGTH 64
42
43 extern int (*registerProgram)();
44 extern int (*swapNameProgram)();
45
46 int debugSelectFailure; /* # of times select failed */
47
48 /*
49  * Sleep on the unique wait channel provided.
50  */
51 void rxi_Sleep(void *addr)
52 {
53     LWP_WaitProcess(addr);
54 }
55
56 /*
57  * Wakeup any threads on the channel provided.
58  * They may be woken up spuriously, and must check any conditions.
59  */
60 void rxi_Wakeup(void *addr)
61 {
62     LWP_NoYieldSignal(addr);
63 }
64
65 PROCESS rx_listenerPid = 0;     /* LWP process id of socket listener process */
66 static void rx_ListenerProc(void *dummy);
67
68 /*
69  * Delay the current thread the specified number of seconds.
70  */
71 void rxi_Delay(int sec)
72 {
73     IOMGR_Sleep(sec);
74 }
75
76 static int quitListening = 0;
77
78 /* This routine will kill the listener thread, if it exists. */
79 void rxi_StopListener(void)
80 {
81     quitListening = 1;
82     rxi_ReScheduleEvents();
83 }
84
85 /* This routine will get called by the event package whenever a new,
86    earlier than others, event is posted.  If the Listener process
87    is blocked in selects, this will unblock it.  It also can be called
88    to force a new trip through the rxi_Listener select loop when the set
89    of file descriptors it should be listening to changes... */
90 void rxi_ReScheduleEvents(void)
91 {
92     if (rx_listenerPid) IOMGR_Cancel(rx_listenerPid);
93 }
94
95 void rxi_InitializeThreadSupport(void)
96 {
97     PROCESS junk;
98
99     LWP_InitializeProcessSupport(LWP_NORMAL_PRIORITY, &junk);
100     IOMGR_Initialize();
101     FD_ZERO(&rx_selectMask);
102 }
103
104 void rxi_StartServerProc(void (*proc)(void), int stacksize)
105 {
106     PROCESS scratchPid;
107     static int number = 0;
108     char name[32];
109
110     sprintf(name, "srv_%d", ++number);
111     LWP_CreateProcess(proc, stacksize, RX_PROCESS_PRIORITY,
112                       (void *) 0, "rx_ServerProc", &scratchPid);
113     if (registerProgram)
114         (*registerProgram)(scratchPid, name);
115 }
116
117 void rxi_StartListener(void)
118 {
119     /* Priority of listener should be high, so it can keep conns alive */
120 #define RX_LIST_STACK   24000
121     LWP_CreateProcess(rx_ListenerProc, RX_LIST_STACK, LWP_MAX_PRIORITY, 
122                       (void *) 0, "rx_Listener", &rx_listenerPid);
123     if (registerProgram)
124         (*registerProgram)(rx_listenerPid, "listener");
125 }
126
127 /* The main loop which listens to the net for datagrams, and handles timeouts
128    and retransmissions, etc.  It also is responsible for scheduling the
129    execution of pending events (in conjunction with event.c).
130
131    Note interaction of nextPollTime and lastPollWorked.  The idea is that if rx is not
132    keeping up with the incoming stream of packets (because there are threads that
133    are interfering with its running sufficiently often), rx does a polling select
134    before doing a real IOMGR_Select system call.  Doing a real select means that
135    we don't have to let other processes run before processing more packets.
136
137    So, our algorithm is that if the last poll on the file descriptor found useful data, or
138    we're at the time nextPollTime (which is advanced so that it occurs every 3 or 4 seconds),
139    then we try the polling select before the IOMGR_Select.  If we eventually catch up
140    (which we can tell by the polling select returning no input packets ready), then we
141    don't do a polling select again until several seconds later (via nextPollTime mechanism).
142    */
143
144 static void rxi_ListenerProc(fd_set *rfds, int *tnop, struct rx_call **newcallp)
145 {
146     afs_uint32 host;
147     u_short port;
148     register struct rx_packet *p = (struct rx_packet *)0;
149     int socket;
150     struct clock cv;
151     afs_int32 nextPollTime;             /* time to next poll FD before sleeping */
152     int lastPollWorked, doingPoll;      /* true iff last poll was useful */
153     struct timeval tv, *tvp;
154     int code;
155 #ifdef AFS_NT40_ENV
156     int i;
157 #endif
158     PROCESS pid;
159     char name[MAXTHREADNAMELENGTH] = "srv_0";
160
161     clock_NewTime();
162     lastPollWorked = 0;
163     nextPollTime = 0;
164     code = LWP_CurrentProcess(&pid);
165     if (code) {
166         fprintf(stderr, "rxi_Listener: Can't get my pid.\n");
167         exit(1);
168     }
169     rx_listenerPid = pid;
170     if (swapNameProgram)
171        (*swapNameProgram)(pid, "listener", &name);
172
173     for (;;) {
174         /* Grab a new packet only if necessary (otherwise re-use the old one) */
175         if (p) {
176             rxi_RestoreDataBufs(p);
177         }
178         else {
179             if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_RECEIVE)))
180                 osi_Panic("rxi_ListenerProc: no packets!"); /* Shouldn't happen */
181         }
182         /* Wait for the next event time or a packet to arrive. */
183         /* event_RaiseEvents schedules any events whose time has come and
184            then atomically computes the time to the next event, guaranteeing
185            that this is positive.  If there is no next event, it returns 0 */
186         clock_NewTime();
187         if (!rxevent_RaiseEvents(&cv)) tvp = NULL;
188         else {
189             /* It's important to copy cv to tv, because the 4.3 documentation
190                for select threatens that *tv may be updated after a select, in
191                future editions of the system, to indicate how much of the time
192                period has elapsed.  So we shouldn't rely on tv not being altered. */
193             tv.tv_sec = cv.sec; /* Time to next event */
194             tv.tv_usec = cv.usec;
195             tvp = &tv;
196         }       
197         rx_stats.selects++;
198
199         *rfds = rx_selectMask;
200
201         if (lastPollWorked || nextPollTime < clock_Sec()) {
202             /* we're catching up, or haven't tried to for a few seconds */
203             doingPoll = 1;
204             nextPollTime = clock_Sec() + 4;     /* try again in 4 seconds no matter what */
205             tv.tv_sec = tv.tv_usec = 0;         /* make sure we poll */
206             tvp = &tv;
207             code = select(rx_maxSocketNumber+1, rfds, 0, 0, tvp);
208         }
209         else {
210             doingPoll = 0;
211             code = IOMGR_Select(rx_maxSocketNumber+1, rfds, 0, 0, tvp);
212         }
213         lastPollWorked = 0;     /* default is that it didn't find anything */
214
215         if (quitListening) {
216             quitListening = 0;
217             LWP_DestroyProcess(pid);
218         }
219
220         switch(code) {
221             case 0: 
222                 /* Timer interrupt:
223                  * If it was a timer interrupt then we can assume that
224                  * the time has advanced by roughly the value of the
225                  * previous timeout, and that there is now at least
226                  * one pending event.
227                  */
228                 clock_NewTime();
229                 break;
230             case -1: 
231                 /* select or IOMGR_Select returned failure */
232                 debugSelectFailure++;   /* update debugging counter */
233                 clock_NewTime();
234                 break;
235             case -2:
236                 /* IOMGR_Cancel:
237                  * IOMGR_Cancel is invoked whenever a new event is
238                  * posted that is earlier than any existing events.
239                  * So we re-evaluate the time, and then go back to
240                  * reschedule events
241                  */
242                 clock_NewTime();
243                 break;
244
245             default: 
246                 /* Packets have arrived, presumably:
247                  * If it wasn't a timer interrupt, then no event should have
248                  * timed out yet (well some event may have, but only just...), so
249                  * we don't bother looking to see if any have timed out, but just
250                  * go directly to reading the data packets
251                  */
252                 clock_NewTime();
253                 if (doingPoll) lastPollWorked = 1;
254 #ifdef AFS_NT40_ENV
255                 for (i=0; p && i<rfds->fd_count; i++) {
256                     socket = rfds->fd_array[i];
257                     if (rxi_ReadPacket(socket, p, &host, &port)) {
258                         *newcallp = NULL;
259                         p = rxi_ReceivePacket(p, socket, host, port,
260                                               tnop, newcallp);
261                         if (newcallp && *newcallp) {
262                             if (p) {
263                                 rxi_FreePacket(p);
264                             }
265                             if (swapNameProgram) {
266                                 (*swapNameProgram)(rx_listenerPid, &name, 0);
267                                 rx_listenerPid = 0;
268                             }
269                             return;
270                         }
271                     }
272                 }
273 #else
274                 for (socket = rx_minSocketNumber;
275                      p && socket <= rx_maxSocketNumber; socket++)  {
276                      if (!FD_ISSET(socket, rfds))
277                          continue;
278                      if (rxi_ReadPacket(socket, p, &host, &port)) {
279                          p = rxi_ReceivePacket(p, socket, host, port,
280                                                tnop, newcallp);
281                         if (newcallp && *newcallp) {
282                             if (p) {
283                                 rxi_FreePacket(p);
284                             }
285                             if (swapNameProgram) {
286                                 (*swapNameProgram)(rx_listenerPid, &name, 0);
287                                 rx_listenerPid = 0;
288                             }
289                             return;
290                         }
291                     }
292                 }
293 #endif
294                 break;
295         }
296     }
297     /* NOTREACHED */
298 }
299
300 /* This is the listener process request loop. The listener process loop
301  * becomes a server thread when rxi_ListenerProc returns, and stays
302  * server thread until rxi_ServerProc returns. */
303 static void rx_ListenerProc(void *dummy)
304 {
305     int threadID;
306     int sock;
307     struct rx_call *newcall;
308     fd_set *rfds;
309
310     if (!(rfds = IOMGR_AllocFDSet())) {
311         osi_Panic("rx_ListenerProc: no fd_sets!\n");
312     }
313
314     while(1) {
315         newcall = NULL;
316         threadID = -1;
317         rxi_ListenerProc(rfds, &threadID, &newcall);
318         /* assert(threadID != -1); */
319         /* assert(newcall != NULL); */
320         sock = OSI_NULLSOCKET;
321         rxi_ServerProc(threadID, newcall, &sock);
322         /* assert(sock != OSI_NULLSOCKET); */
323     }
324     /* not reached */
325 }
326
327 /* This is the server process request loop. The server process loop
328  * becomes a listener thread when rxi_ServerProc returns, and stays
329  * listener thread until rxi_ListenerProc returns. */
330 void rx_ServerProc(void)
331 {
332     int sock;
333     int threadID;
334     struct rx_call *newcall = NULL;
335     fd_set *rfds;
336
337     if (!(rfds = IOMGR_AllocFDSet())) {
338         osi_Panic("rxi_ListenerProc: no fd_sets!\n");
339     }
340
341     rxi_MorePackets(rx_maxReceiveWindow+2); /* alloc more packets */
342     rxi_dataQuota += rx_initSendWindow; /* Reserve some pkts for hard times */
343     /* threadID is used for making decisions in GetCall.  Get it by bumping
344      * number of threads handling incoming calls */
345     threadID = rxi_availProcs++;
346
347     while(1) {
348         sock = OSI_NULLSOCKET;
349         rxi_ServerProc(threadID, newcall, &sock);
350         /* assert(sock != OSI_NULLSOCKET); */
351         newcall = NULL;
352         rxi_ListenerProc(rfds, &threadID, &newcall);
353         /* assert(threadID != -1); */
354         /* assert(newcall != NULL); */
355     }
356     /* not reached */
357 }
358
359 /*
360  * Called from GetUDPSocket.
361  * Called from a single thread at startup.
362  * Returns 0 on success; -1 on failure.
363  */
364 int rxi_Listen(osi_socket sock)
365 {
366 #ifndef AFS_NT40_ENV
367     /*
368      * Put the socket into non-blocking mode so that rx_Listener
369      * can do a polling read before entering select
370      */
371 #ifndef AFS_DJGPP_ENV
372     if (fcntl(sock, F_SETFL, FNDELAY) == -1) {
373         perror("fcntl");
374         (osi_Msg "rxi_Listen: unable to set non-blocking mode on socket\n");
375         return -1;
376     }
377 #else
378     if ( __djgpp_set_socket_blocking_mode(sock, 1) < 0 ) {
379       perror("__djgpp_set_socket_blocking_mode");
380       (osi_Msg "rxi_Listen: unable to set non-blocking mode on socket\n");
381       return -1;
382     }
383 #endif /* AFS_DJGPP_ENV */
384
385     if (sock > FD_SETSIZE-1) {
386         (osi_Msg "rxi_Listen: socket descriptor > (FD_SETSIZE-1) = %d\n",
387          FD_SETSIZE-1);
388         return -1;
389     }
390 #endif
391
392     FD_SET(sock, &rx_selectMask);
393     if (sock > rx_maxSocketNumber) rx_maxSocketNumber = sock;
394     if (sock < rx_minSocketNumber) rx_minSocketNumber = sock;
395     return 0;
396 }
397
398 /*
399  * Recvmsg
400  */
401 int rxi_Recvmsg(int socket, struct msghdr *msg_p, int flags)
402 {
403     return recvmsg((int) socket, msg_p, flags);
404 }
405
406 /*
407  * Simulate a blocking sendmsg on the non-blocking socket.
408  * It's non blocking because it was set that way for recvmsg.
409  */
410 int rxi_Sendmsg(osi_socket socket, struct msghdr *msg_p, int flags)
411 {
412     fd_set *sfds = (fd_set*)0;
413     while (sendmsg(socket, msg_p, flags) == -1) {
414         int err;
415         rx_stats.sendSelects++;
416
417         if (!sfds) {
418             if ( !(sfds = IOMGR_AllocFDSet())) {
419                 (osi_Msg "rx failed to alloc fd_set: ");
420                 perror("rx_sendmsg");
421                 return 3;
422             }
423             FD_SET(socket, sfds);
424         }
425             
426 #ifdef AFS_NT40_ENV
427         if (errno)
428 #elif defined(AFS_LINUX22_ENV)
429           /* linux unfortunately returns ECONNREFUSED if the target port
430            * is no longer in use */
431           /* and EAGAIN if a UDP checksum is incorrect */
432         if (errno != EWOULDBLOCK && errno != ENOBUFS &&
433             errno != ECONNREFUSED && errno != EAGAIN)
434 #else
435         if (errno != EWOULDBLOCK && errno != ENOBUFS)
436 #endif
437         {
438             (osi_Msg "rx failed to send packet: ");
439             perror("rx_sendmsg");
440             return 3;
441         }
442         while ((err = select(socket+1, 0, sfds, 0, 0)) != 1) {
443             if (err >= 0 || errno != EINTR) 
444               osi_Panic("rxi_sendmsg: select error %d.%d", err, errno);
445         }
446     }
447     if (sfds)
448         IOMGR_FreeFDSet(sfds);
449     return 0;
450 }