2 * Copyright 2000, International Business Machines Corporation and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
10 /* rx_user.c contains routines specific to the user space UNIX implementation of rx */
12 #include <afsconfig.h>
13 #include <afs/param.h>
18 # include <sys/types.h> /* fd_set on older platforms */
22 # include <winsock2.h>
24 # include <unistd.h> /* select() prototype */
25 # include <sys/time.h> /* struct timeval, select() prototype */
27 # include <sys/select.h> /* fd_set on newer platforms */
29 # include <sys/socket.h>
30 # include <sys/file.h>
32 # include <sys/stat.h>
33 # include <netinet/in.h>
35 # include <sys/ioctl.h>
36 # include <sys/time.h>
39 # include "rx_globals.h"
/* Capacity of the buffer that holds an LWP thread name (see the name array in
 * rxi_ListenerProc). */
42 #define MAXTHREADNAMELENGTH 64
/* Hook pointers defined outside this file. Both are called with a PROCESS handle
 * and a thread name: registerProgram right after an LWP is created,
 * swapNameProgram at entry to the listener loop and again when a new call is
 * handed back. swapNameProgram is tested for NULL before some of its calls. */
44 extern int (*registerProgram) ();
45 extern int (*swapNameProgram) ();
47 int debugSelectFailure; /* debugging counter: # of times select or IOMGR_Select failed */
50 * Sleep on the unique wait channel provided.
55 LWP_WaitProcess(addr);
59 * Wakeup any threads on the channel provided.
60 * They may be woken up spuriously, and must check any conditions.
63 rxi_Wakeup(void *addr)
/* addr is used purely as a channel identifier: it is handed straight to LWP and
 * never dereferenced here. */
65 LWP_NoYieldSignal(addr);
68 PROCESS rx_listenerPid = 0; /* LWP process id of socket listener process */
/* Forward declaration: this is the entry point handed to LWP_CreateProcess when
 * the listener LWP is started (see rxi_StartListener). */
69 static void rx_ListenerProc(void *dummy);
72 * Delay the current thread for the specified number of seconds.
/* Shutdown request flag for the listener, initially clear.
 * NOTE(review): presumably set by rxi_StopListener and tested inside the
 * listener loop - confirm against the full source. */
80 static int quitListening = 0;
82 /* This routine will kill the listener thread, if it exists. */
84 rxi_StopListener(void)
/* Unblock the listener if it is waiting in select (see rxi_ReScheduleEvents), so
 * that it makes a new pass through its loop. */
87 rxi_ReScheduleEvents();
90 /* This routine is called by the event package whenever a new event is
91 posted that is earlier than all existing events. If the Listener process
92 is blocked in select, this will unblock it. It can also be called
93 to force a new trip through the rxi_Listener select loop when the set
94 of file descriptors it should be listening to changes. */
96 rxi_ReScheduleEvents(void)
/* Interrupt the pending IOMGR wait of the listener LWP identified by
 * rx_listenerPid; the listener loop then re-evaluates its timeout and waits again. */
99 IOMGR_Cancel(rx_listenerPid);
/* rxi_InitializeThreadSupport: initialize the LWP package at normal priority and
 * clear the descriptor set that the listener selects on. Takes no arguments. */
103 rxi_InitializeThreadSupport(void)
/* The second argument is presumably an out-parameter whose value is discarded
 * (hence the name junk) - confirm against the LWP interface. */
107 LWP_InitializeProcessSupport(LWP_NORMAL_PRIORITY, &junk);
/* Start with an empty set; rxi_Listen adds sockets to rx_selectMask later. */
109 FD_ZERO(&rx_selectMask);
/* rxi_StartServerProc: create a new LWP that runs proc.
 *   proc      - entry point of the new LWP
 *   stacksize - stack size passed through to LWP_CreateProcess
 * The LWP is created at RX_PROCESS_PRIORITY and then reported to the
 * registerProgram hook under a sequential srv_<n> name. */
113 rxi_StartServerProc(void (*proc) (void), int stacksize)
/* Persists across calls; pre-incremented below so the first name is srv_1. */
116 static int number = 0;
/* NOTE(review): sprintf is unbounded - confirm that the name buffer is large
 * enough for srv_ followed by any int value. */
119 sprintf(name, "srv_%d", ++number);
/* The LWP itself is always created under the fixed name rx_ServerProc; the
 * per-thread srv_<n> name is only given to registerProgram. */
120 LWP_CreateProcess(proc, stacksize, RX_PROCESS_PRIORITY, (void *)0,
121 "rx_ServerProc", &scratchPid);
/* NOTE(review): registerProgram is called through the pointer; confirm that a
 * NULL check guards this call. */
