959e9cf87c24148260190f75eafee6bf6586149c
[openafs.git] / src / rx / UKERNEL / rx_knet.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afsconfig.h>
11 #include "afs/param.h"
12
13 RCSID
14     ("$Header$");
15
16 #include "rx/rx_kcommon.h"
17
18
19 #define SECONDS_TO_SLEEP        0
20 #define NANO_SECONDS_TO_SLEEP   100000000       /* 100 milliseconds */
21 #define LOOPS_PER_WAITCHECK     10      /* once per second */
22
23 unsigned short usr_rx_port = 0;
24
25 struct usr_ifnet *usr_ifnet = NULL;
26 struct usr_in_ifaddr *usr_in_ifaddr = NULL;
27
28 void rxk_InitializeSocket();
29
30 void
31 afs_rxevent_daemon(void)
32 {
33     struct timespec tv;
34     struct clock temp;
35     int i = 0;
36
37     AFS_GUNLOCK();
38     while (1) {
39         tv.tv_sec = SECONDS_TO_SLEEP;
40         tv.tv_nsec = NANO_SECONDS_TO_SLEEP;
41         usr_thread_sleep(&tv);
42         /*
43          * Check for shutdown, don't try to stop the listener
44          */
45         if (afs_termState == AFSOP_STOP_RXEVENT
46             || afs_termState == AFSOP_STOP_RXK_LISTENER) {
47             AFS_GLOCK();
48             afs_termState = AFSOP_STOP_COMPLETE;
49             afs_osi_Wakeup(&afs_termState);
50             return;
51         }
52         rxevent_RaiseEvents(&temp);
53         if (++i >= LOOPS_PER_WAITCHECK) {
54             i = 0;
55             afs_osi_CheckTimedWaits();
56         }
57     }
58 }
59
60
61 /* Loop to listen on a socket. Return setting *newcallp if this
62  * thread should become a server thread.  */
63 void
64 rxi_ListenerProc(osi_socket usockp, int *tnop, struct rx_call **newcallp)
65 {
66     struct rx_packet *tp;
67     afs_uint32 host;
68     u_short port;
69     int rc;
70
71     /*
72      * Use the rxk_GetPacketProc and rxk_PacketArrivalProc routines
73      * to allocate rx_packet buffers and pass them to the RX layer
74      * for processing.
75      */
76     while (1) {
77         tp = rxi_AllocPacket(RX_PACKET_CLASS_RECEIVE);
78         usr_assert(tp != NULL);
79         rc = rxi_ReadPacket(usockp, tp, &host, &port);
80         if (rc != 0) {
81             tp = rxi_ReceivePacket(tp, usockp, host, port, tnop, newcallp);
82             if (newcallp && *newcallp) {
83                 if (tp) {
84                     rxi_FreePacket(tp);
85                 }
86                 return;
87             }
88         }
89         if (tp) {
90             rxi_FreePacket(tp);
91         }
92         if (afs_termState == AFSOP_STOP_RXEVENT) {
93             afs_termState = AFSOP_STOP_RXK_LISTENER;
94             afs_osi_Wakeup(&afs_termState);
95         }
96     }
97 }
98
99 /* This is the listener process request loop. The listener process loop
100  * becomes a server thread when rxi_ListenerProc returns, and stays
101  * server thread until rxi_ServerProc returns. */
102 void
103 rxk_Listener(void)
104 {
105     int threadID;
106     osi_socket sock = (osi_socket) rx_socket;
107     struct rx_call *newcall;
108     struct usr_socket *usockp;
109
110     /*
111      * Initialize the rx_socket and start the receiver threads
112      */
113     rxk_InitializeSocket();
114
115     usockp = (struct usr_socket *)rx_socket;
116     assert(usockp != NULL);
117
118     AFS_GUNLOCK();
119     while (1) {
120         newcall = NULL;
121         threadID = -1;
122         rxi_ListenerProc(sock, &threadID, &newcall);
123         /* assert(threadID != -1); */
124         /* assert(newcall != NULL); */
125         sock = OSI_NULLSOCKET;
126         rxi_ServerProc(threadID, newcall, &sock);
127         if (sock == OSI_NULLSOCKET) {
128             break;
129         }
130     }
131     AFS_GLOCK();
132 }
133
134 /* This is the server process request loop. The server process loop
135  * becomes a listener thread when rxi_ServerProc returns, and stays
136  * listener thread until rxi_ListenerProc returns. */
137 void
138 rx_ServerProc(void)
139 {
140     osi_socket sock;
141     int threadID;
142     struct rx_call *newcall = NULL;
143
144     rxi_MorePackets(rx_maxReceiveWindow + 2);   /* alloc more packets */
145     rxi_dataQuota += rx_initSendWindow; /* Reserve some pkts for hard times */
146     /* threadID is used for making decisions in GetCall.  Get it by bumping
147      * number of threads handling incoming calls */
148     threadID = rxi_availProcs++;
149
150     AFS_GUNLOCK();
151     while (1) {
152         sock = OSI_NULLSOCKET;
153         rxi_ServerProc(threadID, newcall, &sock);
154         if (sock == OSI_NULLSOCKET) {
155             break;
156         }
157         newcall = NULL;
158         threadID = -1;
159         rxi_ListenerProc(sock, &threadID, &newcall);
160         /* assert(threadID != -1); */
161         /* assert(newcall != NULL); */
162     }
163     AFS_GLOCK();
164 }
165
166 /*
167  * At this point, RX wants a socket, but still has not initialized the
168  * rx_port variable or the pointers to the packet allocater and arrival
169  * routines. Allocate the socket buffer here, but don't open it until
170  * we start the receiver threads.
171  */
172 osi_socket *
173 rxk_NewSocketHost(afs_uint32 ahost, short aport)
174 {
175     struct usr_socket *usockp;
176
177     usockp = (struct usr_socket *)afs_osi_Alloc(sizeof(struct usr_socket));
178     usr_assert(usockp != NULL);
179
180     usockp->sock = -1;
181
182     return (osi_socket *)usockp;
183 }
184
185 osi_socket *
186 rxk_NewSocket(short aport)
187 {
188     return rxk_NewSocketHost(htonl(INADDR_ANY), aport);
189 }
190
191 /*
192  * This routine is called from rxk_Listener. By this time rx_port
193  * is set to 7001 and rx_socket points to the socket buffer
194  * we allocated in rxk_NewSocket. Now is the time to bind our
195  * socket and start the receiver threads.
196  */
197 void
198 rxk_InitializeSocket(void)
199 {
200     int rc, sock, i;
201 #ifdef AFS_USR_AIX_ENV
202     unsigned long len, optval, optval0, optlen;
203 #else /* AFS_USR_AIX_ENV */
204     int len, optval, optval0, optlen;
205 #endif /* AFS_USR_AIX_ENV */
206     struct usr_socket *usockp;
207     struct sockaddr_in lcladdr;
208
209     usr_assert(rx_socket != NULL);
210     usockp = (struct usr_socket *)rx_socket;
211
212 #undef socket
213     sock = socket(PF_INET, SOCK_DGRAM, 0);
214     usr_assert(sock >= 0);
215
216     memset((void *)&lcladdr, 0, sizeof(struct sockaddr_in));
217     lcladdr.sin_family = AF_INET;
218     lcladdr.sin_port = htons(usr_rx_port);
219     lcladdr.sin_addr.s_addr = INADDR_ANY;
220     rc = bind(sock, (struct sockaddr *)&lcladdr, sizeof(struct sockaddr_in));
221     usr_assert(rc >= 0);
222     len = sizeof(struct sockaddr_in);
223     rc = getsockname(sock, (struct sockaddr *)&lcladdr, &len);
224     usr_assert(rc >= 0);
225 #ifdef AFS_USR_LINUX22_ENV
226     optval0 = 131070;
227 #else
228     optval0 = 131072;
229 #endif
230     optval = optval0;
231     rc = setsockopt(sock, SOL_SOCKET, SO_SNDBUF, (void *)&optval,
232                     sizeof(optval));
233     usr_assert(rc == 0);
234     optlen = sizeof(optval);
235     rc = getsockopt(sock, SOL_SOCKET, SO_SNDBUF, (void *)&optval, &optlen);
236     usr_assert(rc == 0);
237     /* usr_assert(optval == optval0); */
238 #ifdef AFS_USR_LINUX22_ENV
239     optval0 = 131070;
240 #else
241     optval0 = 131072;
242 #endif
243     optval = optval0;
244     rc = setsockopt(sock, SOL_SOCKET, SO_RCVBUF, (void *)&optval,
245                     sizeof(optval));
246     usr_assert(rc == 0);
247     optlen = sizeof(optval);
248     rc = getsockopt(sock, SOL_SOCKET, SO_RCVBUF, (void *)&optval, &optlen);
249     usr_assert(rc == 0);
250     /* usr_assert(optval == optval0); */
251
252 #ifdef AFS_USR_AIX_ENV
253     optval = 1;
254     rc = setsockopt(sock, SOL_SOCKET, SO_CKSUMRECV, (void *)&optval,
255                     sizeof(optval));
256     usr_assert(rc == 0);
257 #endif /* AFS_USR_AIX_ENV */
258
259     usockp->sock = sock;
260     usockp->port = lcladdr.sin_port;
261
262     /*
263      * Set the value of rx_port to reflect the address we actually
264      * are listening on, since the kernel is probably already using 7001.
265      */
266     rx_port = usockp->port;
267 }
268
269 int
270 rxk_FreeSocket(struct usr_socket *sockp)
271 {
272     return 0;
273 }
274
275 void
276 osi_StopListener(void)
277 {
278     rxk_FreeSocket((struct usr_socket *)rx_socket);
279 }
280
281 int
282 osi_NetSend(osi_socket sockp, struct sockaddr_in *addr, struct iovec *iov,
283             int nio, afs_int32 size, int stack)
284 {
285     int rc;
286     int i;
287     unsigned long tmp;
288     struct usr_socket *usockp = (struct usr_socket *)sockp;
289     struct msghdr msg;
290     struct iovec tmpiov[64];
291
292     /*
293      * The header is in the first iovec
294      */
295     usr_assert(nio > 0 && nio <= 64);
296     for (i = 0; i < nio; i++) {
297         tmpiov[i].iov_base = iov[i].iov_base;
298         tmpiov[i].iov_len = iov[i].iov_len;
299     }
300
301     memset(&msg, 0, sizeof(msg));
302     msg.msg_name = (void *)addr;
303     msg.msg_namelen = sizeof(struct sockaddr_in);
304     msg.msg_iov = &tmpiov[0];
305     msg.msg_iovlen = nio;
306
307     rc = sendmsg(usockp->sock, &msg, 0);
308     if (rc < 0) {
309         return errno;
310     }
311     usr_assert(rc == size);
312
313     return 0;
314 }
315
316 void
317 shutdown_rxkernel(void)
318 {
319     rxk_initDone = 0;
320     rxk_shutdownPorts();
321 }
322
323 void
324 rx_Finalize(void)
325 {
326     usr_assert(0);
327 }
328
329 /*
330  * Recvmsg.
331  *
332  */
333 int
334 rxi_Recvmsg(osi_socket socket, struct msghdr *msg_p, int flags)
335 {
336     int ret;
337     do {
338         ret = recvmsg(socket->sock, msg_p, flags);
339     } while (ret == -1 && errno == EAGAIN);
340     return ret;
341 }