3139d8376646488bc42beb584dcbad5de2bdf612
[openafs.git] / src / rx / UKERNEL / rx_knet.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afsconfig.h>
11 #include "afs/param.h"
12
13
14 #include "rx/rx_kcommon.h"
15
16
17 #define SECONDS_TO_SLEEP        0
18 #define NANO_SECONDS_TO_SLEEP   100000000       /* 100 milliseconds */
19 #define LOOPS_PER_WAITCHECK     10      /* once per second */
20
21 unsigned short usr_rx_port = 0;
22
23 struct usr_ifnet *usr_ifnet = NULL;
24 struct usr_in_ifaddr *usr_in_ifaddr = NULL;
25
26 void rxk_InitializeSocket();
27
28 void
29 afs_rxevent_daemon(void)
30 {
31     struct timespec tv;
32     struct clock temp;
33     int i = 0;
34
35     AFS_GUNLOCK();
36     while (1) {
37         tv.tv_sec = SECONDS_TO_SLEEP;
38         tv.tv_nsec = NANO_SECONDS_TO_SLEEP;
39         usr_thread_sleep(&tv);
40         /*
41          * Check for shutdown, don't try to stop the listener
42          */
43         if (afs_termState == AFSOP_STOP_RXEVENT
44             || afs_termState == AFSOP_STOP_RXK_LISTENER) {
45             AFS_GLOCK();
46             afs_termState = AFSOP_STOP_COMPLETE;
47             afs_osi_Wakeup(&afs_termState);
48             return;
49         }
50         rxevent_RaiseEvents(&temp);
51         if (++i >= LOOPS_PER_WAITCHECK) {
52             i = 0;
53             afs_osi_CheckTimedWaits();
54         }
55     }
56 }
57
58
59 /* Loop to listen on a socket. Return setting *newcallp if this
60  * thread should become a server thread.  */
61 void
62 rxi_ListenerProc(osi_socket usockp, int *tnop, struct rx_call **newcallp)
63 {
64     struct rx_packet *tp;
65     afs_uint32 host;
66     u_short port;
67     int rc;
68
69     /*
70      * Use the rxk_GetPacketProc and rxk_PacketArrivalProc routines
71      * to allocate rx_packet buffers and pass them to the RX layer
72      * for processing.
73      */
74     while (1) {
75         tp = rxi_AllocPacket(RX_PACKET_CLASS_RECEIVE);
76         usr_assert(tp != NULL);
77         rc = rxi_ReadPacket(usockp, tp, &host, &port);
78         if (rc != 0) {
79             tp = rxi_ReceivePacket(tp, usockp, host, port, tnop, newcallp);
80             if (newcallp && *newcallp) {
81                 if (tp) {
82                     rxi_FreePacket(tp);
83                 }
84                 return;
85             }
86         }
87         if (tp) {
88             rxi_FreePacket(tp);
89         }
90         if (afs_termState == AFSOP_STOP_RXEVENT) {
91             afs_termState = AFSOP_STOP_RXK_LISTENER;
92             afs_osi_Wakeup(&afs_termState);
93         }
94     }
95 }
96
97 /* This is the listener process request loop. The listener process loop
98  * becomes a server thread when rxi_ListenerProc returns, and stays
99  * server thread until rxi_ServerProc returns. */
100 void
101 rxk_Listener(void)
102 {
103     int threadID;
104     osi_socket sock = (osi_socket) rx_socket;
105     struct rx_call *newcall;
106     struct usr_socket *usockp;
107
108     /*
109      * Initialize the rx_socket and start the receiver threads
110      */
111     rxk_InitializeSocket();
112
113     usockp = (struct usr_socket *)rx_socket;
114     assert(usockp != NULL);
115
116     AFS_GUNLOCK();
117     while (1) {
118         newcall = NULL;
119         threadID = -1;
120         rxi_ListenerProc(sock, &threadID, &newcall);
121         /* assert(threadID != -1); */
122         /* assert(newcall != NULL); */
123         sock = OSI_NULLSOCKET;
124         rxi_ServerProc(threadID, newcall, &sock);
125         if (sock == OSI_NULLSOCKET) {
126             break;
127         }
128     }
129     AFS_GLOCK();
130 }
131
132 /* This is the server process request loop. The server process loop
133  * becomes a listener thread when rxi_ServerProc returns, and stays
134  * listener thread until rxi_ListenerProc returns. */
135 void *
136 rx_ServerProc(void *unused)
137 {
138     osi_socket sock;
139     int threadID;
140     struct rx_call *newcall = NULL;
141
142     rxi_MorePackets(rx_maxReceiveWindow + 2);   /* alloc more packets */
143     rxi_dataQuota += rx_initSendWindow; /* Reserve some pkts for hard times */
144     /* threadID is used for making decisions in GetCall.  Get it by bumping
145      * number of threads handling incoming calls */
146     threadID = rxi_availProcs++;
147
148     AFS_GUNLOCK();
149     while (1) {
150         sock = OSI_NULLSOCKET;
151         rxi_ServerProc(threadID, newcall, &sock);
152         if (sock == OSI_NULLSOCKET) {
153             break;
154         }
155         newcall = NULL;
156         threadID = -1;
157         rxi_ListenerProc(sock, &threadID, &newcall);
158         /* assert(threadID != -1); */
159         /* assert(newcall != NULL); */
160     }
161     AFS_GLOCK();
162     return NULL;
163 }
164
165 /*
166  * At this point, RX wants a socket, but still has not initialized the
167  * rx_port variable or the pointers to the packet allocater and arrival
168  * routines. Allocate the socket buffer here, but don't open it until
169  * we start the receiver threads.
170  */
171 osi_socket *
172 rxk_NewSocketHost(afs_uint32 ahost, short aport)
173 {
174     struct usr_socket *usockp;
175
176     usockp = (struct usr_socket *)afs_osi_Alloc(sizeof(struct usr_socket));
177     usr_assert(usockp != NULL);
178
179     usockp->sock = -1;
180
181     return (osi_socket *)usockp;
182 }
183
184 osi_socket *
185 rxk_NewSocket(short aport)
186 {
187     return rxk_NewSocketHost(htonl(INADDR_ANY), aport);
188 }
189
190 /*
191  * This routine is called from rxk_Listener. By this time rx_port
192  * is set to 7001 and rx_socket points to the socket buffer
193  * we allocated in rxk_NewSocket. Now is the time to bind our
194  * socket and start the receiver threads.
195  */
196 void
197 rxk_InitializeSocket(void)
198 {
199     int rc, sock, i;
200 #ifdef AFS_USR_AIX_ENV
201     unsigned long len, optval, optval0, optlen;
202 #else /* AFS_USR_AIX_ENV */
203     int len, optval, optval0, optlen;
204 #endif /* AFS_USR_AIX_ENV */
205     struct usr_socket *usockp;
206     struct sockaddr_in lcladdr;
207
208     usr_assert(rx_socket != NULL);
209     usockp = (struct usr_socket *)rx_socket;
210
211 #undef socket
212     sock = socket(PF_INET, SOCK_DGRAM, 0);
213     usr_assert(sock >= 0);
214
215     memset((void *)&lcladdr, 0, sizeof(struct sockaddr_in));
216     lcladdr.sin_family = AF_INET;
217     lcladdr.sin_port = htons(usr_rx_port);
218     lcladdr.sin_addr.s_addr = INADDR_ANY;
219     rc = bind(sock, (struct sockaddr *)&lcladdr, sizeof(struct sockaddr_in));
220     usr_assert(rc >= 0);
221     len = sizeof(struct sockaddr_in);
222     rc = getsockname(sock, (struct sockaddr *)&lcladdr, &len);
223     usr_assert(rc >= 0);
224 #ifdef AFS_USR_LINUX22_ENV
225     optval0 = 131070;
226 #else
227     optval0 = 131072;
228 #endif
229     optval = optval0;
230     rc = setsockopt(sock, SOL_SOCKET, SO_SNDBUF, (void *)&optval,
231                     sizeof(optval));
232     usr_assert(rc == 0);
233     optlen = sizeof(optval);
234     rc = getsockopt(sock, SOL_SOCKET, SO_SNDBUF, (void *)&optval, &optlen);
235     usr_assert(rc == 0);
236     /* usr_assert(optval == optval0); */
237 #ifdef AFS_USR_LINUX22_ENV
238     optval0 = 131070;
239 #else
240     optval0 = 131072;
241 #endif
242     optval = optval0;
243     rc = setsockopt(sock, SOL_SOCKET, SO_RCVBUF, (void *)&optval,
244                     sizeof(optval));
245     usr_assert(rc == 0);
246     optlen = sizeof(optval);
247     rc = getsockopt(sock, SOL_SOCKET, SO_RCVBUF, (void *)&optval, &optlen);
248     usr_assert(rc == 0);
249     /* usr_assert(optval == optval0); */
250
251 #ifdef AFS_USR_AIX_ENV
252     optval = 1;
253     rc = setsockopt(sock, SOL_SOCKET, SO_CKSUMRECV, (void *)&optval,
254                     sizeof(optval));
255     usr_assert(rc == 0);
256 #endif /* AFS_USR_AIX_ENV */
257
258     usockp->sock = sock;
259     usockp->port = lcladdr.sin_port;
260
261     /*
262      * Set the value of rx_port to reflect the address we actually
263      * are listening on, since the kernel is probably already using 7001.
264      */
265     rx_port = usockp->port;
266 }
267
268 int
269 rxk_FreeSocket(struct usr_socket *sockp)
270 {
271     return 0;
272 }
273
274 void
275 osi_StopListener(void)
276 {
277     rxk_FreeSocket((struct usr_socket *)rx_socket);
278 }
279
280 int
281 osi_NetSend(osi_socket sockp, struct sockaddr_in *addr, struct iovec *iov,
282             int nio, afs_int32 size, int stack)
283 {
284     int rc;
285     int i;
286     unsigned long tmp;
287     struct usr_socket *usockp = (struct usr_socket *)sockp;
288     struct msghdr msg;
289     struct iovec tmpiov[64];
290
291     /*
292      * The header is in the first iovec
293      */
294     usr_assert(nio > 0 && nio <= 64);
295     for (i = 0; i < nio; i++) {
296         tmpiov[i].iov_base = iov[i].iov_base;
297         tmpiov[i].iov_len = iov[i].iov_len;
298     }
299
300     memset(&msg, 0, sizeof(msg));
301     msg.msg_name = (void *)addr;
302     msg.msg_namelen = sizeof(struct sockaddr_in);
303     msg.msg_iov = &tmpiov[0];
304     msg.msg_iovlen = nio;
305
306     rc = sendmsg(usockp->sock, &msg, 0);
307     if (rc < 0) {
308         return errno;
309     }
310     usr_assert(rc == size);
311
312     return 0;
313 }
314
315 void
316 shutdown_rxkernel(void)
317 {
318     rxk_initDone = 0;
319     rxk_shutdownPorts();
320 }
321
322 void
323 rx_Finalize(void)
324 {
325     usr_assert(0);
326 }
327
328 /*
329  * Recvmsg.
330  *
331  */
332 int
333 rxi_Recvmsg(osi_socket socket, struct msghdr *msg_p, int flags)
334 {
335     int ret;
336     do {
337         ret = recvmsg(socket->sock, msg_p, flags);
338     } while (ret == -1 && errno == EAGAIN);
339     return ret;
340 }