123 (*registerProgram) (scratchPid, name);
/* rxi_StartListener: create the listener LWP running rx_ListenerProc at
 * LWP_MAX_PRIORITY, store its handle in rx_listenerPid, and report it to the
 * registerProgram hook under the name listener. Takes no arguments. */
127 rxi_StartListener(void)
129 /* Priority of listener should be high, so it can keep conns alive */
/* Stack size passed to LWP_CreateProcess for the listener LWP. */
130 #define RX_LIST_STACK 24000
131 LWP_CreateProcess(rx_ListenerProc, RX_LIST_STACK, LWP_MAX_PRIORITY,
132 (void *)0, "rx_Listener", &rx_listenerPid);
/* NOTE(review): confirm that a NULL check on registerProgram guards this call. */
134 (*registerProgram) (rx_listenerPid, "listener");
137 /* The main loop which listens to the net for datagrams, and handles timeouts
138 and retransmissions, etc. It also is responsible for scheduling the
139 execution of pending events (in conjunction with event.c).
141 Note the interaction of nextPollTime and lastPollWorked. The idea is that if rx is not
142 keeping up with the incoming stream of packets (because there are threads that
143 are interfering with its running sufficiently often), rx does a polling select()
144 before calling IOMGR_Select. Doing a real select() system call means that
145 we don't have to let other processes run before processing more packets.
147 So, our algorithm is that if the last poll on the file descriptor found useful data, or
148 we're at the time nextPollTime (which is advanced so that it occurs every 3 or 4 seconds),
149 then we try the polling select before the IOMGR_Select. If we eventually catch up
150 (which we can tell by the polling select returning no input packets ready), then we
151 don't do a polling select again until several seconds later (via nextPollTime mechanism).
155 rxi_ListenerProc(fd_set * rfds, int *tnop, struct rx_call **newcallp)
159 register struct rx_packet *p = (struct rx_packet *)0;
162 afs_int32 nextPollTime; /* time to next poll FD before sleeping */
163 int lastPollWorked, doingPoll; /* true iff last poll was useful */
164 struct timeval tv, *tvp;
170 char name[MAXTHREADNAMELENGTH] = "srv_0";
/* Overview of the code below.
 *   rfds     - scratch descriptor set; overwritten with rx_selectMask before
 *              each wait, then examined to find the sockets to read.
 *   tnop     - passed on to rxi_ReceivePacket.
 *   newcallp - may be NULL; after each packet is processed the code tests
 *              whether *newcallp is set (presumably a new call handed back for
 *              this thread to service - confirm).
 * The thread first records itself in rx_listenerPid and calls the
 * swapNameProgram hook with the name listener; presumably the hook leaves the
 * previous thread name in the name buffer - confirm against the hook. */
175 code = LWP_CurrentProcess(&pid);
177 fprintf(stderr, "rxi_Listener: Can't get my pid.\n");
180 rx_listenerPid = pid;
182 (*swapNameProgram) (pid, "listener", &name);
185 /* Grab a new packet only if necessary (otherwise re-use the old one) */
187 rxi_RestoreDataBufs(p);
189 if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_RECEIVE)))
190 osi_Panic("rxi_ListenerProc: no packets!"); /* Shouldn't happen */
192 /* Wait for the next event time or a packet to arrive. */
193 /* rxevent_RaiseEvents schedules any events whose time has come and
194 * then atomically computes the time to the next event, guaranteeing
195 * that this is positive. If there is no next event, it returns 0 */
197 if (!rxevent_RaiseEvents(&cv))
200 /* It's important to copy cv to tv, because the 4.3 documentation
201 * for select threatens that *tv may be updated after a select, in
202 * future editions of the system, to indicate how much of the time
203 * period has elapsed. So we shouldn't rely on tv not being altered. */
204 tv.tv_sec = cv.sec; /* Time to next event */
205 tv.tv_usec = cv.usec;
/* Start every wait from the full set of registered sockets; select modifies
 * the set it is given, so a fresh copy is needed each time. */
