87a2c05d38940ced0f176545ab8f6c1a93b6602c
[openafs.git] / src / rx / rx_pthread.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  *
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /*
11  * An implementation of the rx socket listener for pthreads (not using select).
12  * This assumes that multiple read system calls may be extant at any given
13  * time. Also implements the pthread-specific event handling for rx.
14  *
15  * rx_pthread.c is used for the thread safe RX package.
16  */
17
18 #include <afsconfig.h>
19 #include <afs/param.h>
20
21 #include <roken.h>
22
23 #include <assert.h>
24
25 #include "rx.h"
26 #include "rx_globals.h"
27 #include "rx_pthread.h"
28 #include "rx_clock.h"
29 #include "rx_atomic.h"
30
31 /* Set rx_pthread_event_rescheduled if event_handler should just try
32  * again instead of sleeping.
33  *
34  * Protected by event_handler_mutex
35  */
36 static int rx_pthread_event_rescheduled = 0;
37
38 static void *rx_ListenerProc(void *);
39
40 /*
41  * We supply an event handling thread for Rx's event processing.
42  * The condition variable is used to wakeup the thread whenever a new
43  * event is scheduled earlier than the previous earliest event.
44  * This thread is also responsible for keeping time.
45  */
46 static pthread_t event_handler_thread;
47 afs_kcondvar_t rx_event_handler_cond;
48 afs_kmutex_t event_handler_mutex;
49 afs_kcondvar_t rx_listener_cond;
50 afs_kmutex_t listener_mutex;
51 static int listeners_started = 0;
52 afs_kmutex_t rx_clock_mutex;
53 struct clock rxi_clockNow;
54
55 static rx_atomic_t threadHiNum;
56
57 int
58 rx_NewThreadId(void) {
59     return rx_atomic_inc_and_read(&threadHiNum);
60 }
61
62 /*
63  * Delay the current thread the specified number of seconds.
64  */
65 void
66 rxi_Delay(int sec)
67 {
68     sleep(sec);
69 }
70
71 /*
72  * Called from rx_Init()
73  */
74 void
75 rxi_InitializeThreadSupport(void)
76 {
77         /* listeners_started must only be reset if
78          * the listener thread terminates */
79         /* listeners_started = 0; */
80     clock_GetTime(&rxi_clockNow);
81 }
82
83 static void *
84 server_entry(void *argp)
85 {
86     void (*server_proc) (void *) = (void (*)(void *))argp;
87     server_proc(NULL);
88     dpf(("rx_pthread.c: server_entry: Server proc returned unexpectedly\n"));
89     return (void *) -1; /* reused as return value, see pthread(3) */
90 }
91
92 /*
93  * Start an Rx server process.
94  */
95 void
96 rxi_StartServerProc(void *(*proc) (void *), int stacksize)
97 {
98     pthread_t thread;
99     pthread_attr_t tattr;
100     AFS_SIGSET_DECL;
101
102     if (pthread_attr_init(&tattr) != 0) {
103         osi_Panic("Unable to Create Rx server thread (pthread_attr_init)\n");
104     }
105
106     if (pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) != 0) {
107         osi_Panic("Unable to Create Rx server thread (pthread_attr_setdetachstate)\n");
108     }
109
110     /*
111      * NOTE: We are ignoring the stack size parameter, for now.
112      */
113     AFS_SIGSET_CLEAR();
114     if (pthread_create(&thread, &tattr, server_entry, (void *)proc) != 0) {
115         osi_Panic("Unable to Create Rx server thread\n");
116     }
117     AFS_SIGSET_RESTORE();
118 }
119
120 /*
121  * The event handling process.
122  */
123 static void *
124 event_handler(void *argp)
125 {
126     unsigned long rx_pthread_n_event_expired = 0;
127     unsigned long rx_pthread_n_event_waits = 0;
128     long rx_pthread_n_event_woken = 0;
129     unsigned long rx_pthread_n_event_error = 0;
130     struct timespec rx_pthread_next_event_time = { 0, 0 };
131     int error;
132
133     MUTEX_ENTER(&event_handler_mutex);
134
135     for (;;) {
136         struct clock cv;
137         struct clock next;
138
139         MUTEX_EXIT(&event_handler_mutex);
140
141         next.sec = 30;          /* Time to sleep if there are no events scheduled */
142         next.usec = 0;
143         clock_GetTime(&cv);
144         rxevent_RaiseEvents(&next);
145
146         MUTEX_ENTER(&event_handler_mutex);
147         if (rx_pthread_event_rescheduled) {
148             rx_pthread_event_rescheduled = 0;
149             continue;
150         }
151
152         clock_Add(&cv, &next);
153         rx_pthread_next_event_time.tv_sec = cv.sec;
154         rx_pthread_next_event_time.tv_nsec = cv.usec * 1000;
155         rx_pthread_n_event_waits++;
156         error = CV_TIMEDWAIT(&rx_event_handler_cond, &event_handler_mutex, &rx_pthread_next_event_time);
157         if (error == 0) {
158             rx_pthread_n_event_woken++;
159         }
160 #ifdef AFS_NT40_ENV
161         else if (error == ETIMEDOUT) {
162             rx_pthread_n_event_expired++;
163         } else {
164             rx_pthread_n_event_error++;
165         }
166 #else
167         else if (errno == ETIMEDOUT) {
168             rx_pthread_n_event_expired++;
169         } else {
170             rx_pthread_n_event_error++;
171         }
172 #endif
173         rx_pthread_event_rescheduled = 0;
174     }
175     return NULL;
176 }
177
178
179 /*
180  * This routine will get called by the event package whenever a new,
181  * earlier than others, event is posted. */
182 void
183 rxi_ReScheduleEvents(void)
184 {
185     MUTEX_ENTER(&event_handler_mutex);
186     CV_SIGNAL(&rx_event_handler_cond);
187     rx_pthread_event_rescheduled = 1;
188     MUTEX_EXIT(&event_handler_mutex);
189 }
190
191
192 /* Loop to listen on a socket. Return setting *newcallp if this
193  * thread should become a server thread.  */
194 static void
195 rxi_ListenerProc(osi_socket sock, int *tnop, struct rx_call **newcallp)
196 {
197     unsigned int host;
198     u_short port;
199     struct rx_packet *p = (struct rx_packet *)0;
200
201     MUTEX_ENTER(&listener_mutex);
202     while (!listeners_started) {
203         CV_WAIT(&rx_listener_cond, &listener_mutex);
204     }
205     MUTEX_EXIT(&listener_mutex);
206
207     for (;;) {
208         /* See if a check for additional packets was issued */
209         rx_CheckPackets();
210
211         /*
212          * Grab a new packet only if necessary (otherwise re-use the old one)
213          */
214         if (p) {
215             rxi_RestoreDataBufs(p);
216         } else {
217             if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_RECEIVE))) {
218                 /* Could this happen with multiple socket listeners? */
219                 osi_Panic("rxi_Listener: no packets!"); /* Shouldn't happen */
220             }
221         }
222
223         if (rxi_ReadPacket(sock, p, &host, &port)) {
224             clock_NewTime();
225             p = rxi_ReceivePacket(p, sock, host, port, tnop, newcallp);
226             if (newcallp && *newcallp) {
227                 if (p)
228                     rxi_FreePacket(p);
229                 return;
230             }
231         }
232     }
233     /* NOTREACHED */
234 }
235
236 /* This is the listener process request loop. The listener process loop
237  * becomes a server thread when rxi_ListenerProc returns, and stays
238  * server thread until rxi_ServerProc returns. */
239 static void *
240 rx_ListenerProc(void *argp)
241 {
242     int threadID;
243     osi_socket sock = (osi_socket)(intptr_t)argp;
244     struct rx_call *newcall;
245
246     while (1) {
247         newcall = NULL;
248         threadID = -1;
249         rxi_ListenerProc(sock, &threadID, &newcall);
250         /* osi_Assert(threadID != -1); */
251         /* osi_Assert(newcall != NULL); */
252         sock = OSI_NULLSOCKET;
253         osi_Assert(pthread_setspecific(rx_thread_id_key, (void *)(intptr_t)threadID) == 0);
254         rxi_ServerProc(threadID, newcall, &sock);
255         /* osi_Assert(sock != OSI_NULLSOCKET); */
256     }
257     /* not reached */
258     return NULL;
259 }
260
261 /* This is the server process request loop. The server process loop
262  * becomes a listener thread when rxi_ServerProc returns, and stays
263  * listener thread until rxi_ListenerProc returns. */
264 void *
265 rx_ServerProc(void * dummy)
266 {
267     osi_socket sock;
268     int threadID;
269     struct rx_call *newcall = NULL;
270
271     rxi_MorePackets(rx_maxReceiveWindow + 2);   /* alloc more packets */
272     MUTEX_ENTER(&rx_quota_mutex);
273     rxi_dataQuota += rx_initSendWindow; /* Reserve some pkts for hard times */
274     /* threadID is used for making decisions in GetCall.  Get it by bumping
275      * number of threads handling incoming calls */
276     /* Unique thread ID: used for scheduling purposes *and* as index into
277      * the host hold table (fileserver).
278      * The previously used rxi_availProcs is unsuitable as it
279      * will already go up and down as packets arrive while the server
280      * threads are still initialising! The recently introduced
281      * rxi_pthread_hinum does not necessarily lead to a server
282      * thread with id 0, which is not allowed to hop through the
283      * incoming call queue.
284      * So either introduce yet another counter or flag the FCFS
285      * thread... chose the latter.
286      */
287     MUTEX_ENTER(&rx_pthread_mutex);
288     threadID = rx_NewThreadId();
289     if (rxi_fcfs_thread_num == 0 && rxi_fcfs_thread_num != threadID)
290         rxi_fcfs_thread_num = threadID;
291     MUTEX_EXIT(&rx_pthread_mutex);
292     ++rxi_availProcs;
293     MUTEX_EXIT(&rx_quota_mutex);
294
295     while (1) {
296         sock = OSI_NULLSOCKET;
297         osi_Assert(pthread_setspecific(rx_thread_id_key, (void *)(intptr_t)threadID) == 0);
298         rxi_ServerProc(threadID, newcall, &sock);
299         /* osi_Assert(sock != OSI_NULLSOCKET); */
300         newcall = NULL;
301         rxi_ListenerProc(sock, &threadID, &newcall);
302         /* osi_Assert(threadID != -1); */
303         /* osi_Assert(newcall != NULL); */
304     }
305     /* not reached */
306     return NULL;
307 }
308
309 /*
310  * Historically used to start the listener process. We now have multiple
311  * listener processes (one for each socket); these are started by GetUdpSocket.
312  *
313  * The event handling process *is* started here (the old listener used
314  * to also handle events). The listener threads can't actually start
315  * listening until rxi_StartListener is called because most of R may not
316  * be initialized when rxi_Listen is called.
317  */
318 void
319 rxi_StartListener(void)
320 {
321     pthread_attr_t tattr;
322     AFS_SIGSET_DECL;
323
324         if (listeners_started)
325                 return;
326
327     if (pthread_attr_init(&tattr) != 0) {
328         osi_Panic("Unable to create Rx event handling thread (pthread_attr_init)\n");
329     }
330
331     if (pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) != 0) {
332         osi_Panic("Unable to create Rx event handling thread (pthread_attr_setdetachstate)\n");
333     }
334
335     AFS_SIGSET_CLEAR();
336     if (pthread_create(&event_handler_thread, &tattr, event_handler, NULL) !=
337         0) {
338         osi_Panic("Unable to create Rx event handling thread\n");
339     }
340     rx_NewThreadId();
341     AFS_SIGSET_RESTORE();
342
343     MUTEX_ENTER(&listener_mutex);
344     CV_BROADCAST(&rx_listener_cond);
345     listeners_started = 1;
346     MUTEX_EXIT(&listener_mutex);
347
348 }
349
350 /*
351  * Listen on the specified socket.
352  */
353 int
354 rxi_Listen(osi_socket sock)
355 {
356     pthread_t thread;
357     pthread_attr_t tattr;
358     AFS_SIGSET_DECL;
359
360     if (pthread_attr_init(&tattr) != 0) {
361         osi_Panic("Unable to create socket listener thread (pthread_attr_init)\n");
362     }
363
364     if (pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) != 0) {
365         osi_Panic("Unable to create socket listener thread (pthread_attr_setdetachstate)\n");
366     }
367
368     AFS_SIGSET_CLEAR();
369     if (pthread_create(&thread, &tattr, rx_ListenerProc, (void *)(intptr_t)sock) != 0) {
370         osi_Panic("Unable to create socket listener thread\n");
371     }
372     rx_NewThreadId();
373     AFS_SIGSET_RESTORE();
374     return 0;
375 }
376
377
378 /*
379  * Recvmsg.
380  *
381  */
382 int
383 rxi_Recvmsg(osi_socket socket, struct msghdr *msg_p, int flags)
384 {
385     int ret;
386 #if defined(HAVE_LINUX_ERRQUEUE_H) && defined(ADAPT_PMTU)
387     while((rxi_HandleSocketError(socket)) > 0)
388       ;
389 #endif
390     ret = recvmsg(socket, msg_p, flags);
391     return ret;
392 }
393
394 /*
395  * Sendmsg.
396  */
397 int
398 rxi_Sendmsg(osi_socket socket, struct msghdr *msg_p, int flags)
399 {
400     int ret;
401     ret = sendmsg(socket, msg_p, flags);
402 #ifdef AFS_LINUX22_ENV
403     /* linux unfortunately returns ECONNREFUSED if the target port
404      * is no longer in use */
405     /* and EAGAIN if a UDP checksum is incorrect */
406     if (ret == -1 && errno != ECONNREFUSED && errno != EAGAIN) {
407 #else
408     if (ret == -1) {
409 #endif
410         dpf(("rxi_sendmsg failed, error %d\n", errno));
411         fflush(stdout);
412 #ifndef AFS_NT40_ENV
413         if (errno > 0)
414           return -errno;
415 #else
416             if (WSAGetLastError() > 0)
417               return -WSAGetLastError();
418 #endif
419         return -1;
420     }
421     return 0;
422 }
423
424 struct rx_ts_info_t * rx_ts_info_init(void) {
425     struct rx_ts_info_t * rx_ts_info;
426     rx_ts_info = (rx_ts_info_t *) malloc(sizeof(rx_ts_info_t));
427     osi_Assert(rx_ts_info != NULL && pthread_setspecific(rx_ts_info_key, rx_ts_info) == 0);
428     memset(rx_ts_info, 0, sizeof(rx_ts_info_t));
429 #ifdef RX_ENABLE_TSFPQ
430     queue_Init(&rx_ts_info->_FPQ);
431
432     MUTEX_ENTER(&rx_packets_mutex);
433     rx_TSFPQMaxProcs++;
434     RX_TS_FPQ_COMPUTE_LIMITS;
435     MUTEX_EXIT(&rx_packets_mutex);
436 #endif /* RX_ENABLE_TSFPQ */
437     return rx_ts_info;
438 }