rx: Add rx_NewThreadId function
[openafs.git] / src / rx / rx_pthread.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  *
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /*
11  * An implementation of the rx socket listener for pthreads (not using select).
12  * This assumes that multiple read system calls may be extant at any given
13  * time. Also implements the pthread-specific event handling for rx.
14  *
15  * rx_pthread.c is used for the thread safe RX package.
16  */
17
18 #include <afsconfig.h>
19 #include <afs/param.h>
20
21
22 #include <sys/types.h>
23 #include <errno.h>
24 #include <signal.h>
25 #include <string.h>
26 #ifdef HAVE_STDINT_H
27 # include <stdint.h>
28 #endif
29 #ifndef AFS_NT40_ENV
30 # include <sys/socket.h>
31 # include <sys/file.h>
32 # include <netdb.h>
33 # include <netinet/in.h>
34 # include <net/if.h>
35 # include <sys/ioctl.h>
36 # include <sys/time.h>
37 # include <unistd.h>
38 #endif
39 #include <sys/stat.h>
40 #include <rx/rx.h>
41 #include <rx/rx_globals.h>
42 #include <assert.h>
43 #include <rx/rx_pthread.h>
44 #include <rx/rx_clock.h>
45 #include "rx_atomic.h"
46
47 /* Set rx_pthread_event_rescheduled if event_handler should just try
48  * again instead of sleeping.
49  *
50  * Protected by event_handler_mutex
51  */
52 static int rx_pthread_event_rescheduled = 0;
53
54 static void *rx_ListenerProc(void *);
55
56 /*
57  * We supply an event handling thread for Rx's event processing.
58  * The condition variable is used to wakeup the thread whenever a new
59  * event is scheduled earlier than the previous earliest event.
60  * This thread is also responsible for keeping time.
61  */
62 static pthread_t event_handler_thread;
63 afs_kcondvar_t rx_event_handler_cond;
64 afs_kmutex_t event_handler_mutex;
65 afs_kcondvar_t rx_listener_cond;
66 afs_kmutex_t listener_mutex;
67 static int listeners_started = 0;
68 afs_kmutex_t rx_clock_mutex;
69 struct clock rxi_clockNow;
70
71 static rx_atomic_t threadHiNum;
72
73 int
74 rx_NewThreadId(void) {
75     return rx_atomic_inc_and_read(&threadHiNum);
76 }
77
78 /*
79  * Delay the current thread the specified number of seconds.
80  */
81 void
82 rxi_Delay(int sec)
83 {
84     sleep(sec);
85 }
86
87 /*
88  * Called from rx_Init()
89  */
90 void
91 rxi_InitializeThreadSupport(void)
92 {
93         /* listeners_started must only be reset if
94          * the listener thread terminates */
95         /* listeners_started = 0; */
96     clock_GetTime(&rxi_clockNow);
97 }
98
99 static void *
100 server_entry(void *argp)
101 {
102     void (*server_proc) (void *) = (void (*)(void *))argp;
103     server_proc(NULL);
104     dpf(("rx_pthread.c: server_entry: Server proc returned unexpectedly\n"));
105     return (void *) -1; /* reused as return value, see pthread(3) */
106 }
107
108 /*
109  * Start an Rx server process.
110  */
111 void
112 rxi_StartServerProc(void *(*proc) (void *), int stacksize)
113 {
114     pthread_t thread;
115     pthread_attr_t tattr;
116     AFS_SIGSET_DECL;
117
118     if (pthread_attr_init(&tattr) != 0) {
119         dpf(("Unable to Create Rx server thread (pthread_attr_init)\n"));
120         assert(0);
121     }
122
123     if (pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) != 0) {
124         dpf
125             (("Unable to Create Rx server thread (pthread_attr_setdetachstate)\n"));
126         assert(0);
127     }
128
129     /*
130      * NOTE: We are ignoring the stack size parameter, for now.
131      */
132     AFS_SIGSET_CLEAR();
133     if (pthread_create(&thread, &tattr, server_entry, (void *)proc) != 0) {
134         dpf(("Unable to Create Rx server thread\n"));
135         assert(0);
136     }
137     AFS_SIGSET_RESTORE();
138 }
139
140 /*
141  * The event handling process.
142  */
143 static void *
144 event_handler(void *argp)
145 {
146     unsigned long rx_pthread_n_event_expired = 0;
147     unsigned long rx_pthread_n_event_waits = 0;
148     long rx_pthread_n_event_woken = 0;
149     unsigned long rx_pthread_n_event_error = 0;
150     struct timespec rx_pthread_next_event_time = { 0, 0 };
151     int error;
152
153     MUTEX_ENTER(&event_handler_mutex);
154
155     for (;;) {
156         struct clock cv;
157         struct clock next;
158
159         MUTEX_EXIT(&event_handler_mutex);
160
161         next.sec = 30;          /* Time to sleep if there are no events scheduled */
162         next.usec = 0;
163         clock_GetTime(&cv);
164         rxevent_RaiseEvents(&next);
165
166         MUTEX_ENTER(&event_handler_mutex);
167         if (rx_pthread_event_rescheduled) {
168             rx_pthread_event_rescheduled = 0;
169             continue;
170         }
171
172         clock_Add(&cv, &next);
173         rx_pthread_next_event_time.tv_sec = cv.sec;
174         rx_pthread_next_event_time.tv_nsec = cv.usec * 1000;
175         rx_pthread_n_event_waits++;
176         error = CV_TIMEDWAIT(&rx_event_handler_cond, &event_handler_mutex, &rx_pthread_next_event_time);
177         if (error == 0) {
178             rx_pthread_n_event_woken++;
179         }
180 #ifdef AFS_NT40_ENV
181         else if (error == ETIMEDOUT) {
182             rx_pthread_n_event_expired++;
183         } else {
184             rx_pthread_n_event_error++;
185         }
186 #else
187         else if (errno == ETIMEDOUT) {
188             rx_pthread_n_event_expired++;
189         } else {
190             rx_pthread_n_event_error++;
191         }
192 #endif
193         rx_pthread_event_rescheduled = 0;
194     }
195     return NULL;
196 }
197
198
199 /*
200  * This routine will get called by the event package whenever a new,
201  * earlier than others, event is posted. */
202 void
203 rxi_ReScheduleEvents(void)
204 {
205     MUTEX_ENTER(&event_handler_mutex);
206     CV_SIGNAL(&rx_event_handler_cond);
207     rx_pthread_event_rescheduled = 1;
208     MUTEX_EXIT(&event_handler_mutex);
209 }
210
211
212 /* Loop to listen on a socket. Return setting *newcallp if this
213  * thread should become a server thread.  */
214 static void
215 rxi_ListenerProc(osi_socket sock, int *tnop, struct rx_call **newcallp)
216 {
217     unsigned int host;
218     u_short port;
219     struct rx_packet *p = (struct rx_packet *)0;
220
221     MUTEX_ENTER(&listener_mutex);
222     while (!listeners_started) {
223         CV_WAIT(&rx_listener_cond, &listener_mutex);
224     }
225     MUTEX_EXIT(&listener_mutex);
226
227     for (;;) {
228         /* See if a check for additional packets was issued */
229         rx_CheckPackets();
230
231         /*
232          * Grab a new packet only if necessary (otherwise re-use the old one)
233          */
234         if (p) {
235             rxi_RestoreDataBufs(p);
236         } else {
237             if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_RECEIVE))) {
238                 /* Could this happen with multiple socket listeners? */
239                 dpf(("rxi_Listener: no packets!"));     /* Shouldn't happen */
240                 assert(0);
241             }
242         }
243
244         if (rxi_ReadPacket(sock, p, &host, &port)) {
245             clock_NewTime();
246             p = rxi_ReceivePacket(p, sock, host, port, tnop, newcallp);
247             if (newcallp && *newcallp) {
248                 if (p)
249                     rxi_FreePacket(p);
250                 return;
251             }
252         }
253     }
254     /* NOTREACHED */
255 }
256
257 /* This is the listener process request loop. The listener process loop
258  * becomes a server thread when rxi_ListenerProc returns, and stays
259  * server thread until rxi_ServerProc returns. */
260 static void *
261 rx_ListenerProc(void *argp)
262 {
263     int threadID;
264     osi_socket sock = (osi_socket)(intptr_t)argp;
265     struct rx_call *newcall;
266
267     while (1) {
268         newcall = NULL;
269         threadID = -1;
270         rxi_ListenerProc(sock, &threadID, &newcall);
271         /* assert(threadID != -1); */
272         /* assert(newcall != NULL); */
273         sock = OSI_NULLSOCKET;
274         assert(pthread_setspecific(rx_thread_id_key, (void *)(intptr_t)threadID) == 0);
275         rxi_ServerProc(threadID, newcall, &sock);
276         /* assert(sock != OSI_NULLSOCKET); */
277     }
278     /* not reached */
279     return NULL;
280 }
281
282 /* This is the server process request loop. The server process loop
283  * becomes a listener thread when rxi_ServerProc returns, and stays
284  * listener thread until rxi_ListenerProc returns. */
285 void *
286 rx_ServerProc(void * dummy)
287 {
288     osi_socket sock;
289     int threadID;
290     struct rx_call *newcall = NULL;
291
292     rxi_MorePackets(rx_maxReceiveWindow + 2);   /* alloc more packets */
293     MUTEX_ENTER(&rx_quota_mutex);
294     rxi_dataQuota += rx_initSendWindow; /* Reserve some pkts for hard times */
295     /* threadID is used for making decisions in GetCall.  Get it by bumping
296      * number of threads handling incoming calls */
297     /* Unique thread ID: used for scheduling purposes *and* as index into
298      * the host hold table (fileserver).
299      * The previously used rxi_availProcs is unsuitable as it
300      * will already go up and down as packets arrive while the server
301      * threads are still initialising! The recently introduced
302      * rxi_pthread_hinum does not necessarily lead to a server
303      * thread with id 0, which is not allowed to hop through the
304      * incoming call queue.
305      * So either introduce yet another counter or flag the FCFS
306      * thread... chose the latter.
307      */
308     MUTEX_ENTER(&rx_pthread_mutex);
309     threadID = rx_NewThreadId();
310     if (rxi_fcfs_thread_num == 0 && rxi_fcfs_thread_num != threadID)
311         rxi_fcfs_thread_num = threadID;
312     MUTEX_EXIT(&rx_pthread_mutex);
313     ++rxi_availProcs;
314     MUTEX_EXIT(&rx_quota_mutex);
315
316     while (1) {
317         sock = OSI_NULLSOCKET;
318         assert(pthread_setspecific(rx_thread_id_key, (void *)(intptr_t)threadID) == 0);
319         rxi_ServerProc(threadID, newcall, &sock);
320         /* assert(sock != OSI_NULLSOCKET); */
321         newcall = NULL;
322         rxi_ListenerProc(sock, &threadID, &newcall);
323         /* assert(threadID != -1); */
324         /* assert(newcall != NULL); */
325     }
326     /* not reached */
327     return NULL;
328 }
329
330 /*
331  * Historically used to start the listener process. We now have multiple
332  * listener processes (one for each socket); these are started by GetUdpSocket.
333  *
334  * The event handling process *is* started here (the old listener used
335  * to also handle events). The listener threads can't actually start
336  * listening until rxi_StartListener is called because most of R may not
337  * be initialized when rxi_Listen is called.
338  */
339 void
340 rxi_StartListener(void)
341 {
342     pthread_attr_t tattr;
343     AFS_SIGSET_DECL;
344
345         if (listeners_started)
346                 return;
347
348     if (pthread_attr_init(&tattr) != 0) {
349         dpf
350             (("Unable to create Rx event handling thread (pthread_attr_init)\n"));
351         assert(0);
352     }
353
354     if (pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) != 0) {
355         dpf
356             (("Unable to create Rx event handling thread (pthread_attr_setdetachstate)\n"));
357         assert(0);
358     }
359
360     AFS_SIGSET_CLEAR();
361     if (pthread_create(&event_handler_thread, &tattr, event_handler, NULL) !=
362         0) {
363         dpf(("Unable to create Rx event handling thread\n"));
364         assert(0);
365     }
366     rx_NewThreadId();
367     AFS_SIGSET_RESTORE();
368
369     MUTEX_ENTER(&listener_mutex);
370     CV_BROADCAST(&rx_listener_cond);
371     listeners_started = 1;
372     MUTEX_EXIT(&listener_mutex);
373
374 }
375
376 /*
377  * Listen on the specified socket.
378  */
379 int
380 rxi_Listen(osi_socket sock)
381 {
382     pthread_t thread;
383     pthread_attr_t tattr;
384     AFS_SIGSET_DECL;
385
386     if (pthread_attr_init(&tattr) != 0) {
387         dpf
388             (("Unable to create socket listener thread (pthread_attr_init)\n"));
389         assert(0);
390     }
391
392     if (pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) != 0) {
393         dpf
394             (("Unable to create socket listener thread (pthread_attr_setdetachstate)\n"));
395         assert(0);
396     }
397
398     AFS_SIGSET_CLEAR();
399     if (pthread_create(&thread, &tattr, rx_ListenerProc, (void *)(intptr_t)sock) != 0) {
400         dpf(("Unable to create socket listener thread\n"));
401         assert(0);
402     }
403     rx_NewThreadId();
404     AFS_SIGSET_RESTORE();
405     return 0;
406 }
407
408
409 /*
410  * Recvmsg.
411  *
412  */
413 int
414 rxi_Recvmsg(osi_socket socket, struct msghdr *msg_p, int flags)
415 {
416     int ret;
417 #if defined(HAVE_LINUX_ERRQUEUE_H) && defined(ADAPT_PMTU)
418     while((rxi_HandleSocketError(socket)) > 0)
419       ;
420 #endif
421     ret = recvmsg(socket, msg_p, flags);
422     return ret;
423 }
424
425 /*
426  * Sendmsg.
427  */
428 int
429 rxi_Sendmsg(osi_socket socket, struct msghdr *msg_p, int flags)
430 {
431     int ret;
432     ret = sendmsg(socket, msg_p, flags);
433 #ifdef AFS_LINUX22_ENV
434     /* linux unfortunately returns ECONNREFUSED if the target port
435      * is no longer in use */
436     /* and EAGAIN if a UDP checksum is incorrect */
437     if (ret == -1 && errno != ECONNREFUSED && errno != EAGAIN) {
438 #else
439     if (ret == -1) {
440 #endif
441         dpf(("rxi_sendmsg failed, error %d\n", errno));
442         fflush(stdout);
443 #ifndef AFS_NT40_ENV
444         if (errno > 0)
445           return -errno;
446 #else
447             if (WSAGetLastError() > 0)
448               return -WSAGetLastError();
449 #endif
450         return -1;
451     }
452     return 0;
453 }
454
455 struct rx_ts_info_t * rx_ts_info_init(void) {
456     struct rx_ts_info_t * rx_ts_info;
457     rx_ts_info = (rx_ts_info_t *) malloc(sizeof(rx_ts_info_t));
458     assert(rx_ts_info != NULL && pthread_setspecific(rx_ts_info_key, rx_ts_info) == 0);
459     memset(rx_ts_info, 0, sizeof(rx_ts_info_t));
460 #ifdef RX_ENABLE_TSFPQ
461     queue_Init(&rx_ts_info->_FPQ);
462
463     MUTEX_ENTER(&rx_packets_mutex);
464     rx_TSFPQMaxProcs++;
465     RX_TS_FPQ_COMPUTE_LIMITS;
466     MUTEX_EXIT(&rx_packets_mutex);
467 #endif /* RX_ENABLE_TSFPQ */
468     return rx_ts_info;
469 }