2 * Copyright 2000, International Business Machines Corporation and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
10 /* rx_user.c contains routines specific to the user space UNIX implementation of rx */
12 #include <afsconfig.h>
13 #include <afs/param.h>
17 # include <sys/types.h> /* fd_set on older platforms */
21 # include <winsock2.h>
23 # include <unistd.h> /* select() prototype */
24 # include <sys/time.h> /* struct timeval, select() prototype */
26 # include <sys/select.h> /* fd_set on newer platforms */
28 # include <sys/socket.h>
29 # include <sys/file.h>
31 # include <sys/stat.h>
32 # include <netinet/in.h>
34 # include <sys/ioctl.h>
35 # include <sys/time.h>
38 # include "rx_globals.h"
/* Capacity, in bytes, of the thread-name buffer `name` declared in
 * rxi_ListenerProc. */
41 #define MAXTHREADNAMELENGTH 64
/* Thread-naming hooks, declared extern here.  Both are declared with empty
 * parameter lists, so the compiler does not check the arguments at the call
 * sites: registerProgram is called with (pid, name) and swapNameProgram with
 * three arguments.  swapNameProgram is null-checked before use in the packet
 * loops of rxi_ListenerProc. */
43 extern int (*registerProgram)();
44 extern int (*swapNameProgram)();
46 int debugSelectFailure; /* # of times select failed */
49 * Sleep on the unique wait channel provided.
/* rxi_Sleep: addr identifies the wait channel; it is not dereferenced here. */
51 void rxi_Sleep(void *addr)
/* addr is handed unchanged to LWP_WaitProcess() as the channel to wait on. */
53 LWP_WaitProcess(addr);
57 * Wakeup any threads on the channel provided.
58 * They may be woken up spuriously, and must check any conditions.
/* rxi_Wakeup: addr is the same kind of channel value that rxi_Sleep waits on;
 * it is not dereferenced here. */
60 void rxi_Wakeup(void *addr)
/* NOTE(review): the "NoYield" variant presumably signals without giving up
 * the processor -- confirm against the LWP documentation. */
62 LWP_NoYieldSignal(addr);
/* Zero means no listener has been recorded; rxi_ReScheduleEvents tests this
 * before calling IOMGR_Cancel().  Its address is passed to LWP_CreateProcess()
 * in rxi_StartListener, and rxi_ListenerProc assigns its own pid to it. */
65 PROCESS rx_listenerPid = 0; /* LWP process id of socket listener process */
/* Forward declaration: rxi_StartListener hands this function to
 * LWP_CreateProcess() before its definition appears. */
66 static void rx_ListenerProc(void *dummy);
69 * Delay the current thread the specified number of seconds.
/* sec: length of the delay, in seconds. */
71 void rxi_Delay(int sec)
/* File-local flag, initially 0.  NOTE(review): presumably set by
 * rxi_StopListener and tested by the listener loop -- confirm against the
 * places that read and write it. */
76 static int quitListening = 0;
78 /* This routine will kill the listener thread, if it exists. */
/* Nudges the listener through rxi_ReScheduleEvents(), which cancels the
 * listener's pending IOMGR wait when rx_listenerPid is non-zero. */
79 void rxi_StopListener(void)
82 rxi_ReScheduleEvents();
85 /* This routine is called by the event package whenever a new event,
86 earlier than all existing ones, is posted. If the listener process
87 is blocked in select, this will unblock it. It can also be called
88 to force a new trip through the rxi_Listener select loop when the set
89 of file descriptors it should be listening to changes. */
/* Does nothing while rx_listenerPid is 0 (no listener recorded). */
90 void rxi_ReScheduleEvents(void)
/* Cancel the listener's IOMGR wait so that it makes another pass. */
92 if (rx_listenerPid) IOMGR_Cancel(rx_listenerPid);
/* rxi_InitializeThreadSupport: start the LWP package at LWP_NORMAL_PRIORITY
 * and empty rx_selectMask, the descriptor set the listener selects on. */
