2 * Copyright 2000, International Business Machines Corporation and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
10 /* rx_user.c contains routines specific to the user space UNIX implementation of rx */
12 # include <afs/param.h>
13 # include <sys/types.h> /* fd_set on older platforms */
17 # include <winsock2.h>
19 # include <unistd.h> /* select() prototype */
20 # include <sys/time.h> /* struct timeval, select() prototype */
22 # include <sys/select.h> /* fd_set on newer platforms */
24 # include <sys/socket.h>
25 # include <sys/file.h>
27 # include <sys/stat.h>
28 # include <netinet/in.h>
30 # include <sys/ioctl.h>
31 # include <sys/time.h>
34 # include "rx_globals.h"
38 int debugSelectFailure; /* # of times select failed */
40 * Sleep on the unique wait channel provided.
42 void rxi_Sleep(void *addr)
44 LWP_WaitProcess(addr);
48 * Wakeup any threads on the channel provided.
49 * They may be woken up spuriously, and must check any conditions.
51 void rxi_Wakeup(void *addr)
53 LWP_NoYieldSignal(addr);
56 PROCESS rx_listenerPid; /* LWP process id of socket listener process */
57 static void rx_ListenerProc(void *dummy);
60 * Delay the current thread the specified number of seconds.
62 void rxi_Delay(int sec)
67 /* This routine will get called by the event package whenever a new,
68 earlier than others, event is posted. If the Listener process
69 is blocked in selects, this will unblock it. It also can be called
70 to force a new trip through the rxi_Listener select loop when the set
71 of file descriptors it should be listening to changes... */
72 void rxi_ReScheduleEvents() {
73 if (rx_listenerPid) IOMGR_Cancel(rx_listenerPid);
76 void rxi_InitializeThreadSupport(void)
80 LWP_InitializeProcessSupport(LWP_NORMAL_PRIORITY, &junk);
82 FD_ZERO(&rx_selectMask);
85 void rxi_StartServerProc(proc, stacksize)
89 LWP_CreateProcess(proc, stacksize, RX_PROCESS_PRIORITY,
90 0, "rx_ServerProc", &scratchPid);
93 void rxi_StartListener() {
94 /* Priority of listener should be high, so it can keep conns alive */
95 #define RX_LIST_STACK 24000
96 LWP_CreateProcess(rx_ListenerProc, RX_LIST_STACK, LWP_MAX_PRIORITY, 0,
97 "rx_Listener", &rx_listenerPid);
100 /* The main loop which listens to the net for datagrams, and handles timeouts
101 and retransmissions, etc. It also is responsible for scheduling the
102 execution of pending events (in conjunction with event.c).
104 Note interaction of nextPollTime and lastPollWorked. The idea is that if rx is not
105 keeping up with the incoming stream of packets (because there are threads that
106 are interfering with its running sufficiently often), rx does a polling select
107 before doing a real IOMGR_Select system call. Doing a real select means that
108 we don't have to let other processes run before processing more packets.
110 So, our algorithm is that if the last poll on the file descriptor found useful data, or
111 we're at the time nextPollTime (which is advanced so that it occurs every 3 or 4 seconds),
112 then we try the polling select before the IOMGR_Select. If we eventually catch up
113 (which we can tell by the polling select returning no input packets ready), then we
114 don't do a polling select again until several seconds later (via nextPollTime mechanism).
117 void rxi_ListenerProc(rfds, tnop, newcallp)
120 struct rx_call **newcallp;
124 register struct rx_packet *p = (struct rx_packet *)0;
127 afs_int32 nextPollTime; /* time to next poll FD before sleeping */
128 int lastPollWorked, doingPoll; /* true iff last poll was useful */
129 struct timeval tv, *tvp;
137 /* Grab a new packet only if necessary (otherwise re-use the old one) */
139 rxi_RestoreDataBufs(p);
142 if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_RECEIVE)))
143 osi_Panic("rxi_ListenerProc: no packets!"); /* Shouldn't happen */
145 /* Wait for the next event time or a packet to arrive. */
146 /* event_RaiseEvents schedules any events whose time has come and
147 then atomically computes the time to the next event, guaranteeing
148 that this is positive. If there is no next event, it returns 0 */
150 if (!rxevent_RaiseEvents(&cv)) tvp = (struct timeval *) 0;
152 /* It's important to copy cv to tv, because the 4.3 documentation
153 for select threatens that *tv may be updated after a select, in
154 future editions of the system, to indicate how much of the time
155 period has elapsed. So we shouldn't rely on tv not being altered. */
156 tv.tv_sec = cv.sec; /* Time to next event */
157 tv.tv_usec = cv.usec;
162 *rfds = rx_selectMask;
164 if (lastPollWorked || nextPollTime < clock_Sec()) {
165 /* we're catching up, or haven't tried to for a few seconds */
167 nextPollTime = clock_Sec() + 4; /* try again in 4 seconds no matter what */
168 tv.tv_sec = tv.tv_usec = 0; /* make sure we poll */
170 code = select(rx_maxSocketNumber+1, rfds, 0, 0, tvp);
174 code = IOMGR_Select(rx_maxSocketNumber+1, rfds, 0, 0, tvp);
176 lastPollWorked = 0; /* default is that it didn't find anything */
181 * If it was a timer interrupt then we can assume that
182 * the time has advanced by roughly the value of the
183 * previous timeout, and that there is now at least
189 /* select or IOMGR_Select returned failure */
190 debugSelectFailure++; /* update debugging counter */
195 * IOMGR_Cancel is invoked whenever a new event is
196 * posted that is earlier than any existing events.
197 * So we re-evaluate the time, and then go back to
204 /* Packets have arrived, presumably:
205 * If it wasn't a timer interrupt, then no event should have
206 * timed out yet (well some event may have, but only just...), so
207 * we don't bother looking to see if any have timed out, but just
208 * go directly to reading the data packets
211 if (doingPoll) lastPollWorked = 1;
213 for (i=0; p && i<rfds->fd_count; i++) {
214 socket = rfds->fd_array[i];
215 if (rxi_ReadPacket(socket, p, &host, &port)) {
217 p = rxi_ReceivePacket(p, socket, host, port,
219 if (newcallp && *newcallp) {
228 for (socket = rx_minSocketNumber;
229 p && socket <= rx_maxSocketNumber; socket++) {
230 if (!FD_ISSET(socket, rfds))
232 if (rxi_ReadPacket(socket, p, &host, &port)) {
233 p = rxi_ReceivePacket(p, socket, host, port,
235 if (newcallp && *newcallp) {
250 /* This is the listener process request loop. The listener process loop
251 * becomes a server thread when rxi_ListenerProc returns, and stays
252 * server thread until rxi_ServerProc returns. */
253 static void rx_ListenerProc(void *dummy)
257 struct rx_call *newcall;
260 if (!(rfds = IOMGR_AllocFDSet())) {
261 osi_Panic("rx_ListenerProc: no fd_sets!\n");
267 rxi_ListenerProc(rfds, &threadID, &newcall);
268 /* assert(threadID != -1); */
269 /* assert(newcall != NULL); */
270 sock = OSI_NULLSOCKET;
271 rxi_ServerProc(threadID, newcall, &sock);
272 /* assert(sock != OSI_NULLSOCKET); */
277 /* This is the server process request loop. The server process loop
278 * becomes a listener thread when rxi_ServerProc returns, and stays
279 * listener thread until rxi_ListenerProc returns. */
284 struct rx_call *newcall = NULL;
287 if (!(rfds = IOMGR_AllocFDSet())) {
288 osi_Panic("rxi_ListenerProc: no fd_sets!\n");
291 rxi_MorePackets(rx_maxReceiveWindow+2); /* alloc more packets */
292 rxi_dataQuota += rx_initSendWindow; /* Reserve some pkts for hard times */
293 /* threadID is used for making decisions in GetCall. Get it by bumping
294 * number of threads handling incoming calls */
295 threadID = rxi_availProcs++;
298 sock = OSI_NULLSOCKET;
299 rxi_ServerProc(threadID, newcall, &sock);
300 /* assert(sock != OSI_NULLSOCKET); */
302 rxi_ListenerProc(rfds, &threadID, &newcall);
303 /* assert(threadID != -1); */
304 /* assert(newcall != NULL); */
310 * Called from GetUDPSocket.
311 * Called from a single thread at startup.
312 * Returns 0 on success; -1 on failure.
319 * Put the socket into non-blocking mode so that rx_Listener
320 * can do a polling read before entering select
322 if (fcntl(sock, F_SETFL, FNDELAY) == -1) {
324 (osi_Msg "rxi_Listen: unable to set non-blocking mode on socket\n");
328 if (sock > FD_SETSIZE-1) {
329 (osi_Msg "rxi_Listen: socket descriptor > (FD_SETSIZE-1) = %d\n",
335 FD_SET(sock, &rx_selectMask);
336 if (sock > rx_maxSocketNumber) rx_maxSocketNumber = sock;
337 if (sock < rx_minSocketNumber) rx_minSocketNumber = sock;
344 int rxi_Recvmsg(osi_socket socket, struct msghdr *msg_p, int flags)
346 return recvmsg((int) socket, msg_p, flags);
350 * Simulate a blocking sendmsg on the non-blocking socket.
351 * It's non blocking because it was set that way for recvmsg.
353 int rxi_Sendmsg(osi_socket socket, struct msghdr *msg_p, int flags)
355 fd_set *sfds = (fd_set*)0;
356 while (sendmsg(socket, msg_p, flags) == -1) {
358 rx_stats.sendSelects++;
361 if ( !(sfds = IOMGR_AllocFDSet())) {
362 (osi_Msg "rx failed to alloc fd_set: ");
363 perror("rx_sendmsg");
366 FD_SET(socket, sfds);
371 #elif defined(AFS_LINUX22_ENV)
372 /* linux unfortunately returns ECONNREFUSED if the target port
373 * is no longer in use */
374 if (errno != EWOULDBLOCK && errno != ENOBUFS && errno != ECONNREFUSED)
376 if (errno != EWOULDBLOCK && errno != ENOBUFS)
379 (osi_Msg "rx failed to send packet: ");
380 perror("rx_sendmsg");
383 while ((err = select(socket+1, 0, sfds, 0, 0)) != 1) {
384 if (err >= 0 || errno != EINTR)
385 osi_Panic("rxi_sendmsg: select error %d.%d", err, errno);
389 IOMGR_FreeFDSet(sfds);