6517423b96adf2e4e68d992dd4c3447a16896f4a
[openafs.git] / src / rx / rx_lwp.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /* rx_user.c contains routines specific to the user space UNIX implementation of rx */
11
12 # include <afs/param.h>
13 # include <sys/types.h>         /* fd_set on older platforms */
14 # include <errno.h>
15 # include <signal.h>
16 #ifdef AFS_NT40_ENV
17 # include <winsock2.h>
18 #else
19 # include <unistd.h>            /* select() prototype */
20 # include <sys/time.h>          /* struct timeval, select() prototype */
21 # ifndef FD_SET
22 #  include <sys/select.h>       /* fd_set on newer platforms */
23 # endif
24 # include <sys/socket.h>
25 # include <sys/file.h>
26 # include <netdb.h>
27 # include <sys/stat.h>
28 # include <netinet/in.h>
29 # include <net/if.h>
30 # include <sys/ioctl.h>
31 # include <sys/time.h>
32 #endif
33 # include "rx.h"
34 # include "rx_globals.h"
35 # include <lwp.h>
36
37
38 int debugSelectFailure; /* # of times select failed */
39 /*
40  * Sleep on the unique wait channel provided.
41  */
42 void rxi_Sleep(void *addr)
43 {
44     LWP_WaitProcess(addr);
45 }
46
47 /*
48  * Wakeup any threads on the channel provided.
49  * They may be woken up spuriously, and must check any conditions.
50  */
51 void rxi_Wakeup(void *addr)
52 {
53     LWP_NoYieldSignal(addr);
54 }
55
56 PROCESS rx_listenerPid; /* LWP process id of socket listener process */
57 static void rx_ListenerProc(void *dummy);
58
59 /*
60  * Delay the current thread the specified number of seconds.
61  */
62 void rxi_Delay(int sec)
63 {
64     IOMGR_Sleep(sec);
65 }
66
67 /* This routine will get called by the event package whenever a new,
68    earlier than others, event is posted.  If the Listener process
69    is blocked in selects, this will unblock it.  It also can be called
70    to force a new trip through the rxi_Listener select loop when the set
71    of file descriptors it should be listening to changes... */
72 void rxi_ReScheduleEvents() {
73     if (rx_listenerPid) IOMGR_Cancel(rx_listenerPid);
74 }
75
76 void rxi_InitializeThreadSupport(void)
77 {
78     PROCESS junk;
79
80     LWP_InitializeProcessSupport(LWP_NORMAL_PRIORITY, &junk);
81     IOMGR_Initialize();
82     FD_ZERO(&rx_selectMask);
83 }
84
85 void rxi_StartServerProc(proc, stacksize)
86     long (*proc)();
87 {
88     PROCESS scratchPid;
89     LWP_CreateProcess(proc, stacksize, RX_PROCESS_PRIORITY,
90                       0, "rx_ServerProc", &scratchPid);
91 }
92
93 void rxi_StartListener() {
94     /* Priority of listener should be high, so it can keep conns alive */
95 #define RX_LIST_STACK   24000
96     LWP_CreateProcess(rx_ListenerProc, RX_LIST_STACK, LWP_MAX_PRIORITY, 0,
97                       "rx_Listener", &rx_listenerPid);
98 }
99
100 /* The main loop which listens to the net for datagrams, and handles timeouts
101    and retransmissions, etc.  It also is responsible for scheduling the
102    execution of pending events (in conjunction with event.c).
103
104    Note interaction of nextPollTime and lastPollWorked.  The idea is that if rx is not
105    keeping up with the incoming stream of packets (because there are threads that
106    are interfering with its running sufficiently often), rx does a polling select
107    before doing a real IOMGR_Select system call.  Doing a real select means that
108    we don't have to let other processes run before processing more packets.
109
110    So, our algorithm is that if the last poll on the file descriptor found useful data, or
111    we're at the time nextPollTime (which is advanced so that it occurs every 3 or 4 seconds),
112    then we try the polling select before the IOMGR_Select.  If we eventually catch up
113    (which we can tell by the polling select returning no input packets ready), then we
114    don't do a polling select again until several seconds later (via nextPollTime mechanism).
115    */
116
117 void rxi_ListenerProc(rfds, tnop, newcallp)
118     fd_set *rfds;
119     int *tnop;
120     struct rx_call **newcallp;
121 {
122     afs_uint32 host;
123     u_short port;
124     register struct rx_packet *p = (struct rx_packet *)0;
125     int socket;
126     struct clock cv;
127     afs_int32 nextPollTime;             /* time to next poll FD before sleeping */
128     int lastPollWorked, doingPoll;      /* true iff last poll was useful */
129     struct timeval tv, *tvp;
130     int i, code;
131
132     clock_NewTime();
133     lastPollWorked = 0;
134     nextPollTime = 0;
135
136     for (;;) {
137         /* Grab a new packet only if necessary (otherwise re-use the old one) */
138         if (p) {
139             rxi_RestoreDataBufs(p);
140         }
141         else {
142             if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_RECEIVE)))
143                 osi_Panic("rxi_ListenerProc: no packets!"); /* Shouldn't happen */
144         }
145         /* Wait for the next event time or a packet to arrive. */
146         /* event_RaiseEvents schedules any events whose time has come and
147            then atomically computes the time to the next event, guaranteeing
148            that this is positive.  If there is no next event, it returns 0 */
149         clock_NewTime();
150         if (!rxevent_RaiseEvents(&cv)) tvp = (struct timeval *) 0;
151         else {
152             /* It's important to copy cv to tv, because the 4.3 documentation
153                for select threatens that *tv may be updated after a select, in
154                future editions of the system, to indicate how much of the time
155                period has elapsed.  So we shouldn't rely on tv not being altered. */
156             tv.tv_sec = cv.sec; /* Time to next event */
157             tv.tv_usec = cv.usec;
158             tvp = &tv;
159         }       
160         rx_stats.selects++;
161
162         *rfds = rx_selectMask;
163
164         if (lastPollWorked || nextPollTime < clock_Sec()) {
165             /* we're catching up, or haven't tried to for a few seconds */
166             doingPoll = 1;
167             nextPollTime = clock_Sec() + 4;     /* try again in 4 seconds no matter what */
168             tv.tv_sec = tv.tv_usec = 0;         /* make sure we poll */
169             tvp = &tv;
170             code = select(rx_maxSocketNumber+1, rfds, 0, 0, tvp);
171         }
172         else {
173             doingPoll = 0;
174             code = IOMGR_Select(rx_maxSocketNumber+1, rfds, 0, 0, tvp);
175         }
176         lastPollWorked = 0;     /* default is that it didn't find anything */
177
178         switch(code) {
179             case 0: 
180                 /* Timer interrupt:
181                  * If it was a timer interrupt then we can assume that
182                  * the time has advanced by roughly the value of the
183                  * previous timeout, and that there is now at least
184                  * one pending event.
185                  */
186                 clock_NewTime();
187                 break;
188             case -1: 
189                 /* select or IOMGR_Select returned failure */
190                 debugSelectFailure++;   /* update debugging counter */
191                 clock_NewTime();
192                 break;
193             case -2:
194                 /* IOMGR_Cancel:
195                  * IOMGR_Cancel is invoked whenever a new event is
196                  * posted that is earlier than any existing events.
197                  * So we re-evaluate the time, and then go back to
198                  * reschedule events
199                  */
200                 clock_NewTime();
201                 break;
202
203             default: 
204                 /* Packets have arrived, presumably:
205                  * If it wasn't a timer interrupt, then no event should have
206                  * timed out yet (well some event may have, but only just...), so
207                  * we don't bother looking to see if any have timed out, but just
208                  * go directly to reading the data packets
209                  */
210                 clock_NewTime();
211                 if (doingPoll) lastPollWorked = 1;
212 #ifdef AFS_NT40_ENV
213                 for (i=0; p && i<rfds->fd_count; i++) {
214                     socket = rfds->fd_array[i];
215                     if (rxi_ReadPacket(socket, p, &host, &port)) {
216                         *newcallp = NULL;
217                         p = rxi_ReceivePacket(p, socket, host, port,
218                                               tnop, newcallp);
219                         if (newcallp && *newcallp) {
220                             if (p) {
221                                 rxi_FreePacket(p);
222                             }
223                             return;
224                         }
225                     }
226                 }
227 #else
228                 for (socket = rx_minSocketNumber;
229                      p && socket <= rx_maxSocketNumber; socket++)  {
230                      if (!FD_ISSET(socket, rfds))
231                          continue;
232                      if (rxi_ReadPacket(socket, p, &host, &port)) {
233                          p = rxi_ReceivePacket(p, socket, host, port,
234                                                tnop, newcallp);
235                         if (newcallp && *newcallp) {
236                             if (p) {
237                                 rxi_FreePacket(p);
238                             }
239                             return;
240                         }
241                     }
242                 }
243 #endif
244                 break;
245         }
246     }
247     /* NOTREACHED */
248 }
249
250 /* This is the listener process request loop. The listener process loop
251  * becomes a server thread when rxi_ListenerProc returns, and stays
252  * server thread until rxi_ServerProc returns. */
253 static void rx_ListenerProc(void *dummy)
254 {
255     int threadID;
256     int sock;
257     struct rx_call *newcall;
258     fd_set *rfds;
259
260     if (!(rfds = IOMGR_AllocFDSet())) {
261         osi_Panic("rx_ListenerProc: no fd_sets!\n");
262     }
263
264     while(1) {
265         newcall = NULL;
266         threadID = -1;
267         rxi_ListenerProc(rfds, &threadID, &newcall);
268         /* assert(threadID != -1); */
269         /* assert(newcall != NULL); */
270         sock = OSI_NULLSOCKET;
271         rxi_ServerProc(threadID, newcall, &sock);
272         /* assert(sock != OSI_NULLSOCKET); */
273     }
274     /* not reached */
275 }
276
277 /* This is the server process request loop. The server process loop
278  * becomes a listener thread when rxi_ServerProc returns, and stays
279  * listener thread until rxi_ListenerProc returns. */
280 void rx_ServerProc()
281 {
282     int sock;
283     int threadID;
284     struct rx_call *newcall = NULL;
285     fd_set *rfds;
286
287     if (!(rfds = IOMGR_AllocFDSet())) {
288         osi_Panic("rxi_ListenerProc: no fd_sets!\n");
289     }
290
291     rxi_MorePackets(rx_maxReceiveWindow+2); /* alloc more packets */
292     rxi_dataQuota += rx_initSendWindow; /* Reserve some pkts for hard times */
293     /* threadID is used for making decisions in GetCall.  Get it by bumping
294      * number of threads handling incoming calls */
295     threadID = rxi_availProcs++;
296
297     while(1) {
298         sock = OSI_NULLSOCKET;
299         rxi_ServerProc(threadID, newcall, &sock);
300         /* assert(sock != OSI_NULLSOCKET); */
301         newcall = NULL;
302         rxi_ListenerProc(rfds, &threadID, &newcall);
303         /* assert(threadID != -1); */
304         /* assert(newcall != NULL); */
305     }
306     /* not reached */
307 }
308
309 /*
310  * Called from GetUDPSocket.
311  * Called from a single thread at startup.
312  * Returns 0 on success; -1 on failure.
313  */
314 int rxi_Listen(sock)
315     osi_socket sock;
316 {
317 #ifndef AFS_NT40_ENV
318     /*
319      * Put the socket into non-blocking mode so that rx_Listener
320      * can do a polling read before entering select
321      */
322     if (fcntl(sock, F_SETFL, FNDELAY) == -1) {
323         perror("fcntl");
324         (osi_Msg "rxi_Listen: unable to set non-blocking mode on socket\n");
325         return -1;
326     }
327
328     if (sock > FD_SETSIZE-1) {
329         (osi_Msg "rxi_Listen: socket descriptor > (FD_SETSIZE-1) = %d\n",
330          FD_SETSIZE-1);
331         return -1;
332     }
333 #endif
334
335     FD_SET(sock, &rx_selectMask);
336     if (sock > rx_maxSocketNumber) rx_maxSocketNumber = sock;
337     if (sock < rx_minSocketNumber) rx_minSocketNumber = sock;
338     return 0;
339 }
340
341 /*
342  * Recvmsg
343  */
344 int rxi_Recvmsg(osi_socket socket, struct msghdr *msg_p, int flags)
345 {
346     return recvmsg((int) socket, msg_p, flags);
347 }
348
349 /*
350  * Simulate a blocking sendmsg on the non-blocking socket.
351  * It's non blocking because it was set that way for recvmsg.
352  */
353 int rxi_Sendmsg(osi_socket socket, struct msghdr *msg_p, int flags)
354 {
355     fd_set *sfds = (fd_set*)0;
356     while (sendmsg(socket, msg_p, flags) == -1) {
357         int err;
358         rx_stats.sendSelects++;
359
360         if (!sfds) {
361             if ( !(sfds = IOMGR_AllocFDSet())) {
362                 (osi_Msg "rx failed to alloc fd_set: ");
363                 perror("rx_sendmsg");
364                 return 3;
365             }
366             FD_SET(socket, sfds);
367         }
368             
369 #ifdef AFS_NT40_ENV
370         if (errno)
371 #elif defined(AFS_LINUX22_ENV)
372           /* linux unfortunately returns ECONNREFUSED if the target port
373            * is no longer in use */
374         if (errno != EWOULDBLOCK && errno != ENOBUFS && errno != ECONNREFUSED)
375 #else
376         if (errno != EWOULDBLOCK && errno != ENOBUFS)
377 #endif
378         {
379             (osi_Msg "rx failed to send packet: ");
380             perror("rx_sendmsg");
381             return 3;
382         }
383         while ((err = select(socket+1, 0, sfds, 0, 0)) != 1) {
384             if (err >= 0 || errno != EINTR) 
385               osi_Panic("rxi_sendmsg: select error %d.%d", err, errno);
386         }
387     }
388     if (sfds)
389         IOMGR_FreeFDSet(sfds);
390 }