netsend-proto-and-obsd-includes-20021010
[openafs.git] / src / rx / UKERNEL / rx_knet.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afsconfig.h>
11 #include "../afs/param.h"
12
13 RCSID("$Header$");
14
15 #include "../rx/rx_kcommon.h"
16
17
18 #define SECONDS_TO_SLEEP        0
19 #define NANO_SECONDS_TO_SLEEP   100000000 /* 100 milliseconds */
20 #define LOOPS_PER_WAITCHECK     10        /* once per second */
21
22 unsigned short usr_rx_port = 0;
23
24 struct usr_ifnet *usr_ifnet = NULL;
25 struct usr_in_ifaddr *usr_in_ifaddr = NULL;
26
27 void rxk_InitializeSocket();
28
29 void afs_rxevent_daemon(void)
30 {
31     struct timespec tv;
32     struct clock temp;
33     int i = 0;
34
35     AFS_GUNLOCK();
36     while(1) {
37         tv.tv_sec = SECONDS_TO_SLEEP;
38         tv.tv_nsec = NANO_SECONDS_TO_SLEEP;
39         usr_thread_sleep(&tv);
40         /*
41          * Check for shutdown, don't try to stop the listener
42          */
43         if (afs_termState == AFSOP_STOP_RXEVENT ||
44             afs_termState == AFSOP_STOP_RXK_LISTENER) {
45             AFS_GLOCK();
46             afs_termState = AFSOP_STOP_COMPLETE;
47             afs_osi_Wakeup(&afs_termState);
48             return;
49         }
50         rxevent_RaiseEvents(&temp);
51         if (++i >= LOOPS_PER_WAITCHECK) {
52             i = 0;
53             afs_osi_CheckTimedWaits();
54         }
55     }
56 }
57
58
59 /* Loop to listen on a socket. Return setting *newcallp if this
60  * thread should become a server thread.  */
61 void rxi_ListenerProc(osi_socket usockp, int *tnop, struct rx_call **newcallp)
62 {
63     struct rx_packet *tp;
64     afs_uint32 host;
65     u_short port;
66     int rc;
67
68     /*
69      * Use the rxk_GetPacketProc and rxk_PacketArrivalProc routines
70      * to allocate rx_packet buffers and pass them to the RX layer
71      * for processing.
72      */
73     while (1) {
74         tp = rxi_AllocPacket(RX_PACKET_CLASS_RECEIVE);
75         usr_assert(tp != NULL);
76         rc = rxi_ReadPacket(usockp->sock, tp, &host, &port);
77         if (rc != 0) {
78             tp = rxi_ReceivePacket(tp, usockp, host, port, tnop, newcallp);
79             if (newcallp && *newcallp) {
80                 if (tp) {
81                     rxi_FreePacket(tp);
82                 }
83                 return;
84             }
85         }
86         if (tp) {
87             rxi_FreePacket(tp);
88         }
89         if (afs_termState == AFSOP_STOP_RXEVENT ) {
90             afs_termState = AFSOP_STOP_RXK_LISTENER;
91             afs_osi_Wakeup(&afs_termState);
92         }
93     }
94 }
95
96 /* This is the listener process request loop. The listener process loop
97  * becomes a server thread when rxi_ListenerProc returns, and stays
98  * server thread until rxi_ServerProc returns. */
99 void rxk_Listener(void)
100 {
101     int threadID;
102     osi_socket sock = (osi_socket) rx_socket;
103     struct rx_call *newcall;
104     struct usr_socket *usockp;
105
106     /*
107      * Initialize the rx_socket and start the receiver threads
108      */
109     rxk_InitializeSocket();
110
111     usockp = (struct usr_socket *)rx_socket;
112     assert(usockp != NULL);
113
114     AFS_GUNLOCK();
115     while(1) {
116         newcall = NULL;
117         threadID = -1;
118         rxi_ListenerProc(sock, &threadID, &newcall);
119         /* assert(threadID != -1); */
120         /* assert(newcall != NULL); */
121         sock = OSI_NULLSOCKET;
122         rxi_ServerProc(threadID, newcall, &sock);
123         if (sock == OSI_NULLSOCKET) {
124             break;
125         }
126     }
127     AFS_GLOCK();
128 }
129
130 /* This is the server process request loop. The server process loop
131  * becomes a listener thread when rxi_ServerProc returns, and stays
132  * listener thread until rxi_ListenerProc returns. */
133 void rx_ServerProc(void)
134 {
135     osi_socket sock;
136     int threadID;
137     struct rx_call *newcall = NULL;
138
139     rxi_MorePackets(rx_maxReceiveWindow+2); /* alloc more packets */
140     rxi_dataQuota += rx_initSendWindow; /* Reserve some pkts for hard times */
141     /* threadID is used for making decisions in GetCall.  Get it by bumping
142      * number of threads handling incoming calls */
143     threadID = rxi_availProcs++;
144
145     AFS_GUNLOCK();
146     while(1) {
147         sock = OSI_NULLSOCKET;
148         rxi_ServerProc(threadID, newcall, &sock);
149         if (sock == OSI_NULLSOCKET) {
150             break;
151         }
152         newcall = NULL;
153         threadID = -1;
154         rxi_ListenerProc(sock, &threadID, &newcall);
155         /* assert(threadID != -1); */
156         /* assert(newcall != NULL); */
157     }
158     AFS_GLOCK();
159 }
160
161 /*
162  * At this point, RX wants a socket, but still has not initialized the
163  * rx_port variable or the pointers to the packet allocater and arrival
164  * routines. Allocate the socket buffer here, but don't open it until
165  * we start the receiver threads.
166  */
167 struct osi_socket *rxk_NewSocket(short aport)
168 {
169     struct usr_socket *usockp;
170
171     usockp = (struct usr_socket *)afs_osi_Alloc(sizeof(struct usr_socket));
172     usr_assert(usockp != NULL);
173
174     usockp->sock = -1;
175
176     return (struct osi_socket *)usockp;
177 }
178
179 /*
180  * This routine is called from rxk_Listener. By this time rx_port
181  * is set to 7001 and rx_socket points to the socket buffer
182  * we allocated in rxk_NewSocket. Now is the time to bind our
183  * socket and start the receiver threads.
184  */
185 void rxk_InitializeSocket(void)
186 {
187     int rc, sock, i;
188 #ifdef AFS_USR_AIX_ENV
189     unsigned long len, optval, optval0, optlen;
190 #else /* AFS_USR_AIX_ENV */
191     int len, optval, optval0, optlen;
192 #endif /* AFS_USR_AIX_ENV */
193     struct usr_socket *usockp;
194     struct sockaddr_in lcladdr;
195
196     usr_assert(rx_socket != NULL);
197     usockp = (struct usr_socket *)rx_socket;
198
199 #undef socket
200     sock = socket(PF_INET, SOCK_DGRAM, 0);
201     usr_assert(sock >= 0);
202
203     memset((void *)&lcladdr, 0, sizeof(struct sockaddr_in));
204     lcladdr.sin_family = AF_INET;
205     lcladdr.sin_port = htons(usr_rx_port);
206     lcladdr.sin_addr.s_addr = INADDR_ANY;
207     rc = bind(sock, (struct sockaddr *)&lcladdr, sizeof(struct sockaddr_in));
208     usr_assert(rc >= 0);
209     len = sizeof(struct sockaddr_in);
210     rc = getsockname(sock, (struct sockaddr *)&lcladdr, &len);
211     usr_assert(rc >= 0);
212 #ifdef AFS_USR_LINUX22_ENV
213     optval0 = 131070;
214 #else
215     optval0 = 131072;
216 #endif
217     optval = optval0;
218     rc = setsockopt(sock, SOL_SOCKET, SO_SNDBUF,
219                     (void *)&optval, sizeof(optval));
220     usr_assert(rc == 0);
221     optlen = sizeof(optval);
222     rc = getsockopt(sock, SOL_SOCKET, SO_SNDBUF,
223                     (void *)&optval, &optlen);
224     usr_assert(rc == 0);
225     usr_assert(optval == optval0);
226 #ifdef AFS_USR_LINUX22_ENV
227     optval0 = 131070;
228 #else
229     optval0 = 131072;
230 #endif
231     optval = optval0;
232     rc = setsockopt(sock, SOL_SOCKET, SO_RCVBUF,
233                     (void *)&optval, sizeof(optval));
234     usr_assert(rc == 0);
235     optlen = sizeof(optval);
236     rc = getsockopt(sock, SOL_SOCKET, SO_RCVBUF,
237                     (void *)&optval, &optlen);
238     usr_assert(rc == 0);
239     usr_assert(optval == optval0);
240
241 #ifdef AFS_USR_AIX_ENV
242     optval = 1;
243     rc = setsockopt(sock, SOL_SOCKET, SO_CKSUMRECV,
244                     (void *)&optval, sizeof(optval));
245     usr_assert(rc == 0);
246 #endif /* AFS_USR_AIX_ENV */
247
248     usockp->sock = sock;
249     usockp->port = lcladdr.sin_port;
250
251     /*
252      * Set the value of rx_port to reflect the address we actually
253      * are listening on, since the kernel is probably already using 7001.
254      */
255     rx_port = usockp->port;
256 }
257
258 int rxk_FreeSocket(struct usr_socket *sockp)
259 {
260     return 0;
261 }
262
263 void osi_StopListener(void)
264 {
265     rxk_FreeSocket((struct usr_socket *)rx_socket);
266 }
267
268 int osi_NetSend(osi_socket sockp, struct sockaddr_in *addr, 
269         struct iovec *iov, int nio, afs_int32 size, int stack) 
270 {
271     int rc;
272     int i;
273     unsigned long tmp;
274     struct usr_socket *usockp = (struct usr_socket *)sockp;
275     struct msghdr msg;
276     struct iovec tmpiov[64];
277
278     /*
279      * The header is in the first iovec
280      */
281     usr_assert(nio > 0 && nio <= 64);
282     for (i = 0 ; i < nio ; i++) {
283         tmpiov[i].iov_base = iov[i].iov_base;
284         tmpiov[i].iov_len = iov[i].iov_len;
285     }
286
287     memset(&msg, 0, sizeof(msg));
288     msg.msg_name = (void *)addr;
289     msg.msg_namelen = sizeof(struct sockaddr_in);
290     msg.msg_iov = &tmpiov[0];
291     msg.msg_iovlen = nio;
292
293     rc = sendmsg(usockp->sock, &msg, 0);
294     if (rc < 0) {
295         return errno;
296     }
297     usr_assert(rc == size);
298
299     return 0;
300 }
301
302 void shutdown_rxkernel(void)
303 {
304     rxk_initDone = 0;
305     rxk_shutdownPorts();
306 }
307
308 void rx_Finalize(void)
309 {
310     usr_assert(0);
311 }
312
313 /*
314  * Recvmsg.
315  *
316  */
317 int rxi_Recvmsg(int socket, struct msghdr *msg_p, int flags)
318 {
319     int ret;
320     do {
321         ret = recvmsg(socket, msg_p, flags);
322     } while (ret == -1 && errno == EAGAIN);
323     return ret;
324 }
325