2 * Copyright 2000, International Business Machines Corporation and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
10 /* rx_user.c contains routines specific to the user space UNIX implementation of rx */
12 /* This controls the size of an fd_set; it must be defined early before
13 * the system headers define that type and the macros that operate on it.
14 * Its value should be as large as the maximum file descriptor limit we
15 * are likely to run into on any platform. Right now, that is 65536
16 * which is the default hard fd limit on Solaris 9 */
18 #define FD_SETSIZE 65536
21 #include <afsconfig.h>
22 #include <afs/param.h>
26 #ifdef HAVE_SYS_FILE_H
27 # include <sys/file.h>
33 #include "rx_atomic.h"
34 #include "rx_globals.h"
37 #define MAXTHREADNAMELENGTH 64
39 int debugSelectFailure; /* # of times select failed */
42 * Sleep on the unique wait channel provided.
47 LWP_WaitProcess(addr);
51 * Wakeup any threads on the channel provided.
52 * They may be woken up spuriously, and must check any conditions.
55 rxi_Wakeup(void *addr)
57 LWP_NoYieldSignal(addr);
60 PROCESS rx_listenerPid = 0; /* LWP process id of socket listener process */
61 static void* rx_ListenerProc(void *dummy);
64 * Delay the current thread the specified number of seconds.
72 static int quitListening = 0;
74 /* This routine will kill the listener thread, if it exists. */
76 rxi_StopListener(void)
79 rxi_ReScheduleEvents();
82 /* This routine is called by the event package whenever a new event
83 is posted that is earlier than all existing events. If the listener
84 process is blocked in select, this will unblock it. It can also be called
85 to force a new trip through the rxi_ListenerProc select loop when the set
86 of file descriptors it should be listening to changes. */
88 rxi_ReScheduleEvents(void)
91 IOMGR_Cancel(rx_listenerPid);
95 rxi_InitializeThreadSupport(void)
99 LWP_InitializeProcessSupport(LWP_NORMAL_PRIORITY, &junk);
101 FD_ZERO(&rx_selectMask);
105 rxi_StartServerProc(void *(*proc) (void *), int stacksize)
108 static int number = 0;
111 sprintf(name, "srv_%d", ++number);
112 LWP_CreateProcess(proc, stacksize, RX_PROCESS_PRIORITY, NULL,
113 "rx_ServerProc", &scratchPid);
115 (*registerProgram) (scratchPid, name);
119 rxi_StartListener(void)
121 /* Priority of listener should be high, so it can keep conns alive */
122 #define RX_LIST_STACK 24000
123 LWP_CreateProcess(rx_ListenerProc, RX_LIST_STACK, LWP_MAX_PRIORITY,
124 NULL, "rx_Listener", &rx_listenerPid);
126 (*registerProgram) (rx_listenerPid, "listener");
129 /* The main loop which listens to the net for datagrams, and handles timeouts
130 and retransmissions, etc. It also is responsible for scheduling the
131 execution of pending events (in conjunction with event.c).
133 Note interaction of nextPollTime and lastPollWorked. The idea is that if rx is not
134 keeping up with the incoming stream of packets (because there are threads that
135 are interfering with its running sufficiently often), rx does a polling select
136 before doing a real IOMGR_Select system call. Doing a real select means that
137 we don't have to let other processes run before processing more packets.
139 So, our algorithm is that if the last poll on the file descriptor found useful data, or
140 we're at the time nextPollTime (which is advanced so that it occurs every 3 or 4 seconds),
141 then we try the polling select before the IOMGR_Select. If we eventually catch up
142 (which we can tell by the polling select returning no input packets ready), then we
143 don't do a polling select again until several seconds later (via nextPollTime mechanism).
147 rxi_ListenerProc(fd_set * rfds, int *tnop, struct rx_call **newcallp)
151 struct rx_packet *p = (struct rx_packet *)0;
154 afs_int32 nextPollTime; /* time to next poll FD before sleeping */
155 int lastPollWorked, doingPoll; /* true iff last poll was useful */
156 struct timeval tv, *tvp;
162 char name[MAXTHREADNAMELENGTH] = "srv_0";
167 code = LWP_CurrentProcess(&pid);
169 osi_Panic("rxi_Listener: Can't get my pid.\n");
171 rx_listenerPid = pid;
173 (*swapNameProgram) (pid, "listener", &name[0]);
176 /* See if a check for additional packets was issued */
179 /* Grab a new packet only if necessary (otherwise re-use the old one) */
181 rxi_RestoreDataBufs(p);
183 if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_RECEIVE)))
184 osi_Panic("rxi_ListenerProc: no packets!"); /* Shouldn't happen */
186 /* Wait for the next event time or a packet to arrive. */
187 /* rxevent_RaiseEvents schedules any events whose time has come and
188 * then atomically computes the time to the next event, guaranteeing
189 * that this is positive. If there is no next event, it returns 0 */
191 if (!rxevent_RaiseEvents(&cv))
194 /* It's important to copy cv to tv, because the 4.3 documentation
195 * for select threatens that *tv may be updated after a select, in
196 * future editions of the system, to indicate how much of the time
197 * period has elapsed. So we shouldn't rely on tv not being altered. */
198 tv.tv_sec = cv.sec; /* Time to next event */
199 tv.tv_usec = cv.usec;
203 rx_atomic_inc(&rx_stats.selects);
205 *rfds = rx_selectMask;
207 if (lastPollWorked || nextPollTime < clock_Sec()) {
208 /* we're catching up, or haven't tried to for a few seconds */
210 nextPollTime = clock_Sec() + 4; /* try again in 4 seconds no matter what */
211 tv.tv_sec = tv.tv_usec = 0; /* make sure we poll */
213 code = select((int)(rx_maxSocketNumber + 1), rfds, 0, 0, tvp);
216 code = IOMGR_Select((int)(rx_maxSocketNumber + 1), rfds, 0, 0, tvp);
218 lastPollWorked = 0; /* default is that it didn't find anything */
222 LWP_DestroyProcess(pid);
228 * If it was a timer interrupt then we can assume that
229 * the time has advanced by roughly the value of the
230 * previous timeout, and that there is now at least
236 /* select or IOMGR_Select returned failure */
237 debugSelectFailure++; /* update debugging counter */
242 * IOMGR_Cancel is invoked whenever a new event is
243 * posted that is earlier than any existing events.
244 * So we re-evaluate the time, and then go back to
251 /* Packets have arrived, presumably:
252 * If it wasn't a timer interrupt, then no event should have
253 * timed out yet (well some event may have, but only just...), so
254 * we don't bother looking to see if any have timed out, but just
255 * go directly to reading the data packets
261 for (i = 0; p && i < rfds->fd_count; i++) {
262 socket = rfds->fd_array[i];
263 if (rxi_ReadPacket(socket, p, &host, &port)) {
265 p = rxi_ReceivePacket(p, socket, host, port, tnop,
267 if (newcallp && *newcallp) {
271 if (swapNameProgram) {
272 (*swapNameProgram) (rx_listenerPid, name, 0);
280 for (socket = rx_minSocketNumber;
281 p && socket <= rx_maxSocketNumber; socket++) {
282 if (!FD_ISSET(socket, rfds))
284 if (rxi_ReadPacket(socket, p, &host, &port)) {
285 p = rxi_ReceivePacket(p, socket, host, port, tnop,
287 if (newcallp && *newcallp) {
291 if (swapNameProgram) {
292 (*swapNameProgram) (rx_listenerPid, name, 0);
306 /* This is the listener process request loop. The listener process loop
307 * becomes a server thread when rxi_ListenerProc returns, and stays a
308 * server thread until rxi_ServerProc returns. */
310 rx_ListenerProc(void *dummy)
314 struct rx_call *newcall;
317 if (!(rfds = IOMGR_AllocFDSet())) {
318 osi_Panic("rx_ListenerProc: no fd_sets!\n");
324 rxi_ListenerProc(rfds, &threadID, &newcall);
325 /* osi_Assert(threadID != -1); */
326 /* osi_Assert(newcall != NULL); */
327 sock = OSI_NULLSOCKET;
328 rxi_ServerProc(threadID, newcall, &sock);
329 /* osi_Assert(sock != OSI_NULLSOCKET); */
335 /* This is the server process request loop. The server process loop
336 * becomes a listener thread when rxi_ServerProc returns, and stays a
337 * listener thread until rxi_ListenerProc returns. */
339 rx_ServerProc(void * unused)
343 struct rx_call *newcall = NULL;
346 if (!(rfds = IOMGR_AllocFDSet())) {
347 osi_Panic("rxi_ListenerProc: no fd_sets!\n");
350 rxi_MorePackets(rx_maxReceiveWindow + 2); /* alloc more packets */
351 rxi_dataQuota += rx_initSendWindow; /* Reserve some pkts for hard times */
352 /* threadID is used for making decisions in GetCall. Get it by bumping
353 * number of threads handling incoming calls */
354 threadID = rxi_availProcs++;
357 sock = OSI_NULLSOCKET;
358 rxi_ServerProc(threadID, newcall, &sock);
359 /* osi_Assert(sock != OSI_NULLSOCKET); */
361 rxi_ListenerProc(rfds, &threadID, &newcall);
362 /* osi_Assert(threadID != -1); */
363 /* osi_Assert(newcall != NULL); */
370 * Called from GetUDPSocket.
371 * Called from a single thread at startup.
372 * Returns 0 on success; -1 on failure.
375 rxi_Listen(osi_socket sock)
379 * Put the socket into non-blocking mode so that rx_Listener
380 * can do a polling read before entering select
382 if (fcntl(sock, F_SETFL, FNDELAY) == -1) {
384 (osi_Msg "rxi_Listen: unable to set non-blocking mode on socket\n");
388 if (sock > FD_SETSIZE - 1) {
389 (osi_Msg "rxi_Listen: socket descriptor > (FD_SETSIZE-1) = %d\n",
395 FD_SET(sock, &rx_selectMask);
396 if (sock > rx_maxSocketNumber)
397 rx_maxSocketNumber = sock;
398 if (sock < rx_minSocketNumber)
399 rx_minSocketNumber = sock;
407 rxi_Recvmsg(osi_socket socket, struct msghdr *msg_p, int flags)
409 #if defined(HAVE_LINUX_ERRQUEUE_H) && defined(ADAPT_PMTU)
410 while((rxi_HandleSocketError(socket)) > 0)
413 return recvmsg(socket, msg_p, flags);
417 * Simulate a blocking sendmsg on the non-blocking socket.
418 * It's non-blocking because it was set that way for recvmsg.
421 rxi_Sendmsg(osi_socket socket, struct msghdr *msg_p, int flags)
423 fd_set *sfds = (fd_set *) 0;
424 while (sendmsg(socket, msg_p, flags) == -1) {
427 rx_atomic_inc(&rx_stats.sendSelects);
430 if (!(sfds = IOMGR_AllocFDSet())) {
431 (osi_Msg "rx failed to alloc fd_set: ");
432 perror("rx_sendmsg");
435 FD_SET(socket, sfds);
437 #if defined(HAVE_LINUX_ERRQUEUE_H) && defined(ADAPT_PMTU)
438 while((rxi_HandleSocketError(socket)) > 0)
442 if (WSAGetLastError())
443 #elif defined(AFS_LINUX22_ENV)
444 /* linux unfortunately returns ECONNREFUSED if the target port
445 * is no longer in use */
446 /* and EAGAIN if a UDP checksum is incorrect */
447 if (errno != EWOULDBLOCK && errno != ENOBUFS && errno != ECONNREFUSED
450 if (errno != EWOULDBLOCK && errno != ENOBUFS)
453 (osi_Msg "rx failed to send packet: ");
454 perror("rx_sendmsg");
459 if (WSAGetLastError() > 0)
460 return -WSAGetLastError();
464 while ((err = select(socket + 1, 0, sfds, 0, 0)) != 1) {
465 if (err >= 0 || errno != EINTR)
466 osi_Panic("rxi_sendmsg: select error %d.%d", err, errno);
468 FD_SET(socket, sfds);
472 IOMGR_FreeFDSet(sfds);