2 * Copyright 2000, International Business Machines Corporation and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
10 /* rx_user.c contains routines specific to the user space UNIX implementation of rx */
12 /* This controls the size of an fd_set; it must be defined early before
13 * the system headers define that type and the macros that operate on it.
14 * Its value should be as large as the maximum file descriptor limit we
15 * are likely to run into on any platform. Right now, that is 65536,
16 * which is the default hard fd limit on Solaris 9. */
18 #define FD_SETSIZE 65536
21 #include <afsconfig.h>
22 #include <afs/param.h>
27 # include <sys/types.h> /* fd_set on older platforms */
31 # include <winsock2.h>
33 # include <unistd.h> /* select() prototype */
34 # include <sys/time.h> /* struct timeval, select() prototype */
36 # include <sys/select.h> /* fd_set on newer platforms */
38 # include <sys/socket.h>
39 # include <sys/file.h>
41 # include <sys/stat.h>
42 # include <netinet/in.h>
44 # include <sys/ioctl.h>
45 # include <sys/time.h>
47 # include "rx_internal.h"
49 # include "rx_globals.h"
52 #define MAXTHREADNAMELENGTH 64
54 int debugSelectFailure; /* # of times select failed */
57 * Sleep on the unique wait channel provided.
62 LWP_WaitProcess(addr);
66 * Wakeup any threads on the channel provided.
67 * They may be woken up spuriously, and must check any conditions.
70 rxi_Wakeup(void *addr)
72 LWP_NoYieldSignal(addr);
75 PROCESS rx_listenerPid = 0; /* LWP process id of socket listener process */
76 static void* rx_ListenerProc(void *dummy);
79 * Delay the current thread the specified number of seconds.
87 static int quitListening = 0;
89 /* This routine will kill the listener thread, if it exists. */
91 rxi_StopListener(void)
94 rxi_ReScheduleEvents();
97 /* This routine is called by the event package whenever a new event is
98 posted that is earlier than all existing events. If the Listener process
99 is blocked in select, this will unblock it. It can also be called
100 to force a new trip through the rxi_ListenerProc select loop when the set
101 of file descriptors it should be listening to changes. */
103 rxi_ReScheduleEvents(void)
106 IOMGR_Cancel(rx_listenerPid);
110 rxi_InitializeThreadSupport(void)
114 LWP_InitializeProcessSupport(LWP_NORMAL_PRIORITY, &junk);
116 FD_ZERO(&rx_selectMask);
120 rxi_StartServerProc(void *(*proc) (void *), int stacksize)
123 static int number = 0;
126 sprintf(name, "srv_%d", ++number);
127 LWP_CreateProcess(proc, stacksize, RX_PROCESS_PRIORITY, NULL,
128 "rx_ServerProc", &scratchPid);
130 (*registerProgram) (scratchPid, name);
134 rxi_StartListener(void)
136 /* Priority of listener should be high, so it can keep conns alive */
137 #define RX_LIST_STACK 24000
138 LWP_CreateProcess(rx_ListenerProc, RX_LIST_STACK, LWP_MAX_PRIORITY,
139 NULL, "rx_Listener", &rx_listenerPid);
141 (*registerProgram) (rx_listenerPid, "listener");
144 /* The main loop which listens to the net for datagrams, and handles timeouts
145 and retransmissions, etc. It also is responsible for scheduling the
146 execution of pending events (in conjunction with event.c).
148 Note the interaction of nextPollTime and lastPollWorked. The idea is that if rx is not
149 keeping up with the incoming stream of packets (because other threads are
150 interfering with its running sufficiently often), rx does a polling select()
151 before calling IOMGR_Select. Calling select() directly, rather than IOMGR_Select,
152 means that we don't have to let other processes run before processing more packets.
154 So, our algorithm is: if the last poll on the file descriptors found useful data, or
155 we are past the time nextPollTime (which is pushed 4 seconds ahead after each poll),
156 then we try the polling select before the IOMGR_Select. If we eventually catch up
157 (which we can tell by the polling select returning no input packets ready), then we
158 don't do a polling select again until several seconds later (via the nextPollTime mechanism).
162 rxi_ListenerProc(fd_set * rfds, int *tnop, struct rx_call **newcallp)
166 register struct rx_packet *p = (struct rx_packet *)0;
169 afs_int32 nextPollTime; /* time to next poll FD before sleeping */
170 int lastPollWorked, doingPoll; /* true iff last poll was useful */
171 struct timeval tv, *tvp;
177 char name[MAXTHREADNAMELENGTH] = "srv_0";
182 code = LWP_CurrentProcess(&pid);
184 fprintf(stderr, "rxi_Listener: Can't get my pid.\n");
187 rx_listenerPid = pid;
189 (*swapNameProgram) (pid, "listener", &name[0]);
192 /* Grab a new packet only if necessary (otherwise re-use the old one) */
194 rxi_RestoreDataBufs(p);
196 if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_RECEIVE)))
197 osi_Panic("rxi_ListenerProc: no packets!"); /* Shouldn't happen */
199 /* Wait for the next event time or a packet to arrive. */
200 /* event_RaiseEvents schedules any events whose time has come and
201 * then atomically computes the time to the next event, guaranteeing
202 * that this is positive. If there is no next event, it returns 0 */
204 if (!rxevent_RaiseEvents(&cv))
207 /* It's important to copy cv to tv, because the 4.3 documentation
208 * for select threatens that *tv may be updated after a select, in
209 * future editions of the system, to indicate how much of the time
210 * period has elapsed. So we shouldn't rely on tv not being altered. */
211 tv.tv_sec = cv.sec; /* Time to next event */
212 tv.tv_usec = cv.usec;
217 *rfds = rx_selectMask;
219 if (lastPollWorked || nextPollTime < clock_Sec()) {
220 /* we're catching up, or haven't tried to for a few seconds */
222 nextPollTime = clock_Sec() + 4; /* try again in 4 seconds no matter what */
223 tv.tv_sec = tv.tv_usec = 0; /* make sure we poll */
225 code = select((int)(rx_maxSocketNumber + 1), rfds, 0, 0, tvp);
228 code = IOMGR_Select((int)(rx_maxSocketNumber + 1), rfds, 0, 0, tvp);
230 lastPollWorked = 0; /* default is that it didn't find anything */
234 LWP_DestroyProcess(pid);
240 * If it was a timer interrupt then we can assume that
241 * the time has advanced by roughly the value of the
242 * previous timeout, and that there is now at least
248 /* select or IOMGR_Select returned failure */
249 debugSelectFailure++; /* update debugging counter */
254 * IOMGR_Cancel is invoked whenever a new event is
255 * posted that is earlier than any existing events.
256 * So we re-evaluate the time, and then go back to
263 /* Packets have arrived, presumably:
264 * If it wasn't a timer interrupt, then no event should have
265 * timed out yet (well some event may have, but only just...), so
266 * we don't bother looking to see if any have timed out, but just
267 * go directly to reading the data packets
273 for (i = 0; p && i < rfds->fd_count; i++) {
274 socket = rfds->fd_array[i];
275 if (rxi_ReadPacket(socket, p, &host, &port)) {
277 p = rxi_ReceivePacket(p, socket, host, port, tnop,
279 if (newcallp && *newcallp) {
283 if (swapNameProgram) {
284 (*swapNameProgram) (rx_listenerPid, name, 0);
292 for (socket = rx_minSocketNumber;
293 p && socket <= rx_maxSocketNumber; socket++) {
294 if (!FD_ISSET(socket, rfds))
296 if (rxi_ReadPacket(socket, p, &host, &port)) {
297 p = rxi_ReceivePacket(p, socket, host, port, tnop,
299 if (newcallp && *newcallp) {
303 if (swapNameProgram) {
304 (*swapNameProgram) (rx_listenerPid, name, 0);
318 /* This is the listener process request loop. The listener process loop
319 * becomes a server thread when rxi_ListenerProc returns, and stays
320 * a server thread until rxi_ServerProc returns. */
322 rx_ListenerProc(void *dummy)
326 struct rx_call *newcall;
329 if (!(rfds = IOMGR_AllocFDSet())) {
330 osi_Panic("rx_ListenerProc: no fd_sets!\n");
336 rxi_ListenerProc(rfds, &threadID, &newcall);
337 /* assert(threadID != -1); */
338 /* assert(newcall != NULL); */
339 sock = OSI_NULLSOCKET;
340 rxi_ServerProc(threadID, newcall, &sock);
341 /* assert(sock != OSI_NULLSOCKET); */
347 /* This is the server process request loop. The server process loop
348 * becomes a listener thread when rxi_ServerProc returns, and stays
349 * a listener thread until rxi_ListenerProc returns. */
351 rx_ServerProc(void * unused)
355 struct rx_call *newcall = NULL;
358 if (!(rfds = IOMGR_AllocFDSet())) {
359 osi_Panic("rxi_ListenerProc: no fd_sets!\n");
362 rxi_MorePackets(rx_maxReceiveWindow + 2); /* alloc more packets */
363 rxi_dataQuota += rx_initSendWindow; /* Reserve some pkts for hard times */
364 /* threadID is used for making decisions in GetCall. Get it by bumping
365 * number of threads handling incoming calls */
366 threadID = rxi_availProcs++;
369 sock = OSI_NULLSOCKET;
370 rxi_ServerProc(threadID, newcall, &sock);
371 /* assert(sock != OSI_NULLSOCKET); */
373 rxi_ListenerProc(rfds, &threadID, &newcall);
374 /* assert(threadID != -1); */
375 /* assert(newcall != NULL); */
382 * Called from GetUDPSocket.
383 * Called from a single thread at startup.
384 * Returns 0 on success; -1 on failure.
387 rxi_Listen(osi_socket sock)
391 * Put the socket into non-blocking mode so that rx_Listener
392 * can do a polling read before entering select
394 if (fcntl(sock, F_SETFL, FNDELAY) == -1) {
396 (osi_Msg "rxi_Listen: unable to set non-blocking mode on socket\n");
400 if (sock > FD_SETSIZE - 1) {
401 (osi_Msg "rxi_Listen: socket descriptor > (FD_SETSIZE-1) = %d\n",
407 FD_SET(sock, &rx_selectMask);
408 if (sock > rx_maxSocketNumber)
409 rx_maxSocketNumber = sock;
410 if (sock < rx_minSocketNumber)
411 rx_minSocketNumber = sock;
419 rxi_Recvmsg(osi_socket socket, struct msghdr *msg_p, int flags)
421 #if defined(HAVE_LINUX_ERRQUEUE_H) && defined(ADAPT_PMTU)
422 while((rxi_HandleSocketError(socket)) > 0)
425 return recvmsg(socket, msg_p, flags);
429 * Simulate a blocking sendmsg on the non-blocking socket.
430 * It's non blocking because it was set that way for recvmsg.
433 rxi_Sendmsg(osi_socket socket, struct msghdr *msg_p, int flags)
435 fd_set *sfds = (fd_set *) 0;
436 while (sendmsg(socket, msg_p, flags) == -1) {
438 rx_stats.sendSelects++;
441 if (!(sfds = IOMGR_AllocFDSet())) {
442 (osi_Msg "rx failed to alloc fd_set: ");
443 perror("rx_sendmsg");
446 FD_SET(socket, sfds);
448 #if defined(HAVE_LINUX_ERRQUEUE_H) && defined(ADAPT_PMTU)
449 while((rxi_HandleSocketError(socket)) > 0)
453 if (WSAGetLastError())
454 #elif defined(AFS_LINUX22_ENV)
455 /* linux unfortunately returns ECONNREFUSED if the target port
456 * is no longer in use */
457 /* and EAGAIN if a UDP checksum is incorrect */
458 if (errno != EWOULDBLOCK && errno != ENOBUFS && errno != ECONNREFUSED
461 if (errno != EWOULDBLOCK && errno != ENOBUFS)
464 (osi_Msg "rx failed to send packet: ");
465 perror("rx_sendmsg");
468 while ((err = select(socket + 1, 0, sfds, 0, 0)) != 1) {
469 if (err >= 0 || errno != EINTR)
470 osi_Panic("rxi_sendmsg: select error %d.%d", err, errno);
474 IOMGR_FreeFDSet(sfds);