Use calloc, rather than malloc/memset
[openafs.git] / src / rx / rx_pthread.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  *
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /*
11  * An implementation of the rx socket listener for pthreads (not using select).
12  * This assumes that multiple read system calls may be extant at any given
13  * time. Also implements the pthread-specific event handling for rx.
14  *
15  * rx_pthread.c is used for the thread safe RX package.
16  */
17
18 #include <afsconfig.h>
19 #include <afs/param.h>
20
21 #include <roken.h>
22 #include <afs/opr.h>
23
24 #include <assert.h>
25
26 #include "rx.h"
27 #include "rx_globals.h"
28 #include "rx_pthread.h"
29 #include "rx_clock.h"
30 #include "rx_atomic.h"
31 #include "rx_internal.h"
32 #include "rx_pthread.h"
33 #ifdef AFS_NT40_ENV
34 #include "rx_xmit_nt.h"
35 #endif
36
37 static void rxi_SetThreadNum(int threadID);
38
39 /* Set rx_pthread_event_rescheduled if event_handler should just try
40  * again instead of sleeping.
41  *
42  * Protected by event_handler_mutex
43  */
44 static int rx_pthread_event_rescheduled = 0;
45
46 static void *rx_ListenerProc(void *);
47
48 /*
49  * We supply an event handling thread for Rx's event processing.
50  * The condition variable is used to wakeup the thread whenever a new
51  * event is scheduled earlier than the previous earliest event.
52  * This thread is also responsible for keeping time.
53  */
54 static pthread_t event_handler_thread;
55 afs_kcondvar_t rx_event_handler_cond;
56 afs_kmutex_t event_handler_mutex;
57 afs_kcondvar_t rx_listener_cond;
58 afs_kmutex_t listener_mutex;
59 static int listeners_started = 0;
60 afs_kmutex_t rx_clock_mutex;
61 struct clock rxi_clockNow;
62
63 static rx_atomic_t threadHiNum;
64
65 int
66 rx_NewThreadId(void) {
67     return rx_atomic_inc_and_read(&threadHiNum);
68 }
69
70 /*
71  * Delay the current thread the specified number of seconds.
72  */
73 void
74 rxi_Delay(int sec)
75 {
76     sleep(sec);
77 }
78
79 /*
80  * Called from rx_Init()
81  */
82 void
83 rxi_InitializeThreadSupport(void)
84 {
85         /* listeners_started must only be reset if
86          * the listener thread terminates */
87         /* listeners_started = 0; */
88     clock_GetTime(&rxi_clockNow);
89 }
90
91 static void *
92 server_entry(void *argp)
93 {
94     void (*server_proc) (void *) = (void (*)(void *))argp;
95     server_proc(NULL);
96     dpf(("rx_pthread.c: server_entry: Server proc returned unexpectedly\n"));
97     return (void *) -1; /* reused as return value, see pthread(3) */
98 }
99
100 /*
101  * Start an Rx server process.
102  */
103 void
104 rxi_StartServerProc(void *(*proc) (void *), int stacksize)
105 {
106     pthread_t thread;
107     pthread_attr_t tattr;
108     AFS_SIGSET_DECL;
109
110     if (pthread_attr_init(&tattr) != 0) {
111         osi_Panic("Unable to Create Rx server thread (pthread_attr_init)\n");
112     }
113
114     if (pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) != 0) {
115         osi_Panic("Unable to Create Rx server thread (pthread_attr_setdetachstate)\n");
116     }
117
118     /*
119      * NOTE: We are ignoring the stack size parameter, for now.
120      */
121     AFS_SIGSET_CLEAR();
122     if (pthread_create(&thread, &tattr, server_entry, (void *)proc) != 0) {
123         osi_Panic("Unable to Create Rx server thread\n");
124     }
125     AFS_SIGSET_RESTORE();
126 }
127
128 /*
129  * The event handling process.
130  */
131 static void *
132 event_handler(void *argp)
133 {
134     unsigned long rx_pthread_n_event_expired = 0;
135     unsigned long rx_pthread_n_event_waits = 0;
136     long rx_pthread_n_event_woken = 0;
137     unsigned long rx_pthread_n_event_error = 0;
138     struct timespec rx_pthread_next_event_time = { 0, 0 };
139     int error;
140
141     MUTEX_ENTER(&event_handler_mutex);
142
143     for (;;) {
144         struct clock cv;
145         struct clock next;
146
147         MUTEX_EXIT(&event_handler_mutex);
148
149         next.sec = 30;          /* Time to sleep if there are no events scheduled */
150         next.usec = 0;
151         clock_GetTime(&cv);
152         rxevent_RaiseEvents(&next);
153
154         MUTEX_ENTER(&event_handler_mutex);
155         if (rx_pthread_event_rescheduled) {
156             rx_pthread_event_rescheduled = 0;
157             continue;
158         }
159
160         clock_Add(&cv, &next);
161         rx_pthread_next_event_time.tv_sec = cv.sec;
162         rx_pthread_next_event_time.tv_nsec = cv.usec * 1000;
163         rx_pthread_n_event_waits++;
164         error = CV_TIMEDWAIT(&rx_event_handler_cond, &event_handler_mutex, &rx_pthread_next_event_time);
165         if (error == 0) {
166             rx_pthread_n_event_woken++;
167         }
168 #ifdef AFS_NT40_ENV
169         else if (error == ETIMEDOUT) {
170             rx_pthread_n_event_expired++;
171         } else {
172             rx_pthread_n_event_error++;
173         }
174 #else
175         else if (errno == ETIMEDOUT) {
176             rx_pthread_n_event_expired++;
177         } else {
178             rx_pthread_n_event_error++;
179         }
180 #endif
181         rx_pthread_event_rescheduled = 0;
182     }
183     return NULL;
184 }
185
186
187 /*
188  * This routine will get called by the event package whenever a new,
189  * earlier than others, event is posted. */
190 void
191 rxi_ReScheduleEvents(void)
192 {
193     MUTEX_ENTER(&event_handler_mutex);
194     CV_SIGNAL(&rx_event_handler_cond);
195     rx_pthread_event_rescheduled = 1;
196     MUTEX_EXIT(&event_handler_mutex);
197 }
198
199
200 /* Loop to listen on a socket. Return setting *newcallp if this
201  * thread should become a server thread.  */
202 static void
203 rxi_ListenerProc(osi_socket sock, int *tnop, struct rx_call **newcallp)
204 {
205     unsigned int host;
206     u_short port;
207     struct rx_packet *p = (struct rx_packet *)0;
208
209     MUTEX_ENTER(&listener_mutex);
210     while (!listeners_started) {
211         CV_WAIT(&rx_listener_cond, &listener_mutex);
212     }
213     MUTEX_EXIT(&listener_mutex);
214
215     for (;;) {
216         /* See if a check for additional packets was issued */
217         rx_CheckPackets();
218
219         /*
220          * Grab a new packet only if necessary (otherwise re-use the old one)
221          */
222         if (p) {
223             rxi_RestoreDataBufs(p);
224         } else {
225             if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_RECEIVE))) {
226                 /* Could this happen with multiple socket listeners? */
227                 osi_Panic("rxi_Listener: no packets!"); /* Shouldn't happen */
228             }
229         }
230
231         if (rxi_ReadPacket(sock, p, &host, &port)) {
232             clock_NewTime();
233             p = rxi_ReceivePacket(p, sock, host, port, tnop, newcallp);
234             if (newcallp && *newcallp) {
235                 if (p)
236                     rxi_FreePacket(p);
237                 return;
238             }
239         }
240     }
241     /* NOTREACHED */
242 }
243
244 /* This is the listener process request loop. The listener process loop
245  * becomes a server thread when rxi_ListenerProc returns, and stays
246  * server thread until rxi_ServerProc returns. */
247 static void *
248 rx_ListenerProc(void *argp)
249 {
250     int threadID;
251     osi_socket sock = (osi_socket)(intptr_t)argp;
252     struct rx_call *newcall;
253
254     while (1) {
255         newcall = NULL;
256         threadID = -1;
257         rxi_ListenerProc(sock, &threadID, &newcall);
258         /* osi_Assert(threadID != -1); */
259         /* osi_Assert(newcall != NULL); */
260         sock = OSI_NULLSOCKET;
261         rxi_SetThreadNum(threadID);
262         rxi_ServerProc(threadID, newcall, &sock);
263         /* osi_Assert(sock != OSI_NULLSOCKET); */
264     }
265     /* not reached */
266     return NULL;
267 }
268
269 /* This is the server process request loop. The server process loop
270  * becomes a listener thread when rxi_ServerProc returns, and stays
271  * listener thread until rxi_ListenerProc returns. */
272 void *
273 rx_ServerProc(void * dummy)
274 {
275     osi_socket sock;
276     int threadID;
277     struct rx_call *newcall = NULL;
278
279     rxi_MorePackets(rx_maxReceiveWindow + 2);   /* alloc more packets */
280     MUTEX_ENTER(&rx_quota_mutex);
281     rxi_dataQuota += rx_initSendWindow; /* Reserve some pkts for hard times */
282     /* threadID is used for making decisions in GetCall.  Get it by bumping
283      * number of threads handling incoming calls */
284     /* Unique thread ID: used for scheduling purposes *and* as index into
285      * the host hold table (fileserver).
286      * The previously used rxi_availProcs is unsuitable as it
287      * will already go up and down as packets arrive while the server
288      * threads are still initialising! The recently introduced
289      * rxi_pthread_hinum does not necessarily lead to a server
290      * thread with id 0, which is not allowed to hop through the
291      * incoming call queue.
292      * So either introduce yet another counter or flag the FCFS
293      * thread... chose the latter.
294      */
295     MUTEX_ENTER(&rx_pthread_mutex);
296     threadID = rx_NewThreadId();
297     if (rxi_fcfs_thread_num == 0 && rxi_fcfs_thread_num != threadID)
298         rxi_fcfs_thread_num = threadID;
299     MUTEX_EXIT(&rx_pthread_mutex);
300     ++rxi_availProcs;
301     MUTEX_EXIT(&rx_quota_mutex);
302
303     while (1) {
304         sock = OSI_NULLSOCKET;
305         rxi_SetThreadNum(threadID);
306         rxi_ServerProc(threadID, newcall, &sock);
307         /* osi_Assert(sock != OSI_NULLSOCKET); */
308         newcall = NULL;
309         rxi_ListenerProc(sock, &threadID, &newcall);
310         /* osi_Assert(threadID != -1); */
311         /* osi_Assert(newcall != NULL); */
312     }
313     /* not reached */
314     return NULL;
315 }
316
317 /*
318  * Historically used to start the listener process. We now have multiple
319  * listener processes (one for each socket); these are started by GetUdpSocket.
320  *
321  * The event handling process *is* started here (the old listener used
322  * to also handle events). The listener threads can't actually start
323  * listening until rxi_StartListener is called because most of R may not
324  * be initialized when rxi_Listen is called.
325  */
326 void
327 rxi_StartListener(void)
328 {
329     pthread_attr_t tattr;
330     AFS_SIGSET_DECL;
331
332         if (listeners_started)
333                 return;
334
335     if (pthread_attr_init(&tattr) != 0) {
336         osi_Panic("Unable to create Rx event handling thread (pthread_attr_init)\n");
337     }
338
339     if (pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) != 0) {
340         osi_Panic("Unable to create Rx event handling thread (pthread_attr_setdetachstate)\n");
341     }
342
343     AFS_SIGSET_CLEAR();
344     if (pthread_create(&event_handler_thread, &tattr, event_handler, NULL) !=
345         0) {
346         osi_Panic("Unable to create Rx event handling thread\n");
347     }
348     rx_NewThreadId();
349     AFS_SIGSET_RESTORE();
350
351     MUTEX_ENTER(&listener_mutex);
352     CV_BROADCAST(&rx_listener_cond);
353     listeners_started = 1;
354     MUTEX_EXIT(&listener_mutex);
355
356 }
357
358 /*
359  * Listen on the specified socket.
360  */
361 int
362 rxi_Listen(osi_socket sock)
363 {
364     pthread_t thread;
365     pthread_attr_t tattr;
366     AFS_SIGSET_DECL;
367
368     if (pthread_attr_init(&tattr) != 0) {
369         osi_Panic("Unable to create socket listener thread (pthread_attr_init)\n");
370     }
371
372     if (pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) != 0) {
373         osi_Panic("Unable to create socket listener thread (pthread_attr_setdetachstate)\n");
374     }
375
376     AFS_SIGSET_CLEAR();
377     if (pthread_create(&thread, &tattr, rx_ListenerProc, (void *)(intptr_t)sock) != 0) {
378         osi_Panic("Unable to create socket listener thread\n");
379     }
380     rx_NewThreadId();
381     AFS_SIGSET_RESTORE();
382     return 0;
383 }
384
385
386 /*
387  * Recvmsg.
388  *
389  */
390 int
391 rxi_Recvmsg(osi_socket socket, struct msghdr *msg_p, int flags)
392 {
393     int ret;
394 #if defined(HAVE_LINUX_ERRQUEUE_H) && defined(ADAPT_PMTU)
395     while((rxi_HandleSocketError(socket)) > 0)
396       ;
397 #endif
398     ret = recvmsg(socket, msg_p, flags);
399     return ret;
400 }
401
402 /*
403  * Sendmsg.
404  */
405 int
406 rxi_Sendmsg(osi_socket socket, struct msghdr *msg_p, int flags)
407 {
408     int ret;
409     ret = sendmsg(socket, msg_p, flags);
410 #ifdef AFS_LINUX22_ENV
411     /* linux unfortunately returns ECONNREFUSED if the target port
412      * is no longer in use */
413     /* and EAGAIN if a UDP checksum is incorrect */
414     if (ret == -1 && errno != ECONNREFUSED && errno != EAGAIN) {
415 #else
416     if (ret == -1) {
417 #endif
418         dpf(("rxi_sendmsg failed, error %d\n", errno));
419         fflush(stdout);
420 #ifndef AFS_NT40_ENV
421         if (errno > 0)
422           return -errno;
423 #else
424             if (WSAGetLastError() > 0)
425               return -WSAGetLastError();
426 #endif
427         return -1;
428     }
429     return 0;
430 }
431
432 struct rx_ts_info_t * rx_ts_info_init(void) {
433     struct rx_ts_info_t * rx_ts_info;
434     rx_ts_info = calloc(1, sizeof(rx_ts_info_t));
435     osi_Assert(rx_ts_info != NULL && pthread_setspecific(rx_ts_info_key, rx_ts_info) == 0);
436 #ifdef RX_ENABLE_TSFPQ
437     queue_Init(&rx_ts_info->_FPQ);
438
439     MUTEX_ENTER(&rx_packets_mutex);
440     rx_TSFPQMaxProcs++;
441     RX_TS_FPQ_COMPUTE_LIMITS;
442     MUTEX_EXIT(&rx_packets_mutex);
443 #endif /* RX_ENABLE_TSFPQ */
444     return rx_ts_info;
445 }
446
447 int
448 rx_GetThreadNum(void) {
449     return (intptr_t)pthread_getspecific(rx_thread_id_key);
450 }
451
452 static void
453 rxi_SetThreadNum(int threadID) {
454     osi_Assert(pthread_setspecific(rx_thread_id_key,
455                                    (void *)(intptr_t)threadID) == 0);
456 }
457
458 int
459 rx_SetThreadNum(void) {
460     int threadId;
461
462     threadId = rx_NewThreadId();
463     rxi_SetThreadNum(threadId);
464     return threadId;
465 }
466