210 *rfds = rx_selectMask;
212 if (lastPollWorked || nextPollTime < clock_Sec()) {
213 /* we're catching up, or haven't tried to for a few seconds */
215 nextPollTime = clock_Sec() + 4; /* try again in 4 seconds no matter what */
216 tv.tv_sec = tv.tv_usec = 0; /* make sure we poll */
/* Plain select: with tv zeroed this is a non-blocking poll (assumes tvp points
 * at tv - confirm). */
218 code = select(rx_maxSocketNumber + 1, rfds, 0, 0, tvp);
/* LWP-aware wait: per the comment before this function, IOMGR_Select is the
 * path on which other processes get to run. */
221 code = IOMGR_Select(rx_maxSocketNumber + 1, rfds, 0, 0, tvp);
223 lastPollWorked = 0; /* default is that it didn't find anything */
/* NOTE(review): the condition guarding this self-destroy is not shown next to
 * it; presumably it is the listener shutdown request - confirm. */
227 LWP_DestroyProcess(pid);
233 * If it was a timer interrupt then we can assume that
234 * the time has advanced by roughly the value of the
235 * previous timeout, and that there is now at least
241 /* select or IOMGR_Select returned failure */
242 debugSelectFailure++; /* update debugging counter */
247 * IOMGR_Cancel is invoked whenever a new event is
248 * posted that is earlier than any existing events.
249 * So we re-evaluate the time, and then go back to
/* The two loops that follow do the same per-socket work (rxi_ReadPacket, then
 * rxi_ReceivePacket, then the *newcallp test) but enumerate ready sockets
 * differently: the first walks rfds->fd_array[0 .. fd_count), the second tests
 * every descriptor from rx_minSocketNumber to rx_maxSocketNumber with FD_ISSET.
 * NOTE(review): presumably alternatives selected at compile time - confirm.
 * Both loops stop as soon as p becomes NULL. */
256 /* Packets have arrived, presumably:
257 * If it wasn't a timer interrupt, then no event should have
258 * timed out yet (well some event may have, but only just...), so
259 * we don't bother looking to see if any have timed out, but just
260 * go directly to reading the data packets
266 for (i = 0; p && i < rfds->fd_count; i++) {
267 socket = rfds->fd_array[i];
268 if (rxi_ReadPacket(socket, p, &host, &port)) {
270 p = rxi_ReceivePacket(p, socket, host, port, tnop,
272 if (newcallp && *newcallp) {
276 if (swapNameProgram) {
277 (*swapNameProgram) (rx_listenerPid, &name, 0);
285 for (socket = rx_minSocketNumber;
286 p && socket <= rx_maxSocketNumber; socket++) {
287 if (!FD_ISSET(socket, rfds))
289 if (rxi_ReadPacket(socket, p, &host, &port)) {
290 p = rxi_ReceivePacket(p, socket, host, port, tnop,
292 if (newcallp && *newcallp) {
296 if (swapNameProgram) {
297 (*swapNameProgram) (rx_listenerPid, &name, 0);
311 /* This is the listener process request loop. The listener process loop
312 * becomes a server thread when rxi_ListenerProc returns, and stays
313 * a server thread until rxi_ServerProc returns. */
/* rx_ListenerProc: LWP entry point for the listener thread (it is the function
 * passed to LWP_CreateProcess in rxi_StartListener).
 *   dummy - not referenced in the statements shown here. */
315 rx_ListenerProc(void *dummy)
319 struct rx_call *newcall;
/* Scratch descriptor set handed to rxi_ListenerProc; osi_Panic is called if it
 * cannot be allocated. */
322 if (!(rfds = IOMGR_AllocFDSet())) {
323 osi_Panic("rx_ListenerProc: no fd_sets!\n");
/* Role alternation: listen until rxi_ListenerProc returns (it fills in threadID
 * and newcall), then act as a server thread for that call. */
329 rxi_ListenerProc(rfds, &threadID, &newcall);
330 /* assert(threadID != -1); */
331 /* assert(newcall != NULL); */
/* sock is reset before every rxi_ServerProc call; the disabled assert below
 * records the expectation that rxi_ServerProc sets it before returning. */
332 sock = OSI_NULLSOCKET;
333 rxi_ServerProc(threadID, newcall, &sock);
334 /* assert(sock != OSI_NULLSOCKET); */
339 /* This is the server process request loop. The server process loop
340 * becomes a listener thread when rxi_ServerProc returns, and stays
341 * a listener thread until rxi_ListenerProc returns. */
/* Server-thread counterpart of rx_ListenerProc: the same two calls in the
 * opposite order, so this thread serves first and only later listens. newcall
 * starts out NULL, so the first rxi_ServerProc call is made without a call. */
347 struct rx_call *newcall = NULL;
350 if (!(rfds = IOMGR_AllocFDSet())) {
/* NOTE(review): this message names rxi_ListenerProc; confirm that it matches
 * the enclosing function. */
351 osi_Panic("rxi_ListenerProc: no fd_sets!\n");
354 rxi_MorePackets(rx_maxReceiveWindow + 2); /* alloc more packets */
355 rxi_dataQuota += rx_initSendWindow; /* Reserve some pkts for hard times */
356 /* threadID is used for making decisions in GetCall. Get it by bumping
357 * number of threads handling incoming calls */
358 threadID = rxi_availProcs++;
/* sock is reset before every rxi_ServerProc call; the disabled assert below
 * records the expectation that rxi_ServerProc sets it before returning. */
361 sock = OSI_NULLSOCKET;
362 rxi_ServerProc(threadID, newcall, &sock);
363 /* assert(sock != OSI_NULLSOCKET); */
/* Now take the listener role until rxi_ListenerProc hands back a thread id and
 * a new call. */
365 rxi_ListenerProc(rfds, &threadID, &newcall);
366 /* assert(threadID != -1); */
367 /* assert(newcall != NULL); */
373 * Called from GetUDPSocket.
374 * Called from a single thread at startup.
375 * Returns 0 on success; -1 on failure.
/* rxi_Listen: prepare sock for use by the listener.
 *   sock - socket to switch to non-blocking mode and add to rx_selectMask.
 * Also widens the rx_minSocketNumber .. rx_maxSocketNumber range that the
 * listener uses for select and for scanning descriptors. */
378 rxi_Listen(osi_socket sock)
382 * Put the socket into non-blocking mode so that rx_Listener
383 * can do a polling read before entering select
385 #ifndef AFS_DJGPP_ENV
/* NOTE(review): F_SETFL is given FNDELAY alone, so any other file status flags
 * already set on the descriptor are replaced rather than preserved. */
386 if (fcntl(sock, F_SETFL, FNDELAY) == -1) {
388 (osi_Msg "rxi_Listen: unable to set non-blocking mode on socket\n");
392 if (__djgpp_set_socket_blocking_mode(sock, 1) < 0) {
393 perror("__djgpp_set_socket_blocking_mode");
394 (osi_Msg "rxi_Listen: unable to set non-blocking mode on socket\n");
397 #endif /* AFS_DJGPP_ENV */
/* An fd_set can only hold descriptors below FD_SETSIZE, so larger ones are
 * rejected before FD_SET is used. */
399 if (sock > FD_SETSIZE - 1) {
400 (osi_Msg "rxi_Listen: socket descriptor > (FD_SETSIZE-1) = %d\n",
/* Register the socket and keep the min/max bounds in step with the set. */
406 FD_SET(sock, &rx_selectMask);
407 if (sock > rx_maxSocketNumber)
408 rx_maxSocketNumber = sock;
409 if (sock < rx_minSocketNumber)
410 rx_minSocketNumber = sock;
/* rxi_Recvmsg: thin wrapper around recvmsg.
 *   socket - descriptor to read from
 *   msg_p  - message header passed through unchanged
 *   flags  - recvmsg flags passed through unchanged
 * Returns whatever recvmsg returns. */
418 rxi_Recvmsg(int socket, struct msghdr *msg_p, int flags)
420 return recvmsg((int)socket, msg_p, flags);
424 * Simulate a blocking sendmsg on the non-blocking socket.
425 * It's non-blocking because it was set that way for recvmsg.
/* rxi_Sendmsg: send msg_p on socket with sendmsg, retrying while it fails.
 *   socket - descriptor to send on
 *   msg_p  - message header passed to sendmsg
 *   flags  - sendmsg flags
 * Each failed attempt bumps rx_stats.sendSelects. Errors other than
 * EWOULDBLOCK and ENOBUFS (the Linux variant has a longer list) are reported
 * through osi_Msg and perror; otherwise the code waits in select for the socket
 * to become writable before the next attempt. */
428 rxi_Sendmsg(osi_socket socket, struct msghdr *msg_p, int flags)
/* Write descriptor set for the select below; starts out NULL and is allocated
 * inside the retry loop. */
430 fd_set *sfds = (fd_set *) 0;
431 while (sendmsg(socket, msg_p, flags) == -1) {
433 rx_stats.sendSelects++;
436 if (!(sfds = IOMGR_AllocFDSet())) {
437 (osi_Msg "rx failed to alloc fd_set: ");
438 perror("rx_sendmsg");
441 FD_SET(socket, sfds);
445 #elif defined(AFS_LINUX22_ENV)
446 /* Linux unfortunately returns ECONNREFUSED if the target port
447 * is no longer in use */
448 /* and EAGAIN if a UDP checksum is incorrect */
449 if (errno != EWOULDBLOCK && errno != ENOBUFS && errno != ECONNREFUSED
452 if (errno != EWOULDBLOCK && errno != ENOBUFS)
455 (osi_Msg "rx failed to send packet: ");
456 perror("rx_sendmsg");
/* Wait with no timeout until exactly one descriptor (the socket) is reported
 * writable. Only an EINTR failure is retried; any other result panics. */
459 while ((err = select(socket + 1, 0, sfds, 0, 0)) != 1) {
460 if (err >= 0 || errno != EINTR)
461 osi_Panic("rxi_sendmsg: select error %d.%d", err, errno);
465 IOMGR_FreeFDSet(sfds);