include-afsconfig-before-param-h-20010712
[openafs.git] / src / rx / rx_lwp.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /* rx_user.c contains routines specific to the user space UNIX implementation of rx */
11
12 #include <afsconfig.h>
13 #include <afs/param.h>
14
15 RCSID("$Header$");
16
17 # include <sys/types.h>         /* fd_set on older platforms */
18 # include <errno.h>
19 # include <signal.h>
20 #ifdef AFS_NT40_ENV
21 # include <winsock2.h>
22 #else
23 # include <unistd.h>            /* select() prototype */
24 # include <sys/time.h>          /* struct timeval, select() prototype */
25 # ifndef FD_SET
26 #  include <sys/select.h>       /* fd_set on newer platforms */
27 # endif
28 # include <sys/socket.h>
29 # include <sys/file.h>
30 # include <netdb.h>
31 # include <sys/stat.h>
32 # include <netinet/in.h>
33 # include <net/if.h>
34 # include <sys/ioctl.h>
35 # include <sys/time.h>
36 #endif
37 # include "rx.h"
38 # include "rx_globals.h"
39 # include <lwp.h>
40
41 #define MAXTHREADNAMELENGTH 64
42
43 extern int (*registerProgram)();
44 extern int (*swapNameProgram)();
45
46 int debugSelectFailure; /* # of times select failed */
47 /*
48  * Sleep on the unique wait channel provided.
49  */
50 void rxi_Sleep(void *addr)
51 {
52     LWP_WaitProcess(addr);
53 }
54
55 /*
56  * Wakeup any threads on the channel provided.
57  * They may be woken up spuriously, and must check any conditions.
58  */
59 void rxi_Wakeup(void *addr)
60 {
61     LWP_NoYieldSignal(addr);
62 }
63
64 PROCESS rx_listenerPid = 0;     /* LWP process id of socket listener process */
65 static void rx_ListenerProc(void *dummy);
66
67 /*
68  * Delay the current thread the specified number of seconds.
69  */
70 void rxi_Delay(int sec)
71 {
72     IOMGR_Sleep(sec);
73 }
74
75 static int quitListening = 0;
76
77 /* This routine will kill the listener thread, if it exists. */
78 void
79 rxi_StopListener()
80 {
81     quitListening = 1;
82     rxi_ReScheduleEvents();
83 }
84
85 /* This routine will get called by the event package whenever a new,
86    earlier than others, event is posted.  If the Listener process
87    is blocked in selects, this will unblock it.  It also can be called
88    to force a new trip through the rxi_Listener select loop when the set
89    of file descriptors it should be listening to changes... */
90 void rxi_ReScheduleEvents() {
91     if (rx_listenerPid) IOMGR_Cancel(rx_listenerPid);
92 }
93
94 void rxi_InitializeThreadSupport(void)
95 {
96     PROCESS junk;
97
98     LWP_InitializeProcessSupport(LWP_NORMAL_PRIORITY, &junk);
99     IOMGR_Initialize();
100     FD_ZERO(&rx_selectMask);
101 }
102
103 void rxi_StartServerProc(proc, stacksize)
104     long (*proc)();
105 {
106     PROCESS scratchPid;
107     static int number = 0;
108     char name[32];
109
110     sprintf(name, "srv_%d", ++number);
111     LWP_CreateProcess(proc, stacksize, RX_PROCESS_PRIORITY,
112                       0, "rx_ServerProc", &scratchPid);
113     if (registerProgram)
114         (*registerProgram)(scratchPid, name);
115 }
116
117 void rxi_StartListener() {
118     /* Priority of listener should be high, so it can keep conns alive */
119 #define RX_LIST_STACK   24000
120     LWP_CreateProcess(rx_ListenerProc, RX_LIST_STACK, LWP_MAX_PRIORITY, 0,
121                       "rx_Listener", &rx_listenerPid);
122     if (registerProgram)
123         (*registerProgram)(rx_listenerPid, "listener");
124 }
125
126 /* The main loop which listens to the net for datagrams, and handles timeouts
127    and retransmissions, etc.  It also is responsible for scheduling the
128    execution of pending events (in conjunction with event.c).
129
130    Note interaction of nextPollTime and lastPollWorked.  The idea is that if rx is not
131    keeping up with the incoming stream of packets (because there are threads that
132    are interfering with its running sufficiently often), rx does a polling select
133    before doing a real IOMGR_Select system call.  Doing a real select means that
134    we don't have to let other processes run before processing more packets.
135
136    So, our algorithm is that if the last poll on the file descriptor found useful data, or
137    we're at the time nextPollTime (which is advanced so that it occurs every 3 or 4 seconds),
138    then we try the polling select before the IOMGR_Select.  If we eventually catch up
139    (which we can tell by the polling select returning no input packets ready), then we
140    don't do a polling select again until several seconds later (via nextPollTime mechanism).
141    */
142
143 void rxi_ListenerProc(rfds, tnop, newcallp)
144     fd_set *rfds;
145     int *tnop;
146     struct rx_call **newcallp;
147 {
148     afs_uint32 host;
149     u_short port;
150     register struct rx_packet *p = (struct rx_packet *)0;
151     int socket;
152     struct clock cv;
153     afs_int32 nextPollTime;             /* time to next poll FD before sleeping */
154     int lastPollWorked, doingPoll;      /* true iff last poll was useful */
155     struct timeval tv, *tvp;
156     int code;
157 #ifdef AFS_NT40_ENV
158     int i;
159 #endif
160     PROCESS pid;
161     char name[MAXTHREADNAMELENGTH] = "srv_0";
162
163     clock_NewTime();
164     lastPollWorked = 0;
165     nextPollTime = 0;
166     code = LWP_CurrentProcess(&pid);
167     if (code) {
168         fprintf(stderr, "rxi_Listener: Can't get my pid.\n");
169         exit(1);
170     }
171     rx_listenerPid = pid;
172     if (swapNameProgram)
173        (*swapNameProgram)(pid, "listener", &name);
174
175     for (;;) {
176         /* Grab a new packet only if necessary (otherwise re-use the old one) */
177         if (p) {
178             rxi_RestoreDataBufs(p);
179         }
180         else {
181             if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_RECEIVE)))
182                 osi_Panic("rxi_ListenerProc: no packets!"); /* Shouldn't happen */
183         }
184         /* Wait for the next event time or a packet to arrive. */
185         /* event_RaiseEvents schedules any events whose time has come and
186            then atomically computes the time to the next event, guaranteeing
187            that this is positive.  If there is no next event, it returns 0 */
188         clock_NewTime();
189         if (!rxevent_RaiseEvents(&cv)) tvp = (struct timeval *) 0;
190         else {
191             /* It's important to copy cv to tv, because the 4.3 documentation
192                for select threatens that *tv may be updated after a select, in
193                future editions of the system, to indicate how much of the time
194                period has elapsed.  So we shouldn't rely on tv not being altered. */
195             tv.tv_sec = cv.sec; /* Time to next event */
196             tv.tv_usec = cv.usec;
197             tvp = &tv;
198         }       
199         rx_stats.selects++;
200
201         *rfds = rx_selectMask;
202
203         if (lastPollWorked || nextPollTime < clock_Sec()) {
204             /* we're catching up, or haven't tried to for a few seconds */
205             doingPoll = 1;
206             nextPollTime = clock_Sec() + 4;     /* try again in 4 seconds no matter what */
207             tv.tv_sec = tv.tv_usec = 0;         /* make sure we poll */
208             tvp = &tv;
209             code = select(rx_maxSocketNumber+1, rfds, 0, 0, tvp);
210         }
211         else {
212             doingPoll = 0;
213             code = IOMGR_Select(rx_maxSocketNumber+1, rfds, 0, 0, tvp);
214         }
215         lastPollWorked = 0;     /* default is that it didn't find anything */
216
217         if (quitListening) {
218             quitListening = 0;
219             LWP_DestroyProcess(pid);
220         }
221
222         switch(code) {
223             case 0: 
224                 /* Timer interrupt:
225                  * If it was a timer interrupt then we can assume that
226                  * the time has advanced by roughly the value of the
227                  * previous timeout, and that there is now at least
228                  * one pending event.
229                  */
230                 clock_NewTime();
231                 break;
232             case -1: 
233                 /* select or IOMGR_Select returned failure */
234                 debugSelectFailure++;   /* update debugging counter */
235                 clock_NewTime();
236                 break;
237             case -2:
238                 /* IOMGR_Cancel:
239                  * IOMGR_Cancel is invoked whenever a new event is
240                  * posted that is earlier than any existing events.
241                  * So we re-evaluate the time, and then go back to
242                  * reschedule events
243                  */
244                 clock_NewTime();
245                 break;
246
247             default: 
248                 /* Packets have arrived, presumably:
249                  * If it wasn't a timer interrupt, then no event should have
250                  * timed out yet (well some event may have, but only just...), so
251                  * we don't bother looking to see if any have timed out, but just
252                  * go directly to reading the data packets
253                  */
254                 clock_NewTime();
255                 if (doingPoll) lastPollWorked = 1;
256 #ifdef AFS_NT40_ENV
257                 for (i=0; p && i<rfds->fd_count; i++) {
258                     socket = rfds->fd_array[i];
259                     if (rxi_ReadPacket(socket, p, &host, &port)) {
260                         *newcallp = NULL;
261                         p = rxi_ReceivePacket(p, socket, host, port,
262                                               tnop, newcallp);
263                         if (newcallp && *newcallp) {
264                             if (p) {
265                                 rxi_FreePacket(p);
266                             }
267                             if (swapNameProgram) {
268                                 (*swapNameProgram)(rx_listenerPid, &name, 0);
269                                 rx_listenerPid = 0;
270                             }
271                             return;
272                         }
273                     }
274                 }
275 #else
276                 for (socket = rx_minSocketNumber;
277                      p && socket <= rx_maxSocketNumber; socket++)  {
278                      if (!FD_ISSET(socket, rfds))
279                          continue;
280                      if (rxi_ReadPacket(socket, p, &host, &port)) {
281                          p = rxi_ReceivePacket(p, socket, host, port,
282                                                tnop, newcallp);
283                         if (newcallp && *newcallp) {
284                             if (p) {
285                                 rxi_FreePacket(p);
286                             }
287                             if (swapNameProgram) {
288                                 (*swapNameProgram)(rx_listenerPid, &name, 0);
289                                 rx_listenerPid = 0;
290                             }
291                             return;
292                         }
293                     }
294                 }
295 #endif
296                 break;
297         }
298     }
299     /* NOTREACHED */
300 }
301
302 /* This is the listener process request loop. The listener process loop
303  * becomes a server thread when rxi_ListenerProc returns, and stays
304  * server thread until rxi_ServerProc returns. */
305 static void rx_ListenerProc(void *dummy)
306 {
307     int threadID;
308     int sock;
309     struct rx_call *newcall;
310     fd_set *rfds;
311
312     if (!(rfds = IOMGR_AllocFDSet())) {
313         osi_Panic("rx_ListenerProc: no fd_sets!\n");
314     }
315
316     while(1) {
317         newcall = NULL;
318         threadID = -1;
319         rxi_ListenerProc(rfds, &threadID, &newcall);
320         /* assert(threadID != -1); */
321         /* assert(newcall != NULL); */
322         sock = OSI_NULLSOCKET;
323         rxi_ServerProc(threadID, newcall, &sock);
324         /* assert(sock != OSI_NULLSOCKET); */
325     }
326     /* not reached */
327 }
328
329 /* This is the server process request loop. The server process loop
330  * becomes a listener thread when rxi_ServerProc returns, and stays
331  * listener thread until rxi_ListenerProc returns. */
332 void rx_ServerProc()
333 {
334     int sock;
335     int threadID;
336     struct rx_call *newcall = NULL;
337     fd_set *rfds;
338
339     if (!(rfds = IOMGR_AllocFDSet())) {
340         osi_Panic("rxi_ListenerProc: no fd_sets!\n");
341     }
342
343     rxi_MorePackets(rx_maxReceiveWindow+2); /* alloc more packets */
344     rxi_dataQuota += rx_initSendWindow; /* Reserve some pkts for hard times */
345     /* threadID is used for making decisions in GetCall.  Get it by bumping
346      * number of threads handling incoming calls */
347     threadID = rxi_availProcs++;
348
349     while(1) {
350         sock = OSI_NULLSOCKET;
351         rxi_ServerProc(threadID, newcall, &sock);
352         /* assert(sock != OSI_NULLSOCKET); */
353         newcall = NULL;
354         rxi_ListenerProc(rfds, &threadID, &newcall);
355         /* assert(threadID != -1); */
356         /* assert(newcall != NULL); */
357     }
358     /* not reached */
359 }
360
361 /*
362  * Called from GetUDPSocket.
363  * Called from a single thread at startup.
364  * Returns 0 on success; -1 on failure.
365  */
366 int rxi_Listen(sock)
367     osi_socket sock;
368 {
369 #ifndef AFS_NT40_ENV
370     /*
371      * Put the socket into non-blocking mode so that rx_Listener
372      * can do a polling read before entering select
373      */
374 #ifndef AFS_DJGPP_ENV
375     if (fcntl(sock, F_SETFL, FNDELAY) == -1) {
376         perror("fcntl");
377         (osi_Msg "rxi_Listen: unable to set non-blocking mode on socket\n");
378         return -1;
379     }
380 #else
381     if ( __djgpp_set_socket_blocking_mode(sock, 1) < 0 ) {
382       perror("__djgpp_set_socket_blocking_mode");
383       (osi_Msg "rxi_Listen: unable to set non-blocking mode on socket\n");
384       return -1;
385     }
386 #endif /* AFS_DJGPP_ENV */
387
388     if (sock > FD_SETSIZE-1) {
389         (osi_Msg "rxi_Listen: socket descriptor > (FD_SETSIZE-1) = %d\n",
390          FD_SETSIZE-1);
391         return -1;
392     }
393 #endif
394
395     FD_SET(sock, &rx_selectMask);
396     if (sock > rx_maxSocketNumber) rx_maxSocketNumber = sock;
397     if (sock < rx_minSocketNumber) rx_minSocketNumber = sock;
398     return 0;
399 }
400
401 /*
402  * Recvmsg
403  */
404 int rxi_Recvmsg(osi_socket socket, struct msghdr *msg_p, int flags)
405 {
406     return recvmsg((int) socket, msg_p, flags);
407 }
408
409 /*
410  * Simulate a blocking sendmsg on the non-blocking socket.
411  * It's non blocking because it was set that way for recvmsg.
412  */
413 int rxi_Sendmsg(osi_socket socket, struct msghdr *msg_p, int flags)
414 {
415     fd_set *sfds = (fd_set*)0;
416     while (sendmsg(socket, msg_p, flags) == -1) {
417         int err;
418         rx_stats.sendSelects++;
419
420         if (!sfds) {
421             if ( !(sfds = IOMGR_AllocFDSet())) {
422                 (osi_Msg "rx failed to alloc fd_set: ");
423                 perror("rx_sendmsg");
424                 return 3;
425             }
426             FD_SET(socket, sfds);
427         }
428             
429 #ifdef AFS_NT40_ENV
430         if (errno)
431 #elif defined(AFS_LINUX22_ENV)
432           /* linux unfortunately returns ECONNREFUSED if the target port
433            * is no longer in use */
434         if (errno != EWOULDBLOCK && errno != ENOBUFS && errno != ECONNREFUSED)
435 #else
436         if (errno != EWOULDBLOCK && errno != ENOBUFS)
437 #endif
438         {
439             (osi_Msg "rx failed to send packet: ");
440             perror("rx_sendmsg");
441             return 3;
442         }
443         while ((err = select(socket+1, 0, sfds, 0, 0)) != 1) {
444             if (err >= 0 || errno != EINTR) 
445               osi_Panic("rxi_sendmsg: select error %d.%d", err, errno);
446         }
447     }
448     if (sfds)
449         IOMGR_FreeFDSet(sfds);
450     return 0;
451 }