2 * Copyright 2000, International Business Machines Corporation and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
10 /* rx_user.c contains routines specific to the user space UNIX implementation of rx */
12 /* This controls the size of an fd_set; it must be defined early before
13 * the system headers define that type and the macros that operate on it.
14 * Its value should be as large as the maximum file descriptor limit we
15 * are likely to run into on any platform. Right now, that is 65536
16 * which is the default hard fd limit on Solaris 9 */
18 #define FD_SETSIZE 65536
21 #include <afsconfig.h>
22 #include <afs/param.h>
28 #include <sys/types.h> /* fd_set on older platforms */
32 # include <winsock2.h>
34 # include <unistd.h> /* select() prototype */
35 # include <sys/time.h> /* struct timeval, select() prototype */
37 # include <sys/select.h> /* fd_set on newer platforms */
39 # include <sys/socket.h>
40 # include <sys/file.h>
42 # include <sys/stat.h>
43 # include <netinet/in.h>
45 # include <sys/ioctl.h>
46 # include <sys/time.h>
50 #include "rx_atomic.h"
51 #include "rx_globals.h"
/* Size of the thread-name buffer (name[]) declared in rxi_ListenerProc. */
55 #define MAXTHREADNAMELENGTH 64
57 int debugSelectFailure; /* # of times select or IOMGR_Select failed in rxi_ListenerProc */
60 * Sleep on the unique wait channel provided.
65 LWP_WaitProcess(addr);
69 * Wakeup any threads on the channel provided.
70 * They may be woken up spuriously, and must check any conditions.
/* addr is the wait channel; it is passed unchanged to LWP_NoYieldSignal(). */
73 rxi_Wakeup(void *addr)
75 LWP_NoYieldSignal(addr);
/* rx_listenerPid is handed to LWP_CreateProcess() by address in
 * rxi_StartListener(), assigned directly in rxi_ListenerProc(), and used as
 * the IOMGR_Cancel() target in rxi_ReScheduleEvents(). */
78 PROCESS rx_listenerPid = 0; /* LWP process id of socket listener process */
/* Forward declaration: the definition appears further down, and
 * rxi_StartListener() passes it to LWP_CreateProcess() as the entry point. */
79 static void* rx_ListenerProc(void *dummy);
82 * Delay the current thread the specified number of seconds.
/* NOTE(review): no read or write of this flag is visible in this excerpt;
 * presumably the listener loop checks it to decide when to exit - confirm. */
90 static int quitListening = 0;
92 /* This routine will kill the listener thread, if it exists. */
94 rxi_StopListener(void)
/* Knock the listener out of its select so that it makes a fresh trip
 * through its loop (see rxi_ReScheduleEvents). */
97 rxi_ReScheduleEvents();
100 /* This routine is called by the event package whenever a newly posted
101 event is earlier than all existing events. If the Listener process
102 is blocked in select, this will unblock it. It can also be called
103 to force a new trip through the rxi_Listener select loop when the set
104 of file descriptors it should be listening to changes... */
106 rxi_ReScheduleEvents(void)
/* Interrupt the IOMGR_Select that the listener LWP may be waiting in. */
109 IOMGR_Cancel(rx_listenerPid);
/* Initialize LWP process support at LWP_NORMAL_PRIORITY and start with an
 * empty rx_selectMask; rxi_Listen() adds sockets to that mask later. */
113 rxi_InitializeThreadSupport(void)
/* junk is presumably an out-parameter whose value is discarded; it is not
 * read anywhere in the visible code. */
117 LWP_InitializeProcessSupport(LWP_NORMAL_PRIORITY, &junk);
119 FD_ZERO(&rx_selectMask);
/* Create one LWP that runs proc at RX_PROCESS_PRIORITY and hand its pid,
 * together with a numbered "srv_<n>" name, to registerProgram.
 *   proc      - entry point for the new LWP
 *   stacksize - passed through to LWP_CreateProcess()
 */
123 rxi_StartServerProc(void *(*proc) (void *), int stacksize)
/* Counts the server LWPs started so far; pre-incremented, so names begin
 * at srv_1. */
126 static int number = 0;
/* NOTE(review): the declaration of name is not visible in this excerpt.
 * "srv_%d" needs up to 16 bytes for a 32-bit int - confirm the buffer is at
 * least that large, or switch to snprintf(). */
129 sprintf(name, "srv_%d", ++number);
130 LWP_CreateProcess(proc, stacksize, RX_PROCESS_PRIORITY, NULL,
131 "rx_ServerProc", &scratchPid);
133 (*registerProgram) (scratchPid, name);
/* Create the listener LWP (entry point rx_ListenerProc), passing
 * &rx_listenerPid to LWP_CreateProcess(), and register it under the name
 * "listener" through registerProgram. */
