a5bfb4e61814ee2dd304dd789aabd83b2763023d
[openafs.git] / src / rx / rx_lwp.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /* rx_user.c contains routines specific to the user space UNIX implementation of rx */
11
12 # include <afs/param.h>
13 # include <sys/types.h>         /* fd_set on older platforms */
14 # include <errno.h>
15 # include <signal.h>
16 #ifdef AFS_NT40_ENV
17 # include <winsock2.h>
18 #else
19 # include <unistd.h>            /* select() prototype */
20 # include <sys/time.h>          /* struct timeval, select() prototype */
21 # ifndef FD_SET
22 #  include <sys/select.h>       /* fd_set on newer platforms */
23 # endif
24 # include <sys/socket.h>
25 # include <sys/file.h>
26 # include <netdb.h>
27 # include <sys/stat.h>
28 # include <netinet/in.h>
29 # include <net/if.h>
30 # include <sys/ioctl.h>
31 # include <sys/time.h>
32 #endif
33 # include "rx.h"
34 # include "rx_globals.h"
35 # include <lwp.h>
36
37 #define MAXTHREADNAMELENGTH 64
38
39 extern int (*registerProgram)();
40 extern int (*swapNameProgram)();
41
42 int debugSelectFailure; /* # of times select failed */
43 /*
44  * Sleep on the unique wait channel provided.
45  */
46 void rxi_Sleep(void *addr)
47 {
48     LWP_WaitProcess(addr);
49 }
50
51 /*
52  * Wakeup any threads on the channel provided.
53  * They may be woken up spuriously, and must check any conditions.
54  */
55 void rxi_Wakeup(void *addr)
56 {
57     LWP_NoYieldSignal(addr);
58 }
59
60 PROCESS rx_listenerPid = 0;     /* LWP process id of socket listener process */
61 void rx_ListenerProc(void *dummy);
62
63 /*
64  * Delay the current thread the specified number of seconds.
65  */
66 void rxi_Delay(int sec)
67 {
68     IOMGR_Sleep(sec);
69 }
70
71 static int quitListening = 0;
72
73 /* This routine will kill the listener thread, if it exists. */
74 void
75 rxi_StopListener()
76 {
77     quitListening = 1;
78     rxi_ReScheduleEvents();
79 }
80
81 /* This routine will get called by the event package whenever a new,
82    earlier than others, event is posted.  If the Listener process
83    is blocked in selects, this will unblock it.  It also can be called
84    to force a new trip through the rxi_Listener select loop when the set
85    of file descriptors it should be listening to changes... */
86 void rxi_ReScheduleEvents() {
87     if (rx_listenerPid) IOMGR_Cancel(rx_listenerPid);
88 }
89
90 void rxi_InitializeThreadSupport(void)
91 {
92     PROCESS junk;
93
94     LWP_InitializeProcessSupport(LWP_NORMAL_PRIORITY, &junk);
95     IOMGR_Initialize();
96     FD_ZERO(&rx_selectMask);
97 }
98
99 void rxi_StartServerProc(proc, stacksize)
100     long (*proc)();
101 {
102     PROCESS scratchPid;
103     static int number = 0;
104     char name[32];
105
106     sprintf(name, "srv_%d", ++number);
107     LWP_CreateProcess(proc, stacksize, RX_PROCESS_PRIORITY,
108                       0, "rx_ServerProc", &scratchPid);
109     if (registerProgram)
110         (*registerProgram)(scratchPid, name);
111 }
112
113 void rxi_StartListener() {
114     /* Priority of listener should be high, so it can keep conns alive */
115 #define RX_LIST_STACK   24000
116     LWP_CreateProcess(rx_ListenerProc, RX_LIST_STACK, LWP_MAX_PRIORITY, 0,
117                       "rx_Listener", &rx_listenerPid);
118     if (registerProgram)
119         (*registerProgram)(rx_listenerPid, "listener");
120 }
121
122 /* The main loop which listens to the net for datagrams, and handles timeouts
123    and retransmissions, etc.  It also is responsible for scheduling the
124    execution of pending events (in conjunction with event.c).
125
126    Note interaction of nextPollTime and lastPollWorked.  The idea is that if rx is not
127    keeping up with the incoming stream of packets (because there are threads that
128    are interfering with its running sufficiently often), rx does a polling select
129    before doing a real IOMGR_Select system call.  Doing a real select means that
130    we don't have to let other processes run before processing more packets.
131
132    So, our algorithm is that if the last poll on the file descriptor found useful data, or
133    we're at the time nextPollTime (which is advanced so that it occurs every 3 or 4 seconds),
134    then we try the polling select before the IOMGR_Select.  If we eventually catch up
135    (which we can tell by the polling select returning no input packets ready), then we
136    don't do a polling select again until several seconds later (via nextPollTime mechanism).
137    */
138
139 void rxi_ListenerProc(rfds, tnop, newcallp)
140     fd_set *rfds;
141     int *tnop;
142     struct rx_call **newcallp;
143 {
144     afs_uint32 host;
145     u_short port;
146     register struct rx_packet *p = (struct rx_packet *)0;
147     int socket;
148     struct clock cv;
149     afs_int32 nextPollTime;             /* time to next poll FD before sleeping */
150     int lastPollWorked, doingPoll;      /* true iff last poll was useful */
151     struct timeval tv, *tvp;
152     int i, code;
153     PROCESS pid;
154     char name[MAXTHREADNAMELENGTH] = "srv_0";
155
156     clock_NewTime();
157     lastPollWorked = 0;
158     nextPollTime = 0;
159     code = LWP_CurrentProcess(&pid);
160     if (code) {
161         fprintf(stderr, "rxi_Listener: Can't get my pid.\n");
162         exit(1);
163     }
164     rx_listenerPid = pid;
165     if (swapNameProgram)
166        (*swapNameProgram)(pid, "listener", &name);
167
168     for (;;) {
169         /* Grab a new packet only if necessary (otherwise re-use the old one) */
170         if (p) {
171             rxi_RestoreDataBufs(p);
172         }
173         else {
174             if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_RECEIVE)))
175                 osi_Panic("rxi_ListenerProc: no packets!"); /* Shouldn't happen */
176         }
177         /* Wait for the next event time or a packet to arrive. */
178         /* event_RaiseEvents schedules any events whose time has come and
179            then atomically computes the time to the next event, guaranteeing
180            that this is positive.  If there is no next event, it returns 0 */
181         clock_NewTime();
182         if (!rxevent_RaiseEvents(&cv)) tvp = (struct timeval *) 0;
183         else {
184             /* It's important to copy cv to tv, because the 4.3 documentation
185                for select threatens that *tv may be updated after a select, in
186                future editions of the system, to indicate how much of the time
187                period has elapsed.  So we shouldn't rely on tv not being altered. */
188             tv.tv_sec = cv.sec; /* Time to next event */
189             tv.tv_usec = cv.usec;
190             tvp = &tv;
191         }       
192         rx_stats.selects++;
193
194         *rfds = rx_selectMask;
195
196         if (lastPollWorked || nextPollTime < clock_Sec()) {
197             /* we're catching up, or haven't tried to for a few seconds */
198             doingPoll = 1;
199             nextPollTime = clock_Sec() + 4;     /* try again in 4 seconds no matter what */
200             tv.tv_sec = tv.tv_usec = 0;         /* make sure we poll */
201             tvp = &tv;
202             code = select(rx_maxSocketNumber+1, rfds, 0, 0, tvp);
203         }
204         else {
205             doingPoll = 0;
206             code = IOMGR_Select(rx_maxSocketNumber+1, rfds, 0, 0, tvp);
207         }
208         lastPollWorked = 0;     /* default is that it didn't find anything */
209
210         if (quitListening) {
211             quitListening = 0;
212             LWP_DestroyProcess(pid);
213         }
214
215         switch(code) {
216             case 0: 
217                 /* Timer interrupt:
218                  * If it was a timer interrupt then we can assume that
219                  * the time has advanced by roughly the value of the
220                  * previous timeout, and that there is now at least
221                  * one pending event.
222                  */
223                 clock_NewTime();
224                 break;
225             case -1: 
226                 /* select or IOMGR_Select returned failure */
227                 debugSelectFailure++;   /* update debugging counter */
228                 clock_NewTime();
229                 break;
230             case -2:
231                 /* IOMGR_Cancel:
232                  * IOMGR_Cancel is invoked whenever a new event is
233                  * posted that is earlier than any existing events.
234                  * So we re-evaluate the time, and then go back to
235                  * reschedule events
236                  */
237                 clock_NewTime();
238                 break;
239
240             default: 
241                 /* Packets have arrived, presumably:
242                  * If it wasn't a timer interrupt, then no event should have
243                  * timed out yet (well some event may have, but only just...), so
244                  * we don't bother looking to see if any have timed out, but just
245                  * go directly to reading the data packets
246                  */
247                 clock_NewTime();
248                 if (doingPoll) lastPollWorked = 1;
249 #ifdef AFS_NT40_ENV
250                 for (i=0; p && i<rfds->fd_count; i++) {
251                     socket = rfds->fd_array[i];
252                     if (rxi_ReadPacket(socket, p, &host, &port)) {
253                         *newcallp = NULL;
254                         p = rxi_ReceivePacket(p, socket, host, port,
255                                               tnop, newcallp);
256                         if (newcallp && *newcallp) {
257                             if (p) {
258                                 rxi_FreePacket(p);
259                             }
260                             if (swapNameProgram) {
261                                 (*swapNameProgram)(rx_listnerPid, &name, 0);
262                                 rx_listenerPid = 0;
263                             }
264                             return;
265                         }
266                     }
267                 }
268 #else
269                 for (socket = rx_minSocketNumber;
270                      p && socket <= rx_maxSocketNumber; socket++)  {
271                      if (!FD_ISSET(socket, rfds))
272                          continue;
273                      if (rxi_ReadPacket(socket, p, &host, &port)) {
274                          p = rxi_ReceivePacket(p, socket, host, port,
275                                                tnop, newcallp);
276                         if (newcallp && *newcallp) {
277                             if (p) {
278                                 rxi_FreePacket(p);
279                             }
280                             if (swapNameProgram) {
281                                 (*swapNameProgram)(rx_listenerPid, &name, 0);
282                                 rx_listenerPid = 0;
283                             }
284                             return;
285                         }
286                     }
287                 }
288 #endif
289                 break;
290         }
291     }
292     /* NOTREACHED */
293 }
294
295 /* This is the listener process request loop. The listener process loop
296  * becomes a server thread when rxi_ListenerProc returns, and stays
297  * server thread until rxi_ServerProc returns. */
298 static void rx_ListenerProc(void *dummy)
299 {
300     int threadID;
301     int sock;
302     struct rx_call *newcall;
303     fd_set *rfds;
304
305     if (!(rfds = IOMGR_AllocFDSet())) {
306         osi_Panic("rx_ListenerProc: no fd_sets!\n");
307     }
308
309     while(1) {
310         newcall = NULL;
311         threadID = -1;
312         rxi_ListenerProc(rfds, &threadID, &newcall);
313         /* assert(threadID != -1); */
314         /* assert(newcall != NULL); */
315         sock = OSI_NULLSOCKET;
316         rxi_ServerProc(threadID, newcall, &sock);
317         /* assert(sock != OSI_NULLSOCKET); */
318     }
319     /* not reached */
320 }
321
322 /* This is the server process request loop. The server process loop
323  * becomes a listener thread when rxi_ServerProc returns, and stays
324  * listener thread until rxi_ListenerProc returns. */
325 void rx_ServerProc()
326 {
327     int sock;
328     int threadID;
329     struct rx_call *newcall = NULL;
330     fd_set *rfds;
331
332     if (!(rfds = IOMGR_AllocFDSet())) {
333         osi_Panic("rxi_ListenerProc: no fd_sets!\n");
334     }
335
336     rxi_MorePackets(rx_maxReceiveWindow+2); /* alloc more packets */
337     rxi_dataQuota += rx_initSendWindow; /* Reserve some pkts for hard times */
338     /* threadID is used for making decisions in GetCall.  Get it by bumping
339      * number of threads handling incoming calls */
340     threadID = rxi_availProcs++;
341
342     while(1) {
343         sock = OSI_NULLSOCKET;
344         rxi_ServerProc(threadID, newcall, &sock);
345         /* assert(sock != OSI_NULLSOCKET); */
346         newcall = NULL;
347         rxi_ListenerProc(rfds, &threadID, &newcall);
348         /* assert(threadID != -1); */
349         /* assert(newcall != NULL); */
350     }
351     /* not reached */
352 }
353
354 /*
355  * Called from GetUDPSocket.
356  * Called from a single thread at startup.
357  * Returns 0 on success; -1 on failure.
358  */
359 int rxi_Listen(sock)
360     osi_socket sock;
361 {
362 #ifndef AFS_NT40_ENV
363     /*
364      * Put the socket into non-blocking mode so that rx_Listener
365      * can do a polling read before entering select
366      */
367     if (fcntl(sock, F_SETFL, FNDELAY) == -1) {
368         perror("fcntl");
369         (osi_Msg "rxi_Listen: unable to set non-blocking mode on socket\n");
370         return -1;
371     }
372
373     if (sock > FD_SETSIZE-1) {
374         (osi_Msg "rxi_Listen: socket descriptor > (FD_SETSIZE-1) = %d\n",
375          FD_SETSIZE-1);
376         return -1;
377     }
378 #endif
379
380     FD_SET(sock, &rx_selectMask);
381     if (sock > rx_maxSocketNumber) rx_maxSocketNumber = sock;
382     if (sock < rx_minSocketNumber) rx_minSocketNumber = sock;
383     return 0;
384 }
385
386 /*
387  * Recvmsg
388  */
389 int rxi_Recvmsg(osi_socket socket, struct msghdr *msg_p, int flags)
390 {
391     return recvmsg((int) socket, msg_p, flags);
392 }
393
394 /*
395  * Simulate a blocking sendmsg on the non-blocking socket.
396  * It's non blocking because it was set that way for recvmsg.
397  */
398 int rxi_Sendmsg(osi_socket socket, struct msghdr *msg_p, int flags)
399 {
400     fd_set *sfds = (fd_set*)0;
401     while (sendmsg(socket, msg_p, flags) == -1) {
402         int err;
403         rx_stats.sendSelects++;
404
405         if (!sfds) {
406             if ( !(sfds = IOMGR_AllocFDSet())) {
407                 (osi_Msg "rx failed to alloc fd_set: ");
408                 perror("rx_sendmsg");
409                 return 3;
410             }
411             FD_SET(socket, sfds);
412         }
413             
414 #ifdef AFS_NT40_ENV
415         if (errno)
416 #elif defined(AFS_LINUX22_ENV)
417           /* linux unfortunately returns ECONNREFUSED if the target port
418            * is no longer in use */
419         if (errno != EWOULDBLOCK && errno != ENOBUFS && errno != ECONNREFUSED)
420 #else
421         if (errno != EWOULDBLOCK && errno != ENOBUFS)
422 #endif
423         {
424             (osi_Msg "rx failed to send packet: ");
425             perror("rx_sendmsg");
426             return 3;
427         }
428         while ((err = select(socket+1, 0, sfds, 0, 0)) != 1) {
429             if (err >= 0 || errno != EINTR) 
430               osi_Panic("rxi_sendmsg: select error %d.%d", err, errno);
431         }
432     }
433     if (sfds)
434         IOMGR_FreeFDSet(sfds);
435 }