ukernel: add morepackets check in listener
[openafs.git] / src / rx / UKERNEL / rx_knet.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afsconfig.h>
11 #include "afs/param.h"
12
13
14 #include "rx/rx_kcommon.h"
15
16
17 #define SECONDS_TO_SLEEP        0
18 #define NANO_SECONDS_TO_SLEEP   100000000       /* 100 milliseconds */
19 #define LOOPS_PER_WAITCHECK     10      /* once per second */
20
21 unsigned short usr_rx_port = 0;
22
23 struct usr_ifnet *usr_ifnet = NULL;
24 struct usr_in_ifaddr *usr_in_ifaddr = NULL;
25
26 void rxk_InitializeSocket(void);
27 extern int afs_osi_CheckTimedWaits(void);
28
29 void
30 afs_rxevent_daemon(void)
31 {
32     struct timespec tv;
33     struct clock temp;
34     int i = 0;
35
36     AFS_GUNLOCK();
37     while (1) {
38         tv.tv_sec = SECONDS_TO_SLEEP;
39         tv.tv_nsec = NANO_SECONDS_TO_SLEEP;
40         usr_thread_sleep(&tv);
41         /*
42          * Check for shutdown, don't try to stop the listener
43          */
44         if (afs_termState == AFSOP_STOP_RXEVENT
45             || afs_termState == AFSOP_STOP_RXK_LISTENER) {
46             AFS_GLOCK();
47             afs_termState = AFSOP_STOP_COMPLETE;
48             afs_osi_Wakeup(&afs_termState);
49             return;
50         }
51         rxevent_RaiseEvents(&temp);
52         if (++i >= LOOPS_PER_WAITCHECK) {
53             i = 0;
54             afs_osi_CheckTimedWaits();
55         }
56     }
57 }
58
59
60 /* Loop to listen on a socket. Return setting *newcallp if this
61  * thread should become a server thread.  */
62 void
63 rxi_ListenerProc(osi_socket usockp, int *tnop, struct rx_call **newcallp)
64 {
65     struct rx_packet *tp;
66     afs_uint32 host;
67     u_short port;
68     int rc;
69
70     /*
71      * Use the rxk_GetPacketProc and rxk_PacketArrivalProc routines
72      * to allocate rx_packet buffers and pass them to the RX layer
73      * for processing.
74      */
75     while (1) {
76         /* See if a check for additional packets was issued */
77         rx_CheckPackets();
78
79         tp = rxi_AllocPacket(RX_PACKET_CLASS_RECEIVE);
80         usr_assert(tp != NULL);
81         rc = rxi_ReadPacket(usockp, tp, &host, &port);
82         if (rc != 0) {
83             tp = rxi_ReceivePacket(tp, usockp, host, port, tnop, newcallp);
84             if (newcallp && *newcallp) {
85                 if (tp) {
86                     rxi_FreePacket(tp);
87                 }
88                 return;
89             }
90         }
91         if (tp) {
92             rxi_FreePacket(tp);
93         }
94         if (afs_termState == AFSOP_STOP_RXEVENT) {
95             afs_termState = AFSOP_STOP_RXK_LISTENER;
96             afs_osi_Wakeup(&afs_termState);
97         }
98     }
99 }
100
101 /* This is the listener process request loop. The listener process loop
102  * becomes a server thread when rxi_ListenerProc returns, and stays
103  * server thread until rxi_ServerProc returns. */
104 void
105 rxk_Listener(void)
106 {
107     int threadID;
108     osi_socket sock = (osi_socket) rx_socket;
109     struct rx_call *newcall;
110     struct usr_socket *usockp;
111
112     /*
113      * Initialize the rx_socket and start the receiver threads
114      */
115     rxk_InitializeSocket();
116
117     usockp = (struct usr_socket *)rx_socket;
118     assert(usockp != NULL);
119
120     AFS_GUNLOCK();
121     while (1) {
122         newcall = NULL;
123         threadID = -1;
124         rxi_ListenerProc(sock, &threadID, &newcall);
125         /* assert(threadID != -1); */
126         /* assert(newcall != NULL); */
127         sock = OSI_NULLSOCKET;
128         rxi_ServerProc(threadID, newcall, &sock);
129         if (sock == OSI_NULLSOCKET) {
130             break;
131         }
132     }
133     AFS_GLOCK();
134 }
135
136 /* This is the server process request loop. The server process loop
137  * becomes a listener thread when rxi_ServerProc returns, and stays
138  * listener thread until rxi_ListenerProc returns. */
139 void *
140 rx_ServerProc(void *unused)
141 {
142     osi_socket sock;
143     int threadID;
144     struct rx_call *newcall = NULL;
145
146     rxi_MorePackets(rx_maxReceiveWindow + 2);   /* alloc more packets */
147     rxi_dataQuota += rx_initSendWindow; /* Reserve some pkts for hard times */
148     /* threadID is used for making decisions in GetCall.  Get it by bumping
149      * number of threads handling incoming calls */
150     threadID = rxi_availProcs++;
151
152     AFS_GUNLOCK();
153     while (1) {
154         sock = OSI_NULLSOCKET;
155         rxi_ServerProc(threadID, newcall, &sock);
156         if (sock == OSI_NULLSOCKET) {
157             break;
158         }
159         newcall = NULL;
160         threadID = -1;
161         rxi_ListenerProc(sock, &threadID, &newcall);
162         /* assert(threadID != -1); */
163         /* assert(newcall != NULL); */
164     }
165     AFS_GLOCK();
166     return NULL;
167 }
168
169 /*
170  * At this point, RX wants a socket, but still has not initialized the
171  * rx_port variable or the pointers to the packet allocater and arrival
172  * routines. Allocate the socket buffer here, but don't open it until
173  * we start the receiver threads.
174  */
175 osi_socket *
176 rxk_NewSocketHost(afs_uint32 ahost, short aport)
177 {
178     struct usr_socket *usockp;
179
180     usockp = (struct usr_socket *)afs_osi_Alloc(sizeof(struct usr_socket));
181     usr_assert(usockp != NULL);
182
183     usockp->sock = -1;
184
185     return (osi_socket *)usockp;
186 }
187
188 osi_socket *
189 rxk_NewSocket(short aport)
190 {
191     return rxk_NewSocketHost(htonl(INADDR_ANY), aport);
192 }
193
194 /*
195  * This routine is called from rxk_Listener. By this time rx_port
196  * is set to 7001 and rx_socket points to the socket buffer
197  * we allocated in rxk_NewSocket. Now is the time to bind our
198  * socket and start the receiver threads.
199  */
200 void
201 rxk_InitializeSocket(void)
202 {
203     int rc, sock;
204 #ifdef AFS_USR_AIX_ENV
205     unsigned long len, optval, optval0, optlen;
206 #else /* AFS_USR_AIX_ENV */
207     socklen_t len, optlen;
208     int optval, optval0;
209 #endif /* AFS_USR_AIX_ENV */
210     struct usr_socket *usockp;
211     struct sockaddr_in lcladdr;
212
213     usr_assert(rx_socket != NULL);
214     usockp = (struct usr_socket *)rx_socket;
215
216 #undef socket
217     sock = socket(PF_INET, SOCK_DGRAM, 0);
218     usr_assert(sock >= 0);
219
220     memset((void *)&lcladdr, 0, sizeof(struct sockaddr_in));
221     lcladdr.sin_family = AF_INET;
222     lcladdr.sin_port = htons(usr_rx_port);
223     lcladdr.sin_addr.s_addr = INADDR_ANY;
224     rc = bind(sock, (struct sockaddr *)&lcladdr, sizeof(struct sockaddr_in));
225     usr_assert(rc >= 0);
226     len = sizeof(struct sockaddr_in);
227     rc = getsockname(sock, (struct sockaddr *)&lcladdr, &len);
228     usr_assert(rc >= 0);
229 #ifdef AFS_USR_LINUX22_ENV
230     optval0 = 131070;
231 #else
232     optval0 = 131072;
233 #endif
234     optval = optval0;
235     rc = setsockopt(sock, SOL_SOCKET, SO_SNDBUF, (void *)&optval,
236                     sizeof(optval));
237     usr_assert(rc == 0);
238     optlen = sizeof(optval);
239     rc = getsockopt(sock, SOL_SOCKET, SO_SNDBUF, (void *)&optval, &optlen);
240     usr_assert(rc == 0);
241     /* usr_assert(optval == optval0); */
242 #ifdef AFS_USR_LINUX22_ENV
243     optval0 = 131070;
244 #else
245     optval0 = 131072;
246 #endif
247     optval = optval0;
248     rc = setsockopt(sock, SOL_SOCKET, SO_RCVBUF, (void *)&optval,
249                     sizeof(optval));
250     usr_assert(rc == 0);
251     optlen = sizeof(optval);
252     rc = getsockopt(sock, SOL_SOCKET, SO_RCVBUF, (void *)&optval, &optlen);
253     usr_assert(rc == 0);
254     /* usr_assert(optval == optval0); */
255
256 #ifdef AFS_USR_AIX_ENV
257     optval = 1;
258     rc = setsockopt(sock, SOL_SOCKET, SO_CKSUMRECV, (void *)&optval,
259                     sizeof(optval));
260     usr_assert(rc == 0);
261 #endif /* AFS_USR_AIX_ENV */
262
263 #ifdef FD_CLOEXEC
264     fcntl(sock, F_SETFD, FD_CLOEXEC);
265 #endif
266
267     usockp->sock = sock;
268     usockp->port = lcladdr.sin_port;
269
270     /*
271      * Set the value of rx_port to reflect the address we actually
272      * are listening on, since the kernel is probably already using 7001.
273      */
274     rx_port = usockp->port;
275 }
276
277 int
278 rxk_FreeSocket(struct usr_socket *sockp)
279 {
280     return 0;
281 }
282
283 void
284 osi_StopListener(void)
285 {
286     rxk_FreeSocket((struct usr_socket *)rx_socket);
287 }
288
289 int
290 osi_NetSend(osi_socket sockp, struct sockaddr_in *addr, struct iovec *iov,
291             int nio, afs_int32 size, int stack)
292 {
293     int rc;
294     int i;
295     struct usr_socket *usockp = (struct usr_socket *)sockp;
296     struct msghdr msg;
297     struct iovec tmpiov[64];
298
299     /*
300      * The header is in the first iovec
301      */
302     usr_assert(nio > 0 && nio <= 64);
303     for (i = 0; i < nio; i++) {
304         tmpiov[i].iov_base = iov[i].iov_base;
305         tmpiov[i].iov_len = iov[i].iov_len;
306     }
307
308     memset(&msg, 0, sizeof(msg));
309     msg.msg_name = (void *)addr;
310     msg.msg_namelen = sizeof(struct sockaddr_in);
311     msg.msg_iov = &tmpiov[0];
312     msg.msg_iovlen = nio;
313
314     rc = sendmsg(usockp->sock, &msg, 0);
315     if (rc < 0) {
316         return errno;
317     }
318     usr_assert(rc == size);
319
320     return 0;
321 }
322
323 void
324 shutdown_rxkernel(void)
325 {
326     rxk_initDone = 0;
327     rxk_shutdownPorts();
328 }
329
330 void
331 rx_Finalize(void)
332 {
333     usr_assert(0);
334 }
335
336 /*
337  * Recvmsg.
338  *
339  */
340 int
341 rxi_Recvmsg(osi_socket socket, struct msghdr *msg_p, int flags)
342 {
343     int ret;
344     do {
345         ret = recvmsg(socket->sock, msg_p, flags);
346     } while (ret == -1 && errno == EAGAIN);
347     return ret;
348 }