Build fix - pre-processor typos in rx_lwp.c and rx_pthread.c
[openafs.git] / src / rx / rx_lwp.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /* rx_user.c contains routines specific to the user space UNIX implementation of rx */
11
12 /* This controls the size of an fd_set; it must be defined early before
13  * the system headers define that type and the macros that operate on it.
14  * Its value should be as large as the maximum file descriptor limit we
15  * are likely to run into on any platform.  Right now, that is 65536
16  * which is the default hard fd limit on Solaris 9 */
17 #ifndef _WIN32
18 #define FD_SETSIZE 65536
19 #endif
20
21 #include <afsconfig.h>
22 #include <afs/param.h>
23
24
25 # include <sys/types.h>         /* fd_set on older platforms */
26 # include <errno.h>
27 # include <signal.h>
28 #ifdef AFS_NT40_ENV
29 # include <winsock2.h>
30 #else
31 # include <unistd.h>            /* select() prototype */
32 # include <sys/time.h>          /* struct timeval, select() prototype */
33 # ifndef FD_SET
34 #  include <sys/select.h>       /* fd_set on newer platforms */
35 # endif
36 # include <sys/socket.h>
37 # include <sys/file.h>
38 # include <netdb.h>
39 # include <sys/stat.h>
40 # include <netinet/in.h>
41 # include <net/if.h>
42 # include <sys/ioctl.h>
43 # include <sys/time.h>
44 #endif
45 # include "rx.h"
46 # include "rx_globals.h"
47 # include <lwp.h>
48
49 #define MAXTHREADNAMELENGTH 64
50
51 int debugSelectFailure;         /* # of times select failed */
52
53 /*
54  * Sleep on the unique wait channel provided.
55  */
56 void
57 rxi_Sleep(void *addr)
58 {
59     LWP_WaitProcess(addr);
60 }
61
62 /*
63  * Wakeup any threads on the channel provided.
64  * They may be woken up spuriously, and must check any conditions.
65  */
66 void
67 rxi_Wakeup(void *addr)
68 {
69     LWP_NoYieldSignal(addr);
70 }
71
72 PROCESS rx_listenerPid = 0;     /* LWP process id of socket listener process */
73 static void* rx_ListenerProc(void *dummy);
74
75 /*
76  * Delay the current thread the specified number of seconds.
77  */
78 void
79 rxi_Delay(int sec)
80 {
81     IOMGR_Sleep(sec);
82 }
83
84 static int quitListening = 0;
85
86 /* This routine will kill the listener thread, if it exists. */
87 void
88 rxi_StopListener(void)
89 {
90     quitListening = 1;
91     rxi_ReScheduleEvents();
92 }
93
94 /* This routine will get called by the event package whenever a new,
95    earlier than others, event is posted.  If the Listener process
96    is blocked in selects, this will unblock it.  It also can be called
97    to force a new trip through the rxi_Listener select loop when the set
98    of file descriptors it should be listening to changes... */
99 void
100 rxi_ReScheduleEvents(void)
101 {
102     if (rx_listenerPid)
103         IOMGR_Cancel(rx_listenerPid);
104 }
105
106 void
107 rxi_InitializeThreadSupport(void)
108 {
109     PROCESS junk;
110
111     LWP_InitializeProcessSupport(LWP_NORMAL_PRIORITY, &junk);
112     IOMGR_Initialize();
113     FD_ZERO(&rx_selectMask);
114 }
115
116 void
117 rxi_StartServerProc(void *(*proc) (void *), int stacksize)
118 {
119     PROCESS scratchPid;
120     static int number = 0;
121     char name[32];
122
123     sprintf(name, "srv_%d", ++number);
124     LWP_CreateProcess(proc, stacksize, RX_PROCESS_PRIORITY, NULL,
125                       "rx_ServerProc", &scratchPid);
126     if (registerProgram)
127         (*registerProgram) (scratchPid, name);
128 }
129
130 void
131 rxi_StartListener(void)
132 {
133     /* Priority of listener should be high, so it can keep conns alive */
134 #define RX_LIST_STACK   24000
135     LWP_CreateProcess(rx_ListenerProc, RX_LIST_STACK, LWP_MAX_PRIORITY,
136                       NULL, "rx_Listener", &rx_listenerPid);
137     if (registerProgram)
138         (*registerProgram) (rx_listenerPid, "listener");
139 }
140
141 /* The main loop which listens to the net for datagrams, and handles timeouts
142    and retransmissions, etc.  It also is responsible for scheduling the
143    execution of pending events (in conjunction with event.c).
144
145    Note interaction of nextPollTime and lastPollWorked.  The idea is that if rx is not
146    keeping up with the incoming stream of packets (because there are threads that
147    are interfering with its running sufficiently often), rx does a polling select
148    before doing a real IOMGR_Select system call.  Doing a real select means that
149    we don't have to let other processes run before processing more packets.
150
151    So, our algorithm is that if the last poll on the file descriptor found useful data, or
152    we're at the time nextPollTime (which is advanced so that it occurs every 3 or 4 seconds),
153    then we try the polling select before the IOMGR_Select.  If we eventually catch up
154    (which we can tell by the polling select returning no input packets ready), then we
155    don't do a polling select again until several seconds later (via nextPollTime mechanism).
156    */
157
158 static void
159 rxi_ListenerProc(fd_set * rfds, int *tnop, struct rx_call **newcallp)
160 {
161     afs_uint32 host;
162     u_short port;
163     struct rx_packet *p = (struct rx_packet *)0;
164     osi_socket socket;
165     struct clock cv;
166     afs_int32 nextPollTime;     /* time to next poll FD before sleeping */
167     int lastPollWorked, doingPoll;      /* true iff last poll was useful */
168     struct timeval tv, *tvp;
169     int code;
170 #ifdef AFS_NT40_ENV
171     int i;
172 #endif
173     PROCESS pid;
174     char name[MAXTHREADNAMELENGTH] = "srv_0";
175
176     clock_NewTime();
177     lastPollWorked = 0;
178     nextPollTime = 0;
179     code = LWP_CurrentProcess(&pid);
180     if (code) {
181         fprintf(stderr, "rxi_Listener: Can't get my pid.\n");
182         exit(1);
183     }
184     rx_listenerPid = pid;
185     if (swapNameProgram)
186         (*swapNameProgram) (pid, "listener", &name[0]);
187
188     for (;;) {
189         /* Grab a new packet only if necessary (otherwise re-use the old one) */
190         if (p) {
191             rxi_RestoreDataBufs(p);
192         } else {
193             if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_RECEIVE)))
194                 osi_Panic("rxi_ListenerProc: no packets!");     /* Shouldn't happen */
195         }
196         /* Wait for the next event time or a packet to arrive. */
197         /* event_RaiseEvents schedules any events whose time has come and
198          * then atomically computes the time to the next event, guaranteeing
199          * that this is positive.  If there is no next event, it returns 0 */
200         clock_NewTime();
201         if (!rxevent_RaiseEvents(&cv))
202             tvp = NULL;
203         else {
204             /* It's important to copy cv to tv, because the 4.3 documentation
205              * for select threatens that *tv may be updated after a select, in
206              * future editions of the system, to indicate how much of the time
207              * period has elapsed.  So we shouldn't rely on tv not being altered. */
208             tv.tv_sec = cv.sec; /* Time to next event */
209             tv.tv_usec = cv.usec;
210             tvp = &tv;
211         }
212         rx_stats.selects++;
213
214         *rfds = rx_selectMask;
215
216         if (lastPollWorked || nextPollTime < clock_Sec()) {
217             /* we're catching up, or haven't tried to for a few seconds */
218             doingPoll = 1;
219             nextPollTime = clock_Sec() + 4;     /* try again in 4 seconds no matter what */
220             tv.tv_sec = tv.tv_usec = 0; /* make sure we poll */
221             tvp = &tv;
222             code = select((int)(rx_maxSocketNumber + 1), rfds, 0, 0, tvp);
223         } else {
224             doingPoll = 0;
225             code = IOMGR_Select((int)(rx_maxSocketNumber + 1), rfds, 0, 0, tvp);
226         }
227         lastPollWorked = 0;     /* default is that it didn't find anything */
228
229         if (quitListening) {
230             quitListening = 0;
231             LWP_DestroyProcess(pid);
232         }
233
234         switch (code) {
235         case 0:
236             /* Timer interrupt:
237              * If it was a timer interrupt then we can assume that
238              * the time has advanced by roughly the value of the
239              * previous timeout, and that there is now at least
240              * one pending event.
241              */
242             clock_NewTime();
243             break;
244         case -1:
245             /* select or IOMGR_Select returned failure */
246             debugSelectFailure++;       /* update debugging counter */
247             clock_NewTime();
248             break;
249         case -2:
250             /* IOMGR_Cancel:
251              * IOMGR_Cancel is invoked whenever a new event is
252              * posted that is earlier than any existing events.
253              * So we re-evaluate the time, and then go back to
254              * reschedule events
255              */
256             clock_NewTime();
257             break;
258
259         default:
260             /* Packets have arrived, presumably:
261              * If it wasn't a timer interrupt, then no event should have
262              * timed out yet (well some event may have, but only just...), so
263              * we don't bother looking to see if any have timed out, but just
264              * go directly to reading the data packets
265              */
266             clock_NewTime();
267             if (doingPoll)
268                 lastPollWorked = 1;
269 #ifdef AFS_NT40_ENV
270             for (i = 0; p && i < rfds->fd_count; i++) {
271                 socket = rfds->fd_array[i];
272                 if (rxi_ReadPacket(socket, p, &host, &port)) {
273                     *newcallp = NULL;
274                     p = rxi_ReceivePacket(p, socket, host, port, tnop,
275                                           newcallp);
276                     if (newcallp && *newcallp) {
277                         if (p) {
278                             rxi_FreePacket(p);
279                         }
280                         if (swapNameProgram) {
281                             (*swapNameProgram) (rx_listenerPid, name, 0);
282                             rx_listenerPid = 0;
283                         }
284                         return;
285                     }
286                 }
287             }
288 #else
289             for (socket = rx_minSocketNumber;
290                  p && socket <= rx_maxSocketNumber; socket++) {
291                 if (!FD_ISSET(socket, rfds))
292                     continue;
293                 if (rxi_ReadPacket(socket, p, &host, &port)) {
294                     p = rxi_ReceivePacket(p, socket, host, port, tnop,
295                                           newcallp);
296                     if (newcallp && *newcallp) {
297                         if (p) {
298                             rxi_FreePacket(p);
299                         }
300                         if (swapNameProgram) {
301                             (*swapNameProgram) (rx_listenerPid, name, 0);
302                             rx_listenerPid = 0;
303                         }
304                         return;
305                     }
306                 }
307             }
308 #endif
309             break;
310         }
311     }
312     /* NOTREACHED */
313 }
314
315 /* This is the listener process request loop. The listener process loop
316  * becomes a server thread when rxi_ListenerProc returns, and stays
317  * server thread until rxi_ServerProc returns. */
318 static void *
319 rx_ListenerProc(void *dummy)
320 {
321     int threadID;
322     osi_socket sock;
323     struct rx_call *newcall;
324     fd_set *rfds;
325
326     if (!(rfds = IOMGR_AllocFDSet())) {
327         osi_Panic("rx_ListenerProc: no fd_sets!\n");
328     }
329
330     while (1) {
331         newcall = NULL;
332         threadID = -1;
333         rxi_ListenerProc(rfds, &threadID, &newcall);
334         /* assert(threadID != -1); */
335         /* assert(newcall != NULL); */
336         sock = OSI_NULLSOCKET;
337         rxi_ServerProc(threadID, newcall, &sock);
338         /* assert(sock != OSI_NULLSOCKET); */
339     }
340     /* not reached */
341     return NULL;
342 }
343
344 /* This is the server process request loop. The server process loop
345  * becomes a listener thread when rxi_ServerProc returns, and stays
346  * listener thread until rxi_ListenerProc returns. */
347 void *
348 rx_ServerProc(void * unused)
349 {
350     osi_socket sock;
351     int threadID;
352     struct rx_call *newcall = NULL;
353     fd_set *rfds;
354
355     if (!(rfds = IOMGR_AllocFDSet())) {
356         osi_Panic("rxi_ListenerProc: no fd_sets!\n");
357     }
358
359     rxi_MorePackets(rx_maxReceiveWindow + 2);   /* alloc more packets */
360     rxi_dataQuota += rx_initSendWindow; /* Reserve some pkts for hard times */
361     /* threadID is used for making decisions in GetCall.  Get it by bumping
362      * number of threads handling incoming calls */
363     threadID = rxi_availProcs++;
364
365     while (1) {
366         sock = OSI_NULLSOCKET;
367         rxi_ServerProc(threadID, newcall, &sock);
368         /* assert(sock != OSI_NULLSOCKET); */
369         newcall = NULL;
370         rxi_ListenerProc(rfds, &threadID, &newcall);
371         /* assert(threadID != -1); */
372         /* assert(newcall != NULL); */
373     }
374     /* not reached */
375     return NULL;
376 }
377
378 /*
379  * Called from GetUDPSocket.
380  * Called from a single thread at startup.
381  * Returns 0 on success; -1 on failure.
382  */
383 int
384 rxi_Listen(osi_socket sock)
385 {
386 #ifndef AFS_NT40_ENV
387     /*
388      * Put the socket into non-blocking mode so that rx_Listener
389      * can do a polling read before entering select
390      */
391     if (fcntl(sock, F_SETFL, FNDELAY) == -1) {
392         perror("fcntl");
393         (osi_Msg "rxi_Listen: unable to set non-blocking mode on socket\n");
394         return -1;
395     }
396
397     if (sock > FD_SETSIZE - 1) {
398         (osi_Msg "rxi_Listen: socket descriptor > (FD_SETSIZE-1) = %d\n",
399          FD_SETSIZE - 1);
400         return -1;
401     }
402 #endif
403
404     FD_SET(sock, &rx_selectMask);
405     if (sock > rx_maxSocketNumber)
406         rx_maxSocketNumber = sock;
407     if (sock < rx_minSocketNumber)
408         rx_minSocketNumber = sock;
409     return 0;
410 }
411
412 /*
413  * Recvmsg
414  */
415 int
416 rxi_Recvmsg(osi_socket socket, struct msghdr *msg_p, int flags)
417 {
418 #if defined(HAVE_LINUX_ERRQUEUE_H) && defined(ADAPT_PMTU)
419     while((rxi_HandleSocketError(socket)) > 0)
420         ;
421 #endif
422     return recvmsg(socket, msg_p, flags);
423 }
424
425 /*
426  * Simulate a blocking sendmsg on the non-blocking socket.
427  * It's non blocking because it was set that way for recvmsg.
428  */
429 int
430 rxi_Sendmsg(osi_socket socket, struct msghdr *msg_p, int flags)
431 {
432     fd_set *sfds = (fd_set *) 0;
433     while (sendmsg(socket, msg_p, flags) == -1) {
434         int err;
435         rx_stats.sendSelects++;
436
437         if (!sfds) {
438             if (!(sfds = IOMGR_AllocFDSet())) {
439                 (osi_Msg "rx failed to alloc fd_set: ");
440                 perror("rx_sendmsg");
441                 return -1;
442             }
443             FD_SET(socket, sfds);
444         }
445 #if defined(HAVE_LINUX_ERRQUEUE_H) && defined(ADAPT_PMTU)
446         while((rxi_HandleSocketError(socket)) > 0)
447           ;
448 #endif
449 #ifdef AFS_NT40_ENV
450         if (WSAGetLastError())
451 #elif defined(AFS_LINUX22_ENV)
452         /* linux unfortunately returns ECONNREFUSED if the target port
453          * is no longer in use */
454         /* and EAGAIN if a UDP checksum is incorrect */
455         if (errno != EWOULDBLOCK && errno != ENOBUFS && errno != ECONNREFUSED
456             && errno != EAGAIN)
457 #else
458         if (errno != EWOULDBLOCK && errno != ENOBUFS)
459 #endif
460         {
461             (osi_Msg "rx failed to send packet: ");
462             perror("rx_sendmsg");
463 #ifndef AFS_NT40_ENV
464             if (errno > 0)
465               return -errno;
466 #else
467             if (WSAGetLastError() > 0)
468               return -WSAGetLastError();
469 #endif
470             return -1;
471         }
472         while ((err = select(socket + 1, 0, sfds, 0, 0)) != 1) {
473             if (err >= 0 || errno != EINTR)
474                 osi_Panic("rxi_sendmsg: select error %d.%d", err, errno);
475         }
476     }
477     if (sfds)
478         IOMGR_FreeFDSet(sfds);
479     return 0;
480 }