137 rxi_StartListener(void)
139 /* Priority of listener should be high, so it can keep conns alive */
140 #define RX_LIST_STACK 24000 /* stack size passed to LWP_CreateProcess() */
141 LWP_CreateProcess(rx_ListenerProc, RX_LIST_STACK, LWP_MAX_PRIORITY,
142 NULL, "rx_Listener", &rx_listenerPid);
144 (*registerProgram) (rx_listenerPid, "listener");
147 /* The main loop which listens to the net for datagrams, and handles timeouts
148 and retransmissions, etc. It is also responsible for scheduling the
149 execution of pending events (in conjunction with event.c).
151 Note interaction of nextPollTime and lastPollWorked. The idea is that if rx is not
152 keeping up with the incoming stream of packets (because there are threads that
153 are interfering with its running sufficiently often), rx does a polling select
154 before doing a real IOMGR_Select system call. Doing a real select means that
155 we don't have to let other processes run before processing more packets.
157 So, our algorithm is that if the last poll on the file descriptor found useful data, or
158 we're at the time nextPollTime (which is advanced so that it occurs every 3 or 4 seconds),
159 then we try the polling select before the IOMGR_Select. If we eventually catch up
160 (which we can tell by the polling select returning no input packets ready), then we
161 don't do a polling select again until several seconds later (via nextPollTime mechanism).

Parameters, as used in the visible code:
rfds - scratch fd_set; overwritten with a copy of rx_selectMask before each select.
tnop - forwarded unchanged to rxi_ReceivePacket().
newcallp - may be NULL; once a packet has been processed and *newcallp is non-NULL,
the saved thread name is swapped back through swapNameProgram (presumably
just before returning to the caller - confirm, the return is not visible here).

Ready sockets are located in one of two ways: by walking the first rfds->fd_count
entries of rfds->fd_array, or by testing every descriptor from rx_minSocketNumber
to rx_maxSocketNumber with FD_ISSET. The preprocessor lines that choose between
the two are not visible in this excerpt.
165 rxi_ListenerProc(fd_set * rfds, int *tnop, struct rx_call **newcallp)
169 struct rx_packet *p = (struct rx_packet *)0;
172 afs_int32 nextPollTime; /* time to next poll FD before sleeping */
173 int lastPollWorked, doingPoll; /* true iff last poll was useful */
174 struct timeval tv, *tvp;
/* Passed to swapNameProgram below, first by address and later by value;
 * presumably it holds the name this thread had before taking over as
 * listener - confirm against the swapNameProgram implementation. */
180 char name[MAXTHREADNAMELENGTH] = "srv_0";
185 code = LWP_CurrentProcess(&pid);
187 osi_Panic("rxi_Listener: Can't get my pid.\n");
/* Publish our pid so that rxi_ReScheduleEvents() cancels the right LWP. */
189 rx_listenerPid = pid;
191 (*swapNameProgram) (pid, "listener", &name[0]);
194 /* See if a check for additional packets was issued */
197 /* Grab a new packet only if necessary (otherwise re-use the old one) */
199 rxi_RestoreDataBufs(p);
201 if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_RECEIVE)))
202 osi_Panic("rxi_ListenerProc: no packets!"); /* Shouldn't happen */
204 /* Wait for the next event time or a packet to arrive. */
205 /* rxevent_RaiseEvents schedules any events whose time has come and
206 * then atomically computes the time to the next event, guaranteeing
207 * that this is positive. If there is no next event, it returns 0 */
209 if (!rxevent_RaiseEvents(&cv))
212 /* It's important to copy cv to tv, because the 4.3 documentation
213 * for select threatens that *tv may be updated after a select, in
214 * future editions of the system, to indicate how much of the time
215 * period has elapsed. So we shouldn't rely on tv not being altered. */
216 tv.tv_sec = cv.sec; /* Time to next event */
217 tv.tv_usec = cv.usec;
221 rx_atomic_inc(&rx_stats.selects);
/* select() modifies the set it is given, so start from the master mask on
 * every pass. */