95 void rxi_InitializeThreadSupport(void)
/* NOTE(review): `junk` presumably receives the initial process id, which is
 * not needed here -- confirm. */
99 LWP_InitializeProcessSupport(LWP_NORMAL_PRIORITY, &junk);
/* Start with no sockets registered; rxi_Listen adds them with FD_SET. */
101 FD_ZERO(&rx_selectMask);
/* rxi_StartServerProc: create one LWP that runs `proc`.
 *
 * proc:      entry point handed to LWP_CreateProcess().
 * stacksize: stack size handed to LWP_CreateProcess().
 *
 * The process is created at RX_PROCESS_PRIORITY under the LWP name
 * "rx_ServerProc".  The value returned by LWP_CreateProcess() is not
 * examined.
 */
104 void rxi_StartServerProc(void (*proc)(void), int stacksize)
/* Persists across calls, so successive threads are labelled srv_1, srv_2, ... */
107 static int number = 0;
/* NOTE(review): sprintf() is unbounded; confirm `name` can hold "srv_" plus
 * the longest decimal int and its terminator, or switch to snprintf(). */
110 sprintf(name, "srv_%d", ++number);
111 LWP_CreateProcess(proc, stacksize, RX_PROCESS_PRIORITY,
112 0, "rx_ServerProc", &scratchPid);
/* Publish the srv_N label for the new process through the naming hook. */
114 (*registerProgram)(scratchPid, name);
/* rxi_StartListener: create the listener LWP.
 * Runs rx_ListenerProc with stack size RX_LIST_STACK at LWP_MAX_PRIORITY
 * under the LWP name "rx_Listener"; &rx_listenerPid is passed so that the
 * new process id ends up in rx_listenerPid. */
117 void rxi_StartListener(void)
119 /* Priority of listener should be high, so it can keep conns alive */
120 #define RX_LIST_STACK 24000
121 LWP_CreateProcess(rx_ListenerProc, RX_LIST_STACK, LWP_MAX_PRIORITY, 0,
122 "rx_Listener", &rx_listenerPid);
/* Label the new process "listener" through the naming hook. */
124 (*registerProgram)(rx_listenerPid, "listener");
127 /* The main loop which listens to the net for datagrams, and handles timeouts
128 and retransmissions, etc. It also is responsible for scheduling the
129 execution of pending events (in conjunction with event.c).
131 Note interaction of nextPollTime and lastPollWorked. The idea is that if rx is not
132 keeping up with the incoming stream of packets (because there are threads that
133 are interfering with its running sufficiently often), rx does a polling select
134 before doing a real IOMGR_Select system call. Doing a real select means that
135 we don't have to let other processes run before processing more packets.
137 So, our algorithm is that if the last poll on the file descriptor found useful data, or
138 we're at the time nextPollTime (which is advanced so that it occurs every 3 or 4 seconds),
139 then we try the polling select before the IOMGR_Select. If we eventually catch up
140 (which we can tell by the polling select returning no input packets ready), then we
141 don't do a polling select again until several seconds later (via nextPollTime mechanism).
/* rxi_ListenerProc: body of the listener role.
 *
 * rfds:     caller-supplied fd_set used as scratch; it is overwritten with
 *           rx_selectMask before each select call.
 * tnop, newcallp:
 *           NOTE(review): presumably forwarded to rxi_ReceivePacket() so a
 *           new call can be handed back to the caller -- confirm.  *newcallp
 *           is tested after rxi_ReceivePacket() returns.
 *
 * Flow: record this process as rx_listenerPid and call the swapNameProgram
 * hook with "listener" and the local name buffer; make sure a receive packet
 * is on hand (panic if none can be allocated); let rxevent_RaiseEvents()
 * compute the time to the next event; wait in select() (after zeroing tv) or
 * in IOMGR_Select(); count failures in debugSelectFailure; then read packets
 * from the ready sockets.  Two scans exist: one walks rfds->fd_array for
 * fd_count entries (NOTE(review): presumably the Winsock fd_set layout), the
 * other tests every descriptor from rx_minSocketNumber to rx_maxSocketNumber
 * with FD_ISSET.  Both scans stop as soon as `p` becomes null.
 */
