fa4ee6dc48080aa80d5229b818eab3b7640bb3ce
[openafs.git] / src / rx / rx_lwp.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  *
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /* rx_user.c contains routines specific to the user space UNIX implementation of rx */
11
12 /* This controls the size of an fd_set; it must be defined early before
13  * the system headers define that type and the macros that operate on it.
14  * Its value should be as large as the maximum file descriptor limit we
15  * are likely to run into on any platform.  Right now, that is 65536
16  * which is the default hard fd limit on Solaris 9 */
17 #ifndef _WIN32
18 #define FD_SETSIZE 65536
19 #endif
20
21 #include <afsconfig.h>
22 #include <afs/param.h>
23
24 #include <roken.h>
25
26 #ifdef HAVE_SYS_FILE_H
27 # include <sys/file.h>
28 #endif
29
30 #ifndef AFS_PTHREAD_ENV
31
32 #include <lwp.h>
33
34 #include "rx.h"
35 #include "rx_atomic.h"
36 #include "rx_globals.h"
37 #include "rx_internal.h"
38 #include "rx_stats.h"
39 #ifdef AFS_NT40_ENV
40 #include "rx_xmit_nt.h"
41 #endif
42
43 #define MAXTHREADNAMELENGTH 64
44
45 int debugSelectFailure;         /* # of times select failed */
46
47 /*
48  * Sleep on the unique wait channel provided.
49  */
50 void
51 rxi_Sleep(void *addr)
52 {
53     LWP_WaitProcess(addr);
54 }
55
56 /*
57  * Wakeup any threads on the channel provided.
58  * They may be woken up spuriously, and must check any conditions.
59  */
60 void
61 rxi_Wakeup(void *addr)
62 {
63     LWP_NoYieldSignal(addr);
64 }
65
66 PROCESS rx_listenerPid = 0;     /* LWP process id of socket listener process */
67 static void* rx_ListenerProc(void *dummy);
68
69 /*
70  * Delay the current thread the specified number of seconds.
71  */
72 void
73 rxi_Delay(int sec)
74 {
75     IOMGR_Sleep(sec);
76 }
77
78 static int quitListening = 0;
79
80 /* This routine will kill the listener thread, if it exists. */
81 void
82 rxi_StopListener(void)
83 {
84     quitListening = 1;
85     rxi_ReScheduleEvents();
86 }
87
88 /* This routine will get called by the event package whenever a new,
89    earlier than others, event is posted.  If the Listener process
90    is blocked in selects, this will unblock it.  It also can be called
91    to force a new trip through the rxi_Listener select loop when the set
92    of file descriptors it should be listening to changes... */
93 void
94 rxi_ReScheduleEvents(void)
95 {
96     if (rx_listenerPid)
97         IOMGR_Cancel(rx_listenerPid);
98 }
99
100 void
101 rxi_InitializeThreadSupport(void)
102 {
103     PROCESS junk;
104
105     LWP_InitializeProcessSupport(LWP_NORMAL_PRIORITY, &junk);
106     IOMGR_Initialize();
107     FD_ZERO(&rx_selectMask);
108 }
109
110 void
111 rxi_StartServerProc(void *(*proc) (void *), int stacksize)
112 {
113     PROCESS scratchPid;
114     static int number = 0;
115     char name[32];
116
117     sprintf(name, "srv_%d", ++number);
118     LWP_CreateProcess(proc, stacksize, RX_PROCESS_PRIORITY, NULL,
119                       "rx_ServerProc", &scratchPid);
120     if (registerProgram)
121         (*registerProgram) (scratchPid, name);
122 }
123
124 void
125 rxi_StartListener(void)
126 {
127     /* Priority of listener should be high, so it can keep conns alive */
128 #define RX_LIST_STACK   24000
129     LWP_CreateProcess(rx_ListenerProc, RX_LIST_STACK, LWP_MAX_PRIORITY,
130                       NULL, "rx_Listener", &rx_listenerPid);
131     if (registerProgram)
132         (*registerProgram) (rx_listenerPid, "listener");
133 }
134
135 /* The main loop which listens to the net for datagrams, and handles timeouts
136    and retransmissions, etc.  It also is responsible for scheduling the
137    execution of pending events (in conjunction with event.c).
138
139    Note interaction of nextPollTime and lastPollWorked.  The idea is that if rx is not
140    keeping up with the incoming stream of packets (because there are threads that
141    are interfering with its running sufficiently often), rx does a polling select
142    before doing a real IOMGR_Select system call.  Doing a real select means that
143    we don't have to let other processes run before processing more packets.
144
145    So, our algorithm is that if the last poll on the file descriptor found useful data, or
146    we're at the time nextPollTime (which is advanced so that it occurs every 3 or 4 seconds),
147    then we try the polling select before the IOMGR_Select.  If we eventually catch up
148    (which we can tell by the polling select returning no input packets ready), then we
149    don't do a polling select again until several seconds later (via nextPollTime mechanism).
150    */
151
152 static void
153 rxi_ListenerProc(fd_set * rfds, int *tnop, struct rx_call **newcallp)
154 {
155     afs_uint32 host;
156     u_short port;
157     struct rx_packet *p = (struct rx_packet *)0;
158     osi_socket socket;
159     struct clock cv;
160     afs_int32 nextPollTime;     /* time to next poll FD before sleeping */
161     int lastPollWorked, doingPoll;      /* true iff last poll was useful */
162     struct timeval tv, *tvp;
163     int code;
164 #ifdef AFS_NT40_ENV
165     int i;
166 #endif
167     PROCESS pid;
168     char name[MAXTHREADNAMELENGTH] = "srv_0";
169
170     clock_NewTime();
171     lastPollWorked = 0;
172     nextPollTime = 0;
173     code = LWP_CurrentProcess(&pid);
174     if (code) {
175         osi_Panic("rxi_Listener: Can't get my pid.\n");
176     }
177     rx_listenerPid = pid;
178     if (swapNameProgram)
179         (*swapNameProgram) (pid, "listener", &name[0]);
180
181     for (;;) {
182         /* See if a check for additional packets was issued */
183         rx_CheckPackets();
184
185         /* Grab a new packet only if necessary (otherwise re-use the old one) */
186         if (p) {
187             rxi_RestoreDataBufs(p);
188         } else {
189             if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_RECEIVE)))
190                 osi_Panic("rxi_ListenerProc: no packets!");     /* Shouldn't happen */
191         }
192         /* Wait for the next event time or a packet to arrive. */
193         /* event_RaiseEvents schedules any events whose time has come and
194          * then atomically computes the time to the next event, guaranteeing
195          * that this is positive.  If there is no next event, it returns 0 */
196         clock_NewTime();
197         if (!rxevent_RaiseEvents(&cv))
198             tvp = NULL;
199         else {
200             /* It's important to copy cv to tv, because the 4.3 documentation
201              * for select threatens that *tv may be updated after a select, in
202              * future editions of the system, to indicate how much of the time
203              * period has elapsed.  So we shouldn't rely on tv not being altered. */
204             tv.tv_sec = cv.sec; /* Time to next event */
205             tv.tv_usec = cv.usec;
206             tvp = &tv;
207         }
208         if (rx_stats_active)
209             rx_atomic_inc(&rx_stats.selects);
210
211         *rfds = rx_selectMask;
212
213         if (lastPollWorked || nextPollTime < clock_Sec()) {
214             /* we're catching up, or haven't tried to for a few seconds */
215             doingPoll = 1;
216             nextPollTime = clock_Sec() + 4;     /* try again in 4 seconds no matter what */
217             tv.tv_sec = tv.tv_usec = 0; /* make sure we poll */
218             tvp = &tv;
219             code = select((int)(rx_maxSocketNumber + 1), rfds, 0, 0, tvp);
220         } else {
221             doingPoll = 0;
222             code = IOMGR_Select((int)(rx_maxSocketNumber + 1), rfds, 0, 0, tvp);
223         }
224         lastPollWorked = 0;     /* default is that it didn't find anything */
225
226         if (quitListening) {
227             quitListening = 0;
228             LWP_DestroyProcess(pid);
229         }
230
231         switch (code) {
232         case 0:
233             /* Timer interrupt:
234              * If it was a timer interrupt then we can assume that
235              * the time has advanced by roughly the value of the
236              * previous timeout, and that there is now at least
237              * one pending event.
238              */
239             clock_NewTime();
240             break;
241         case -1:
242             /* select or IOMGR_Select returned failure */
243             debugSelectFailure++;       /* update debugging counter */
244             clock_NewTime();
245             break;
246         case -2:
247             /* IOMGR_Cancel:
248              * IOMGR_Cancel is invoked whenever a new event is
249              * posted that is earlier than any existing events.
250              * So we re-evaluate the time, and then go back to
251              * reschedule events
252              */
253             clock_NewTime();
254             break;
255
256         default:
257             /* Packets have arrived, presumably:
258              * If it wasn't a timer interrupt, then no event should have
259              * timed out yet (well some event may have, but only just...), so
260              * we don't bother looking to see if any have timed out, but just
261              * go directly to reading the data packets
262              */
263             clock_NewTime();
264             if (doingPoll)
265                 lastPollWorked = 1;
266 #ifdef AFS_NT40_ENV
267             for (i = 0; p && i < rfds->fd_count; i++) {
268                 socket = rfds->fd_array[i];
269                 if (rxi_ReadPacket(socket, p, &host, &port)) {
270                     *newcallp = NULL;
271                     p = rxi_ReceivePacket(p, socket, host, port, tnop,
272                                           newcallp);
273                     if (newcallp && *newcallp) {
274                         if (p) {
275                             rxi_FreePacket(p);
276                         }
277                         if (swapNameProgram) {
278                             (*swapNameProgram) (rx_listenerPid, name, 0);
279                             rx_listenerPid = 0;
280                         }
281                         return;
282                     }
283                 }
284             }
285 #else
286             for (socket = rx_minSocketNumber;
287                  p && socket <= rx_maxSocketNumber; socket++) {
288                 if (!FD_ISSET(socket, rfds))
289                     continue;
290                 if (rxi_ReadPacket(socket, p, &host, &port)) {
291                     p = rxi_ReceivePacket(p, socket, host, port, tnop,
292                                           newcallp);
293                     if (newcallp && *newcallp) {
294                         if (p) {
295                             rxi_FreePacket(p);
296                         }
297                         if (swapNameProgram) {
298                             (*swapNameProgram) (rx_listenerPid, name, 0);
299                             rx_listenerPid = 0;
300                         }
301                         return;
302                     }
303                 }
304             }
305 #endif
306             break;
307         }
308     }
309     /* NOTREACHED */
310 }
311
312 /* This is the listener process request loop. The listener process loop
313  * becomes a server thread when rxi_ListenerProc returns, and stays
314  * server thread until rxi_ServerProc returns. */
315 static void *
316 rx_ListenerProc(void *dummy)
317 {
318     int threadID;
319     osi_socket sock;
320     struct rx_call *newcall;
321     fd_set *rfds;
322
323     if (!(rfds = IOMGR_AllocFDSet())) {
324         osi_Panic("rx_ListenerProc: no fd_sets!\n");
325     }
326
327     while (1) {
328         newcall = NULL;
329         threadID = -1;
330         rxi_ListenerProc(rfds, &threadID, &newcall);
331         /* osi_Assert(threadID != -1); */
332         /* osi_Assert(newcall != NULL); */
333         sock = OSI_NULLSOCKET;
334         rxi_ServerProc(threadID, newcall, &sock);
335         /* osi_Assert(sock != OSI_NULLSOCKET); */
336     }
337     /* not reached */
338     return NULL;
339 }
340
341 /* This is the server process request loop. The server process loop
342  * becomes a listener thread when rxi_ServerProc returns, and stays
343  * listener thread until rxi_ListenerProc returns. */
344 void *
345 rx_ServerProc(void * unused)
346 {
347     osi_socket sock;
348     int threadID;
349     struct rx_call *newcall = NULL;
350     fd_set *rfds;
351
352     if (!(rfds = IOMGR_AllocFDSet())) {
353         osi_Panic("rxi_ListenerProc: no fd_sets!\n");
354     }
355
356     rxi_MorePackets(rx_maxReceiveWindow + 2);   /* alloc more packets */
357     rxi_dataQuota += rx_initSendWindow; /* Reserve some pkts for hard times */
358     /* threadID is used for making decisions in GetCall.  Get it by bumping
359      * number of threads handling incoming calls */
360     threadID = rxi_availProcs++;
361
362     while (1) {
363         sock = OSI_NULLSOCKET;
364         rxi_ServerProc(threadID, newcall, &sock);
365         /* osi_Assert(sock != OSI_NULLSOCKET); */
366         newcall = NULL;
367         rxi_ListenerProc(rfds, &threadID, &newcall);
368         /* osi_Assert(threadID != -1); */
369         /* osi_Assert(newcall != NULL); */
370     }
371     /* not reached */
372     return NULL;
373 }
374
375 /*
376  * Called from GetUDPSocket.
377  * Called from a single thread at startup.
378  * Returns 0 on success; -1 on failure.
379  */
380 int
381 rxi_Listen(osi_socket sock)
382 {
383 #ifndef AFS_NT40_ENV
384     /*
385      * Put the socket into non-blocking mode so that rx_Listener
386      * can do a polling read before entering select
387      */
388     if (fcntl(sock, F_SETFL, FNDELAY) == -1) {
389         perror("fcntl");
390         (osi_Msg "rxi_Listen: unable to set non-blocking mode on socket\n");
391         return -1;
392     }
393
394     if (sock > FD_SETSIZE - 1) {
395         (osi_Msg "rxi_Listen: socket descriptor > (FD_SETSIZE-1) = %d\n",
396          FD_SETSIZE - 1);
397         return -1;
398     }
399 #endif
400
401     FD_SET(sock, &rx_selectMask);
402     if (sock > rx_maxSocketNumber)
403         rx_maxSocketNumber = sock;
404     if (sock < rx_minSocketNumber)
405         rx_minSocketNumber = sock;
406     return 0;
407 }
408
409 /*
410  * Recvmsg
411  */
412 int
413 rxi_Recvmsg(osi_socket socket, struct msghdr *msg_p, int flags)
414 {
415     int code;
416     code = recvmsg(socket, msg_p, flags);
417
418 #ifdef AFS_RXERRQ_ENV
419     if (code < 0) {
420         while((rxi_HandleSocketError(socket)) > 0)
421             ;
422     }
423 #endif
424
425     return code;
426 }
427
428 /*
429  * Simulate a blocking sendmsg on the non-blocking socket.
430  * It's non blocking because it was set that way for recvmsg.
431  */
432 int
433 rxi_Sendmsg(osi_socket socket, struct msghdr *msg_p, int flags)
434 {
435     fd_set *sfds = (fd_set *) 0;
436     while (sendmsg(socket, msg_p, flags) == -1) {
437         int err;
438
439 #ifdef AFS_NT40_ENV
440         err = WSAGetLastError();
441 #else
442         err = errno;
443 #endif
444
445         if (rx_stats_active)
446             rx_atomic_inc(&rx_stats.sendSelects);
447
448         if (!sfds) {
449             if (!(sfds = IOMGR_AllocFDSet())) {
450                 (osi_Msg "rx failed to alloc fd_set: ");
451                 perror("rx_sendmsg");
452                 return -1;
453             }
454             FD_SET(socket, sfds);
455         }
456 #ifdef AFS_RXERRQ_ENV
457         while((rxi_HandleSocketError(socket)) > 0)
458           ;
459 #endif
460 #ifdef AFS_NT40_ENV
461         if (err)
462 #elif defined(AFS_LINUX22_ENV)
463         /* linux unfortunately returns ECONNREFUSED if the target port
464          * is no longer in use */
465         /* and EAGAIN if a UDP checksum is incorrect */
466         if (err != EWOULDBLOCK && err != ENOBUFS && err != ECONNREFUSED
467             && err != EAGAIN)
468 #else
469         if (err != EWOULDBLOCK && err != ENOBUFS)
470 #endif
471         {
472             (osi_Msg "rx failed to send packet: ");
473             perror("rx_sendmsg");
474             if (err > 0)
475               return -err;
476             return -1;
477         }
478         while ((err = select(
479 #ifdef AFS_NT40_ENV
480                              0,
481 #else
482                              socket + 1,
483 #endif
484                              0, sfds, 0, 0)) != 1) {
485             if (err >= 0 || errno != EINTR)
486                 osi_Panic("rxi_sendmsg: select error %d.%d", err, errno);
487             FD_ZERO(sfds);
488             FD_SET(socket, sfds);
489         }
490     }
491     if (sfds)
492         IOMGR_FreeFDSet(sfds);
493     return 0;
494 }
495 #endif