223 *rfds = rx_selectMask;
225 if (lastPollWorked || nextPollTime < clock_Sec()) {
226 /* we're catching up, or haven't tried to for a few seconds */
228 nextPollTime = clock_Sec() + 4; /* try again in 4 seconds no matter what */
229 tv.tv_sec = tv.tv_usec = 0; /* make sure we poll */
/* Polling path: the system select() is called directly, not through IOMGR. */
231 code = select((int)(rx_maxSocketNumber + 1), rfds, 0, 0, tvp);
/* Normal path: wait in IOMGR_Select; rxi_ReScheduleEvents() can interrupt
 * this wait with IOMGR_Cancel(). */
234 code = IOMGR_Select((int)(rx_maxSocketNumber + 1), rfds, 0, 0, tvp);
236 lastPollWorked = 0; /* default is that it didn't find anything */
/* NOTE(review): the condition guarding this call is not visible in this
 * excerpt. pid is this thread's own id (from LWP_CurrentProcess above), so
 * the listener is destroying itself here. */
240 LWP_DestroyProcess(pid);
246 * If it was a timer interrupt then we can assume that
247 * the time has advanced by roughly the value of the
248 * previous timeout, and that there is now at least
254 /* select or IOMGR_Select returned failure */
255 debugSelectFailure++; /* update debugging counter */
260 * IOMGR_Cancel is invoked whenever a new event is
261 * posted that is earlier than any existing events.
262 * So we re-evaluate the time, and then go back to
269 /* Packets have arrived, presumably:
270 * If it wasn't a timer interrupt, then no event should have
271 * timed out yet (well some event may have, but only just...), so
272 * we don't bother looking to see if any have timed out, but just
273 * go directly to reading the data packets
279 for (i = 0; p && i < rfds->fd_count; i++) {
280 socket = rfds->fd_array[i];
281 if (rxi_ReadPacket(socket, p, &host, &port)) {
283 p = rxi_ReceivePacket(p, socket, host, port, tnop,
285 if (newcallp && *newcallp) {
289 if (swapNameProgram) {
290 (*swapNameProgram) (rx_listenerPid, name, 0);
298 for (socket = rx_minSocketNumber;
299 p && socket <= rx_maxSocketNumber; socket++) {
300 if (!FD_ISSET(socket, rfds))
302 if (rxi_ReadPacket(socket, p, &host, &port)) {
303 p = rxi_ReceivePacket(p, socket, host, port, tnop,
305 if (newcallp && *newcallp) {
309 if (swapNameProgram) {
310 (*swapNameProgram) (rx_listenerPid, name, 0);
324 /* This is the listener process request loop. The listener process loop
325 * becomes a server thread when rxi_ListenerProc returns, and stays a
326 * server thread until rxi_ServerProc returns. dummy is not referenced in
326 * the visible code. */
328 rx_ListenerProc(void *dummy)
332 struct rx_call *newcall;
/* The fd_set handed to rxi_ListenerProc is allocated once, up front. */
335 if (!(rfds = IOMGR_AllocFDSet())) {
336 osi_Panic("rx_ListenerProc: no fd_sets!\n");
/* Listen first; rxi_ListenerProc receives threadID and newcall by address,
 * and both are then passed on to rxi_ServerProc. */
342 rxi_ListenerProc(rfds, &threadID, &newcall);
343 /* osi_Assert(threadID != -1); */
344 /* osi_Assert(newcall != NULL); */
/* sock is reset before every rxi_ServerProc call. */
345 sock = OSI_NULLSOCKET;
346 rxi_ServerProc(threadID, newcall, &sock);
347 /* osi_Assert(sock != OSI_NULLSOCKET); */
353 /* This is the server process request loop. The server process loop
354 * becomes a listener thread when rxi_ServerProc returns, and stays a
355 * listener thread until rxi_ListenerProc returns. unused is not
355 * referenced in the visible code. */
357 rx_ServerProc(void * unused)
361 struct rx_call *newcall = NULL;
/* The fd_set handed to rxi_ListenerProc is allocated once, up front. The
 * panic message names this function, as rx_ListenerProc's does, so that the
 * failing caller can be identified. */
364 if (!(rfds = IOMGR_AllocFDSet())) {
365 osi_Panic("rx_ServerProc: no fd_sets!\n");
368 rxi_MorePackets(rx_maxReceiveWindow + 2); /* alloc more packets */
369 rxi_dataQuota += rx_initSendWindow; /* Reserve some pkts for hard times */
370 /* threadID is used for making decisions in GetCall. Get it by bumping
371 * number of threads handling incoming calls */
372 threadID = rxi_availProcs++;
/* Serve first: sock is reset before every rxi_ServerProc call. */
375 sock = OSI_NULLSOCKET;
376 rxi_ServerProc(threadID, newcall, &sock);
377 /* osi_Assert(sock != OSI_NULLSOCKET); */
/* Then take a turn as listener; rxi_ListenerProc receives threadID and
 * newcall by address and may update them for the next rxi_ServerProc call. */
379 rxi_ListenerProc(rfds, &threadID, &newcall);
380 /* osi_Assert(threadID != -1); */
381 /* osi_Assert(newcall != NULL); */
388 * Called from GetUDPSocket.
389 * Called from a single thread at startup.
390 * Returns 0 on success; -1 on failure.
/* Prepares sock for the listener: switches it to non-blocking mode, checks
 * that the descriptor is below FD_SETSIZE, adds it to rx_selectMask and
 * extends rx_minSocketNumber / rx_maxSocketNumber to include it. */
393 rxi_Listen(osi_socket sock)
397 * Put the socket into non-blocking mode so that rx_Listener
398 * can do a polling read before entering select
/* NOTE(review): F_SETFL is given FNDELAY alone, so any other file status
 * flags already set on sock are replaced rather than preserved - confirm
 * this is intended. */
400 if (fcntl(sock, F_SETFL, FNDELAY) == -1) {
402 (osi_Msg "rxi_Listen: unable to set non-blocking mode on socket\n");
/* FD_SET below is only applied to descriptors smaller than FD_SETSIZE. */
406 if (sock > FD_SETSIZE - 1) {
407 (osi_Msg "rxi_Listen: socket descriptor > (FD_SETSIZE-1) = %d\n",
/* rxi_ListenerProc copies rx_selectMask before each select and uses the
 * min/max socket numbers to bound its select call and FD_ISSET scan. */
413 FD_SET(sock, &rx_selectMask);
414 if (sock > rx_maxSocketNumber)
415 rx_maxSocketNumber = sock;
416 if (sock < rx_minSocketNumber)
417 rx_minSocketNumber = sock;
/* Receive one message from socket with recvmsg() and return its result.
 * When both HAVE_LINUX_ERRQUEUE_H and ADAPT_PMTU are defined,
 * rxi_HandleSocketError() is first called repeatedly for as long as it
 * returns a positive value. */
425 rxi_Recvmsg(osi_socket socket, struct msghdr *msg_p, int flags)
427 #if defined(HAVE_LINUX_ERRQUEUE_H) && defined(ADAPT_PMTU)
428 while((rxi_HandleSocketError(socket)) > 0)
431 return recvmsg(socket, msg_p, flags);
435 * Simulate a blocking sendmsg on the non-blocking socket.
436 * It's non blocking because it was set that way for recvmsg.
/* Keeps calling sendmsg(socket, msg_p, flags) until it stops returning -1.
 * After a failure with EWOULDBLOCK or ENOBUFS (also ECONNREFUSED when
 * AFS_LINUX22_ENV is defined), it waits in select() for the socket to
 * become writable and tries again. Any other failure is reported with
 * "rx failed to send packet: " followed by perror().
 * NOTE(review): most return statements are not visible in this excerpt;
 * the one that is returns the negated WSAGetLastError() value. */
439 rxi_Sendmsg(osi_socket socket, struct msghdr *msg_p, int flags)
/* Write set for the select() wait below; released with IOMGR_FreeFDSet()
 * at the end of the function. */
441 fd_set *sfds = (fd_set *) 0;
442 while (sendmsg(socket, msg_p, flags) == -1) {
445 rx_atomic_inc(&rx_stats.sendSelects);
448 if (!(sfds = IOMGR_AllocFDSet())) {
449 (osi_Msg "rx failed to alloc fd_set: ");
450 perror("rx_sendmsg");
453 FD_SET(socket, sfds);
455 #if defined(HAVE_LINUX_ERRQUEUE_H) && defined(ADAPT_PMTU)
456 while((rxi_HandleSocketError(socket)) > 0)
460 if (WSAGetLastError())
461 #elif defined(AFS_LINUX22_ENV)
462 /* linux unfortunately returns ECONNREFUSED if the target port
463 * is no longer in use */
464 /* and EAGAIN if a UDP checksum is incorrect */
465 if (errno != EWOULDBLOCK && errno != ENOBUFS && errno != ECONNREFUSED
468 if (errno != EWOULDBLOCK && errno != ENOBUFS)
471 (osi_Msg "rx failed to send packet: ");
472 perror("rx_sendmsg");
/* NOTE(review): this return leaves the function without reaching the
 * IOMGR_FreeFDSet(sfds) call at the bottom; by this point sfds has been
 * allocated, so this looks like a leak of the fd_set - confirm. */
477 if (WSAGetLastError() > 0)
478 return -WSAGetLastError();
/* Wait until select() reports exactly one descriptor ready for writing. */
482 while ((err = select(socket + 1, 0, sfds, 0, 0)) != 1) {
/* Only an EINTR failure is retried; any other result panics. */
483 if (err >= 0 || errno != EINTR)
484 osi_Panic("rxi_sendmsg: select error %d.%d", err, errno);
/* Put the socket back into the set before selecting again. */
486 FD_SET(socket, sfds);
490 IOMGR_FreeFDSet(sfds);