144 static void rxi_ListenerProc(fd_set *rfds, int *tnop, struct rx_call **newcallp)
148 register struct rx_packet *p = (struct rx_packet *)0;
151 afs_int32 nextPollTime; /* time to next poll FD before sleeping */
152 int lastPollWorked, doingPoll; /* true iff last poll was useful */
153 struct timeval tv, *tvp;
/* Initial label; its address is handed to swapNameProgram below. */
159 char name[MAXTHREADNAMELENGTH] = "srv_0";
164 code = LWP_CurrentProcess(&pid);
166 fprintf(stderr, "rxi_Listener: Can't get my pid.\n");
169 rx_listenerPid = pid;
/* NOTE(review): &name has type char (*)[MAXTHREADNAMELENGTH], not char *.
 * The hook has no prototype, so this is not diagnosed; confirm the callee
 * expects a plain character buffer. */
171 (*swapNameProgram)(pid, "listener", &name);
174 /* Grab a new packet only if necessary (otherwise re-use the old one) */
176 rxi_RestoreDataBufs(p);
179 if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_RECEIVE)))
180 osi_Panic("rxi_ListenerProc: no packets!"); /* Shouldn't happen */
182 /* Wait for the next event time or a packet to arrive. */
183 /* event_RaiseEvents schedules any events whose time has come and
184 then atomically computes the time to the next event, guaranteeing
185 that this is positive. If there is no next event, it returns 0 */
/* No pending event: select with a NULL timeout. */
187 if (!rxevent_RaiseEvents(&cv)) tvp = NULL;
189 /* It's important to copy cv to tv, because the 4.3 documentation
190 for select threatens that *tv may be updated after a select, in
191 future editions of the system, to indicate how much of the time
192 period has elapsed. So we shouldn't rely on tv not being altered. */
193 tv.tv_sec = cv.sec; /* Time to next event */
194 tv.tv_usec = cv.usec;
/* Reload the full set of registered sockets; the select calls below take
 * rfds as their read set. */
