(no commit message)
[openafs.git] / src / rx / rx_lwp.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /* rx_user.c contains routines specific to the user space UNIX implementation of rx */
11
12 # include <afs/param.h>
13 # include <sys/types.h>         /* fd_set on older platforms */
14 # include <errno.h>
15 # include <signal.h>
16 #ifdef AFS_NT40_ENV
17 # include <winsock2.h>
18 #else
19 # include <unistd.h>            /* select() prototype */
20 # include <sys/time.h>          /* struct timeval, select() prototype */
21 # ifndef FD_SET
22 #  include <sys/select.h>       /* fd_set on newer platforms */
23 # endif
24 # include <sys/socket.h>
25 # include <sys/file.h>
26 # include <netdb.h>
27 # include <sys/stat.h>
28 # include <netinet/in.h>
29 # include <net/if.h>
30 # include <sys/ioctl.h>
31 # include <sys/time.h>
32 #endif
33 # include "rx.h"
34 # include "rx_globals.h"
35 # include <lwp.h>
36
37 #define MAXTHREADNAMELENGTH 64
38
39 int debugSelectFailure; /* # of times select failed */
40 /*
41  * Sleep on the unique wait channel provided.
42  */
43 void rxi_Sleep(void *addr)
44 {
45     LWP_WaitProcess(addr);
46 }
47
48 /*
49  * Wakeup any threads on the channel provided.
50  * They may be woken up spuriously, and must check any conditions.
51  */
52 void rxi_Wakeup(void *addr)
53 {
54     LWP_NoYieldSignal(addr);
55 }
56
57 PROCESS rx_listenerPid; /* LWP process id of socket listener process */
58 static void rx_ListenerProc(void *dummy);
59
60 /*
61  * Delay the current thread the specified number of seconds.
62  */
63 void rxi_Delay(int sec)
64 {
65     IOMGR_Sleep(sec);
66 }
67
68 static int quitListening = 0;
69
70 /* This routine will kill the listener thread, if it exists. */
71 void
72 rxi_StopListener()
73 {
74     quitListening = 1;
75     rxi_ReScheduleEvents();
76 }
77
78 /* This routine will get called by the event package whenever a new,
79    earlier than others, event is posted.  If the Listener process
80    is blocked in selects, this will unblock it.  It also can be called
81    to force a new trip through the rxi_Listener select loop when the set
82    of file descriptors it should be listening to changes... */
83 void rxi_ReScheduleEvents() {
84     if (rx_listenerPid) IOMGR_Cancel(rx_listenerPid);
85 }
86
87 void rxi_InitializeThreadSupport(void)
88 {
89     PROCESS junk;
90
91     LWP_InitializeProcessSupport(LWP_NORMAL_PRIORITY, &junk);
92     IOMGR_Initialize();
93     FD_ZERO(&rx_selectMask);
94 }
95
96 void rxi_StartServerProc(proc, stacksize)
97     long (*proc)();
98 {
99     PROCESS scratchPid;
100     LWP_CreateProcess(proc, stacksize, RX_PROCESS_PRIORITY,
101                       0, "rx_ServerProc", &scratchPid);
102 }
103
104 void rxi_StartListener() {
105     /* Priority of listener should be high, so it can keep conns alive */
106 #define RX_LIST_STACK   24000
107     LWP_CreateProcess(rx_ListenerProc, RX_LIST_STACK, LWP_MAX_PRIORITY, 0,
108                       "rx_Listener", &rx_listenerPid);
109 }
110
111 /* The main loop which listens to the net for datagrams, and handles timeouts
112    and retransmissions, etc.  It also is responsible for scheduling the
113    execution of pending events (in conjunction with event.c).
114
115    Note interaction of nextPollTime and lastPollWorked.  The idea is that if rx is not
116    keeping up with the incoming stream of packets (because there are threads that
117    are interfering with its running sufficiently often), rx does a polling select
118    before doing a real IOMGR_Select system call.  Doing a real select means that
119    we don't have to let other processes run before processing more packets.
120
121    So, our algorithm is that if the last poll on the file descriptor found useful data, or
122    we're at the time nextPollTime (which is advanced so that it occurs every 3 or 4 seconds),
123    then we try the polling select before the IOMGR_Select.  If we eventually catch up
124    (which we can tell by the polling select returning no input packets ready), then we
125    don't do a polling select again until several seconds later (via nextPollTime mechanism).
126    */
127
128 void rxi_ListenerProc(rfds, tnop, newcallp)
129     fd_set *rfds;
130     int *tnop;
131     struct rx_call **newcallp;
132 {
133     afs_uint32 host;
134     u_short port;
135     register struct rx_packet *p = (struct rx_packet *)0;
136     int socket;
137     struct clock cv;
138     afs_int32 nextPollTime;             /* time to next poll FD before sleeping */
139     int lastPollWorked, doingPoll;      /* true iff last poll was useful */
140     struct timeval tv, *tvp;
141     int i, code;
142     PROCESS pid;
143     char name[MAXTHREADNAMELENGTH] = "srv_0";
144
145     clock_NewTime();
146     lastPollWorked = 0;
147     nextPollTime = 0;
148     code = LWP_CurrentProcess(&pid);
149     if (code) {
150         fprintf(stderr, "rxi_Listener: Can't get my pid.\n");
151         exit(1);
152     }
153     rx_listenerPid = pid;
154     swapthreadname(pid, "listener", &name);
155
156     for (;;) {
157         /* Grab a new packet only if necessary (otherwise re-use the old one) */
158         if (p) {
159             rxi_RestoreDataBufs(p);
160         }
161         else {
162             if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_RECEIVE)))
163                 osi_Panic("rxi_ListenerProc: no packets!"); /* Shouldn't happen */
164         }
165         /* Wait for the next event time or a packet to arrive. */
166         /* event_RaiseEvents schedules any events whose time has come and
167            then atomically computes the time to the next event, guaranteeing
168            that this is positive.  If there is no next event, it returns 0 */
169         clock_NewTime();
170         if (!rxevent_RaiseEvents(&cv)) tvp = (struct timeval *) 0;
171         else {
172             /* It's important to copy cv to tv, because the 4.3 documentation
173                for select threatens that *tv may be updated after a select, in
174                future editions of the system, to indicate how much of the time
175                period has elapsed.  So we shouldn't rely on tv not being altered. */
176             tv.tv_sec = cv.sec; /* Time to next event */
177             tv.tv_usec = cv.usec;
178             tvp = &tv;
179         }       
180         rx_stats.selects++;
181
182         *rfds = rx_selectMask;
183
184         if (lastPollWorked || nextPollTime < clock_Sec()) {
185             /* we're catching up, or haven't tried to for a few seconds */
186             doingPoll = 1;
187             nextPollTime = clock_Sec() + 4;     /* try again in 4 seconds no matter what */
188             tv.tv_sec = tv.tv_usec = 0;         /* make sure we poll */
189             tvp = &tv;
190             code = select(rx_maxSocketNumber+1, rfds, 0, 0, tvp);
191         }
192         else {
193             doingPoll = 0;
194             code = IOMGR_Select(rx_maxSocketNumber+1, rfds, 0, 0, tvp);
195         }
196         lastPollWorked = 0;     /* default is that it didn't find anything */
197
198         if (quitListening) {
199             quitListening = 0;
200             LWP_DestroyProcess(pid);
201         }
202
203         switch(code) {
204             case 0: 
205                 /* Timer interrupt:
206                  * If it was a timer interrupt then we can assume that
207                  * the time has advanced by roughly the value of the
208                  * previous timeout, and that there is now at least
209                  * one pending event.
210                  */
211                 clock_NewTime();
212                 break;
213             case -1: 
214                 /* select or IOMGR_Select returned failure */
215                 debugSelectFailure++;   /* update debugging counter */
216                 clock_NewTime();
217                 break;
218             case -2:
219                 /* IOMGR_Cancel:
220                  * IOMGR_Cancel is invoked whenever a new event is
221                  * posted that is earlier than any existing events.
222                  * So we re-evaluate the time, and then go back to
223                  * reschedule events
224                  */
225                 clock_NewTime();
226                 break;
227
228             default: 
229                 /* Packets have arrived, presumably:
230                  * If it wasn't a timer interrupt, then no event should have
231                  * timed out yet (well some event may have, but only just...), so
232                  * we don't bother looking to see if any have timed out, but just
233                  * go directly to reading the data packets
234                  */
235                 clock_NewTime();
236                 if (doingPoll) lastPollWorked = 1;
237 #ifdef AFS_NT40_ENV
238                 for (i=0; p && i<rfds->fd_count; i++) {
239                     socket = rfds->fd_array[i];
240                     if (rxi_ReadPacket(socket, p, &host, &port)) {
241                         *newcallp = NULL;
242                         p = rxi_ReceivePacket(p, socket, host, port,
243                                               tnop, newcallp);
244                         if (newcallp && *newcallp) {
245                             if (p) {
246                                 rxi_FreePacket(p);
247                             }
248                             return;
249                         }
250                     }
251                 }
252 #else
253                 for (socket = rx_minSocketNumber;
254                      p && socket <= rx_maxSocketNumber; socket++)  {
255                      if (!FD_ISSET(socket, rfds))
256                          continue;
257                      if (rxi_ReadPacket(socket, p, &host, &port)) {
258                          p = rxi_ReceivePacket(p, socket, host, port,
259                                                tnop, newcallp);
260                         if (newcallp && *newcallp) {
261                             if (p) {
262                                 rxi_FreePacket(p);
263                             }
264                             return;
265                         }
266                     }
267                 }
268 #endif
269                 break;
270         }
271     }
272     /* NOTREACHED */
273 }
274
275 /* This is the listener process request loop. The listener process loop
276  * becomes a server thread when rxi_ListenerProc returns, and stays
277  * server thread until rxi_ServerProc returns. */
278 static void rx_ListenerProc(void *dummy)
279 {
280     int threadID;
281     int sock;
282     struct rx_call *newcall;
283     fd_set *rfds;
284
285     if (!(rfds = IOMGR_AllocFDSet())) {
286         osi_Panic("rx_ListenerProc: no fd_sets!\n");
287     }
288
289     while(1) {
290         newcall = NULL;
291         threadID = -1;
292         rxi_ListenerProc(rfds, &threadID, &newcall);
293         /* assert(threadID != -1); */
294         /* assert(newcall != NULL); */
295         sock = OSI_NULLSOCKET;
296         rxi_ServerProc(threadID, newcall, &sock);
297         /* assert(sock != OSI_NULLSOCKET); */
298     }
299     /* not reached */
300 }
301
302 /* This is the server process request loop. The server process loop
303  * becomes a listener thread when rxi_ServerProc returns, and stays
304  * listener thread until rxi_ListenerProc returns. */
305 void rx_ServerProc()
306 {
307     int sock;
308     int threadID;
309     struct rx_call *newcall = NULL;
310     fd_set *rfds;
311
312     if (!(rfds = IOMGR_AllocFDSet())) {
313         osi_Panic("rxi_ListenerProc: no fd_sets!\n");
314     }
315
316     rxi_MorePackets(rx_maxReceiveWindow+2); /* alloc more packets */
317     rxi_dataQuota += rx_initSendWindow; /* Reserve some pkts for hard times */
318     /* threadID is used for making decisions in GetCall.  Get it by bumping
319      * number of threads handling incoming calls */
320     threadID = rxi_availProcs++;
321
322     while(1) {
323         sock = OSI_NULLSOCKET;
324         rxi_ServerProc(threadID, newcall, &sock);
325         /* assert(sock != OSI_NULLSOCKET); */
326         newcall = NULL;
327         rxi_ListenerProc(rfds, &threadID, &newcall);
328         /* assert(threadID != -1); */
329         /* assert(newcall != NULL); */
330     }
331     /* not reached */
332 }
333
334 /*
335  * Called from GetUDPSocket.
336  * Called from a single thread at startup.
337  * Returns 0 on success; -1 on failure.
338  */
339 int rxi_Listen(sock)
340     osi_socket sock;
341 {
342 #ifndef AFS_NT40_ENV
343     /*
344      * Put the socket into non-blocking mode so that rx_Listener
345      * can do a polling read before entering select
346      */
347     if (fcntl(sock, F_SETFL, FNDELAY) == -1) {
348         perror("fcntl");
349         (osi_Msg "rxi_Listen: unable to set non-blocking mode on socket\n");
350         return -1;
351     }
352
353     if (sock > FD_SETSIZE-1) {
354         (osi_Msg "rxi_Listen: socket descriptor > (FD_SETSIZE-1) = %d\n",
355          FD_SETSIZE-1);
356         return -1;
357     }
358 #endif
359
360     FD_SET(sock, &rx_selectMask);
361     if (sock > rx_maxSocketNumber) rx_maxSocketNumber = sock;
362     if (sock < rx_minSocketNumber) rx_minSocketNumber = sock;
363     return 0;
364 }
365
366 /*
367  * Recvmsg
368  */
369 int rxi_Recvmsg(osi_socket socket, struct msghdr *msg_p, int flags)
370 {
371     return recvmsg((int) socket, msg_p, flags);
372 }
373
374 /*
375  * Simulate a blocking sendmsg on the non-blocking socket.
376  * It's non blocking because it was set that way for recvmsg.
377  */
378 int rxi_Sendmsg(osi_socket socket, struct msghdr *msg_p, int flags)
379 {
380     fd_set *sfds = (fd_set*)0;
381     while (sendmsg(socket, msg_p, flags) == -1) {
382         int err;
383         rx_stats.sendSelects++;
384
385         if (!sfds) {
386             if ( !(sfds = IOMGR_AllocFDSet())) {
387                 (osi_Msg "rx failed to alloc fd_set: ");
388                 perror("rx_sendmsg");
389                 return 3;
390             }
391             FD_SET(socket, sfds);
392         }
393             
394 #ifdef AFS_NT40_ENV
395         if (errno)
396 #elif defined(AFS_LINUX22_ENV)
397           /* linux unfortunately returns ECONNREFUSED if the target port
398            * is no longer in use */
399         if (errno != EWOULDBLOCK && errno != ENOBUFS && errno != ECONNREFUSED)
400 #else
401         if (errno != EWOULDBLOCK && errno != ENOBUFS)
402 #endif
403         {
404             (osi_Msg "rx failed to send packet: ");
405             perror("rx_sendmsg");
406             return 3;
407         }
408         while ((err = select(socket+1, 0, sfds, 0, 0)) != 1) {
409             if (err >= 0 || errno != EINTR) 
410               osi_Panic("rxi_sendmsg: select error %d.%d", err, errno);
411         }
412     }
413     if (sfds)
414         IOMGR_FreeFDSet(sfds);
415 }