199 *rfds = rx_selectMask;
201 if (lastPollWorked || nextPollTime < clock_Sec()) {
202 /* we're catching up, or haven't tried to for a few seconds */
204 nextPollTime = clock_Sec() + 4; /* try again in 4 seconds no matter what */
205 tv.tv_sec = tv.tv_usec = 0; /* make sure we poll */
207 code = select(rx_maxSocketNumber+1, rfds, 0, 0, tvp);
211 code = IOMGR_Select(rx_maxSocketNumber+1, rfds, 0, 0, tvp);
213 lastPollWorked = 0; /* default is that it didn't find anything */
217 LWP_DestroyProcess(pid);
223 * If it was a timer interrupt then we can assume that
224 * the time has advanced by roughly the value of the
225 * previous timeout, and that there is now at least
231 /* select or IOMGR_Select returned failure */
232 debugSelectFailure++; /* update debugging counter */
237 * IOMGR_Cancel is invoked whenever a new event is
238 * posted that is earlier than any existing events.
239 * So we re-evaluate the time, and then go back to
246 /* Packets have arrived, presumably:
247 * If it wasn't a timer interrupt, then no event should have
248 * timed out yet (well some event may have, but only just...), so
249 * we don't bother looking to see if any have timed out, but just
250 * go directly to reading the data packets
253 if (doingPoll) lastPollWorked = 1;
255 for (i=0; p && i<rfds->fd_count; i++) {
256 socket = rfds->fd_array[i];
257 if (rxi_ReadPacket(socket, p, &host, &port)) {
259 p = rxi_ReceivePacket(p, socket, host, port,
261 if (newcallp && *newcallp) {
265 if (swapNameProgram) {
266 (*swapNameProgram)(rx_listenerPid, &name, 0);
274 for (socket = rx_minSocketNumber;
275 p && socket <= rx_maxSocketNumber; socket++) {
276 if (!FD_ISSET(socket, rfds))
278 if (rxi_ReadPacket(socket, p, &host, &port)) {
279 p = rxi_ReceivePacket(p, socket, host, port,
281 if (newcallp && *newcallp) {
285 if (swapNameProgram) {
286 (*swapNameProgram)(rx_listenerPid, &name, 0);
300 /* This is the listener process request loop. The listener process loop
301 * becomes a server thread when rxi_ListenerProc returns, and stays
302 * a server thread until rxi_ServerProc returns. */
/* rx_ListenerProc: entry point of the listener LWP; `dummy` is not referenced
 * by the calls below.  Allocates an fd_set with IOMGR_AllocFDSet(), panicking
 * if none is available, then runs rxi_ListenerProc() followed by
 * rxi_ServerProc(). */
303 static void rx_ListenerProc(void *dummy)
307 struct rx_call *newcall;
310 if (!(rfds = IOMGR_AllocFDSet())) {
311 osi_Panic("rx_ListenerProc: no fd_sets!\n");
/* Listen first; per the disabled asserts, threadID and newcall are expected
 * to be set when this returns. */
317 rxi_ListenerProc(rfds, &threadID, &newcall);
318 /* assert(threadID != -1); */
319 /* assert(newcall != NULL); */
/* Reset before the server pass; rxi_ServerProc is given the address. */
320 sock = OSI_NULLSOCKET;
321 rxi_ServerProc(threadID, newcall, &sock);
322 /* assert(sock != OSI_NULLSOCKET); */
327 /* This is the server process request loop. The server process loop
328 * becomes a listener thread when rxi_ServerProc returns, and stays
329 * a listener thread until rxi_ListenerProc returns. */
/* rx_ServerProc: entry point of a server LWP.  It starts in the server role:
 * rxi_ServerProc() runs first, then rxi_ListenerProc().
 * Before serving it allocates an fd_set, adds packets with
 * rxi_MorePackets(rx_maxReceiveWindow+2), raises rxi_dataQuota by
 * rx_initSendWindow, and takes its thread id from rxi_availProcs. */
330 void rx_ServerProc(void)
334 struct rx_call *newcall = NULL;
337 if (!(rfds = IOMGR_AllocFDSet())) {
/* NOTE(review): this message names rxi_ListenerProc although the failing
 * function is rx_ServerProc; it looks like a copy-and-paste slip. */
338 osi_Panic("rxi_ListenerProc: no fd_sets!\n");
341 rxi_MorePackets(rx_maxReceiveWindow+2); /* alloc more packets */
342 rxi_dataQuota += rx_initSendWindow; /* Reserve some pkts for hard times */
343 /* threadID is used for making decisions in GetCall. Get it by bumping
344 * number of threads handling incoming calls */
/* Post-increment: this thread keeps the count as it was before the bump. */
345 threadID = rxi_availProcs++;
/* Reset before the server pass; rxi_ServerProc is given the address. */
348 sock = OSI_NULLSOCKET;
349 rxi_ServerProc(threadID, newcall, &sock);
350 /* assert(sock != OSI_NULLSOCKET); */
352 rxi_ListenerProc(rfds, &threadID, &newcall);
353 /* assert(threadID != -1); */
354 /* assert(newcall != NULL); */
360 * Called from GetUDPSocket.
361 * Called from a single thread at startup.
362 * Returns 0 on success; -1 on failure.
/* rxi_Listen: register `sock` with the listener.
 * Switches the socket to non-blocking mode (fcntl(F_SETFL, FNDELAY), or
 * __djgpp_set_socket_blocking_mode() in the AFS_DJGPP_ENV branch), reports
 * descriptors above FD_SETSIZE-1 as an error, then adds the socket to
 * rx_selectMask and widens the [rx_minSocketNumber, rx_maxSocketNumber]
 * range that the listener scans. */
364 int rxi_Listen(osi_socket sock)
368 * Put the socket into non-blocking mode so that rx_Listener
369 * can do a polling read before entering select
371 #ifndef AFS_DJGPP_ENV
/* NOTE(review): F_SETFL is given FNDELAY alone, without first reading the
 * current flags via F_GETFL; confirm no other status flag needs preserving. */
372 if (fcntl(sock, F_SETFL, FNDELAY) == -1) {
374 (osi_Msg "rxi_Listen: unable to set non-blocking mode on socket\n");
378 if ( __djgpp_set_socket_blocking_mode(sock, 1) < 0 ) {
379 perror("__djgpp_set_socket_blocking_mode");
380 (osi_Msg "rxi_Listen: unable to set non-blocking mode on socket\n");
383 #endif /* AFS_DJGPP_ENV */
385 if (sock > FD_SETSIZE-1) {
386 (osi_Msg "rxi_Listen: socket descriptor > (FD_SETSIZE-1) = %d\n",
392 FD_SET(sock, &rx_selectMask);
/* Track the highest and lowest registered descriptors. */
393 if (sock > rx_maxSocketNumber) rx_maxSocketNumber = sock;
394 if (sock < rx_minSocketNumber) rx_minSocketNumber = sock;
/* rxi_Recvmsg: thin wrapper that forwards its three arguments to recvmsg()
 * and returns recvmsg()'s result unchanged. */
401 int rxi_Recvmsg(int socket, struct msghdr *msg_p, int flags)
403 return recvmsg((int) socket, msg_p, flags);
407 * Simulate a blocking sendmsg on the non-blocking socket.
408 * The socket is non-blocking because it was set that way for recvmsg.
/* rxi_Sendmsg: retry sendmsg() until it stops returning -1.
 * Each failed attempt bumps rx_stats.sendSelects.  An fd_set is obtained
 * from IOMGR_AllocFDSet() and `socket` is added to it.  errno values outside
 * the retryable set (EWOULDBLOCK and ENOBUFS, plus ECONNREFUSED and EAGAIN
 * under AFS_LINUX22_ENV) are reported as "rx failed to send packet";
 * otherwise select() waits for the socket to become writable before the
 * next attempt.
 * NOTE(review): the only IOMGR_FreeFDSet() call follows the retry loop;
 * confirm that the error exits do not leak sfds. */
410 int rxi_Sendmsg(osi_socket socket, struct msghdr *msg_p, int flags)
412 fd_set *sfds = (fd_set*)0;
413 while (sendmsg(socket, msg_p, flags) == -1) {
415 rx_stats.sendSelects++;
418 if ( !(sfds = IOMGR_AllocFDSet())) {
419 (osi_Msg "rx failed to alloc fd_set: ");
420 perror("rx_sendmsg");
423 FD_SET(socket, sfds);
428 #elif defined(AFS_LINUX22_ENV)
429 /* linux unfortunately returns ECONNREFUSED if the target port
430 * is no longer in use */
431 /* and EAGAIN if a UDP checksum is incorrect */
432 if (errno != EWOULDBLOCK && errno != ENOBUFS &&
433 errno != ECONNREFUSED && errno != EAGAIN)
435 if (errno != EWOULDBLOCK && errno != ENOBUFS)
438 (osi_Msg "rx failed to send packet: ");
439 perror("rx_sendmsg");
/* Wait for writability: a select() interrupted by a signal (EINTR) is
 * retried, any other result different from 1 panics. */
442 while ((err = select(socket+1, 0, sfds, 0, 0)) != 1) {
443 if (err >= 0 || errno != EINTR)
444 osi_Panic("rxi_sendmsg: select error %d.%d", err, errno);
448 IOMGR_FreeFDSet(sfds);