rx: Don't adjust non-existent events
[openafs.git] / src / rx / rx_pthread.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  *
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /*
11  * An implementation of the rx socket listener for pthreads (not using select).
12  * This assumes that multiple read system calls may be extant at any given
13  * time. Also implements the pthread-specific event handling for rx.
14  *
15  * rx_pthread.c is used for the thread safe RX package.
16  */
17
18 #include <afsconfig.h>
19 #include <afs/param.h>
20
21 #include <roken.h>
22 #include <afs/opr.h>
23
24 #include <assert.h>
25
26 #include "rx.h"
27 #include "rx_globals.h"
28 #include "rx_pthread.h"
29 #include "rx_clock.h"
30 #include "rx_atomic.h"
31 #ifdef AFS_NT40_ENV
32 #include "rx_xmit_nt.h"
33 #endif
34
35 static void rxi_SetThreadNum(int threadID);
36
37 /* Set rx_pthread_event_rescheduled if event_handler should just try
38  * again instead of sleeping.
39  *
40  * Protected by event_handler_mutex
41  */
42 static int rx_pthread_event_rescheduled = 0;
43
44 static void *rx_ListenerProc(void *);
45
46 /*
47  * We supply an event handling thread for Rx's event processing.
48  * The condition variable is used to wakeup the thread whenever a new
49  * event is scheduled earlier than the previous earliest event.
50  * This thread is also responsible for keeping time.
51  */
52 static pthread_t event_handler_thread;
53 afs_kcondvar_t rx_event_handler_cond;
54 afs_kmutex_t event_handler_mutex;
55 afs_kcondvar_t rx_listener_cond;
56 afs_kmutex_t listener_mutex;
57 static int listeners_started = 0;
58 afs_kmutex_t rx_clock_mutex;
59 struct clock rxi_clockNow;
60
61 static rx_atomic_t threadHiNum;
62
63 int
64 rx_NewThreadId(void) {
65     return rx_atomic_inc_and_read(&threadHiNum);
66 }
67
68 /*
69  * Delay the current thread the specified number of seconds.
70  */
71 void
72 rxi_Delay(int sec)
73 {
74     sleep(sec);
75 }
76
77 /*
78  * Called from rx_Init()
79  */
80 void
81 rxi_InitializeThreadSupport(void)
82 {
83         /* listeners_started must only be reset if
84          * the listener thread terminates */
85         /* listeners_started = 0; */
86     clock_GetTime(&rxi_clockNow);
87 }
88
89 static void *
90 server_entry(void *argp)
91 {
92     void (*server_proc) (void *) = (void (*)(void *))argp;
93     server_proc(NULL);
94     dpf(("rx_pthread.c: server_entry: Server proc returned unexpectedly\n"));
95     return (void *) -1; /* reused as return value, see pthread(3) */
96 }
97
98 /*
99  * Start an Rx server process.
100  */
101 void
102 rxi_StartServerProc(void *(*proc) (void *), int stacksize)
103 {
104     pthread_t thread;
105     pthread_attr_t tattr;
106     AFS_SIGSET_DECL;
107
108     if (pthread_attr_init(&tattr) != 0) {
109         osi_Panic("Unable to Create Rx server thread (pthread_attr_init)\n");
110     }
111
112     if (pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) != 0) {
113         osi_Panic("Unable to Create Rx server thread (pthread_attr_setdetachstate)\n");
114     }
115
116     /*
117      * NOTE: We are ignoring the stack size parameter, for now.
118      */
119     AFS_SIGSET_CLEAR();
120     if (pthread_create(&thread, &tattr, server_entry, (void *)proc) != 0) {
121         osi_Panic("Unable to Create Rx server thread\n");
122     }
123     AFS_SIGSET_RESTORE();
124 }
125
126 /*
127  * The event handling process.
128  */
129 static void *
130 event_handler(void *argp)
131 {
132     unsigned long rx_pthread_n_event_expired = 0;
133     unsigned long rx_pthread_n_event_waits = 0;
134     long rx_pthread_n_event_woken = 0;
135     unsigned long rx_pthread_n_event_error = 0;
136     struct timespec rx_pthread_next_event_time = { 0, 0 };
137     int error;
138
139     MUTEX_ENTER(&event_handler_mutex);
140
141     for (;;) {
142         struct clock cv;
143         struct clock next;
144
145         MUTEX_EXIT(&event_handler_mutex);
146
147         next.sec = 30;          /* Time to sleep if there are no events scheduled */
148         next.usec = 0;
149         clock_GetTime(&cv);
150         rxevent_RaiseEvents(&next);
151
152         MUTEX_ENTER(&event_handler_mutex);
153         if (rx_pthread_event_rescheduled) {
154             rx_pthread_event_rescheduled = 0;
155             continue;
156         }
157
158         clock_Add(&cv, &next);
159         rx_pthread_next_event_time.tv_sec = cv.sec;
160         rx_pthread_next_event_time.tv_nsec = cv.usec * 1000;
161         rx_pthread_n_event_waits++;
162         error = CV_TIMEDWAIT(&rx_event_handler_cond, &event_handler_mutex, &rx_pthread_next_event_time);
163         if (error == 0) {
164             rx_pthread_n_event_woken++;
165         }
166 #ifdef AFS_NT40_ENV
167         else if (error == ETIMEDOUT) {
168             rx_pthread_n_event_expired++;
169         } else {
170             rx_pthread_n_event_error++;
171         }
172 #else
173         else if (errno == ETIMEDOUT) {
174             rx_pthread_n_event_expired++;
175         } else {
176             rx_pthread_n_event_error++;
177         }
178 #endif
179         rx_pthread_event_rescheduled = 0;
180     }
181     return NULL;
182 }
183
184
185 /*
186  * This routine will get called by the event package whenever a new,
187  * earlier than others, event is posted. */
188 void
189 rxi_ReScheduleEvents(void)
190 {
191     MUTEX_ENTER(&event_handler_mutex);
192     CV_SIGNAL(&rx_event_handler_cond);
193     rx_pthread_event_rescheduled = 1;
194     MUTEX_EXIT(&event_handler_mutex);
195 }
196
197
198 /* Loop to listen on a socket. Return setting *newcallp if this
199  * thread should become a server thread.  */
200 static void
201 rxi_ListenerProc(osi_socket sock, int *tnop, struct rx_call **newcallp)
202 {
203     unsigned int host;
204     u_short port;
205     struct rx_packet *p = (struct rx_packet *)0;
206
207     MUTEX_ENTER(&listener_mutex);
208     while (!listeners_started) {
209         CV_WAIT(&rx_listener_cond, &listener_mutex);
210     }
211     MUTEX_EXIT(&listener_mutex);
212
213     for (;;) {
214         /* See if a check for additional packets was issued */
215         rx_CheckPackets();
216
217         /*
218          * Grab a new packet only if necessary (otherwise re-use the old one)
219          */
220         if (p) {
221             rxi_RestoreDataBufs(p);
222         } else {
223             if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_RECEIVE))) {
224                 /* Could this happen with multiple socket listeners? */
225                 osi_Panic("rxi_Listener: no packets!"); /* Shouldn't happen */
226             }
227         }
228
229         if (rxi_ReadPacket(sock, p, &host, &port)) {
230             clock_NewTime();
231             p = rxi_ReceivePacket(p, sock, host, port, tnop, newcallp);
232             if (newcallp && *newcallp) {
233                 if (p)
234                     rxi_FreePacket(p);
235                 return;
236             }
237         }
238     }
239     /* NOTREACHED */
240 }
241
242 /* This is the listener process request loop. The listener process loop
243  * becomes a server thread when rxi_ListenerProc returns, and stays
244  * server thread until rxi_ServerProc returns. */
245 static void *
246 rx_ListenerProc(void *argp)
247 {
248     int threadID;
249     osi_socket sock = (osi_socket)(intptr_t)argp;
250     struct rx_call *newcall;
251
252     while (1) {
253         newcall = NULL;
254         threadID = -1;
255         rxi_ListenerProc(sock, &threadID, &newcall);
256         /* osi_Assert(threadID != -1); */
257         /* osi_Assert(newcall != NULL); */
258         sock = OSI_NULLSOCKET;
259         rxi_SetThreadNum(threadID);
260         rxi_ServerProc(threadID, newcall, &sock);
261         /* osi_Assert(sock != OSI_NULLSOCKET); */
262     }
263     /* not reached */
264     return NULL;
265 }
266
267 /* This is the server process request loop. The server process loop
268  * becomes a listener thread when rxi_ServerProc returns, and stays
269  * listener thread until rxi_ListenerProc returns. */
270 void *
271 rx_ServerProc(void * dummy)
272 {
273     osi_socket sock;
274     int threadID;
275     struct rx_call *newcall = NULL;
276
277     rxi_MorePackets(rx_maxReceiveWindow + 2);   /* alloc more packets */
278     MUTEX_ENTER(&rx_quota_mutex);
279     rxi_dataQuota += rx_initSendWindow; /* Reserve some pkts for hard times */
280     /* threadID is used for making decisions in GetCall.  Get it by bumping
281      * number of threads handling incoming calls */
282     /* Unique thread ID: used for scheduling purposes *and* as index into
283      * the host hold table (fileserver).
284      * The previously used rxi_availProcs is unsuitable as it
285      * will already go up and down as packets arrive while the server
286      * threads are still initialising! The recently introduced
287      * rxi_pthread_hinum does not necessarily lead to a server
288      * thread with id 0, which is not allowed to hop through the
289      * incoming call queue.
290      * So either introduce yet another counter or flag the FCFS
291      * thread... chose the latter.
292      */
293     MUTEX_ENTER(&rx_pthread_mutex);
294     threadID = rx_NewThreadId();
295     if (rxi_fcfs_thread_num == 0 && rxi_fcfs_thread_num != threadID)
296         rxi_fcfs_thread_num = threadID;
297     MUTEX_EXIT(&rx_pthread_mutex);
298     ++rxi_availProcs;
299     MUTEX_EXIT(&rx_quota_mutex);
300
301     while (1) {
302         sock = OSI_NULLSOCKET;
303         rxi_SetThreadNum(threadID);
304         rxi_ServerProc(threadID, newcall, &sock);
305         /* osi_Assert(sock != OSI_NULLSOCKET); */
306         newcall = NULL;
307         rxi_ListenerProc(sock, &threadID, &newcall);
308         /* osi_Assert(threadID != -1); */
309         /* osi_Assert(newcall != NULL); */
310     }
311     /* not reached */
312     return NULL;
313 }
314
315 /*
316  * Historically used to start the listener process. We now have multiple
317  * listener processes (one for each socket); these are started by GetUdpSocket.
318  *
319  * The event handling process *is* started here (the old listener used
320  * to also handle events). The listener threads can't actually start
321  * listening until rxi_StartListener is called because most of R may not
322  * be initialized when rxi_Listen is called.
323  */
324 void
325 rxi_StartListener(void)
326 {
327     pthread_attr_t tattr;
328     AFS_SIGSET_DECL;
329
330         if (listeners_started)
331                 return;
332
333     if (pthread_attr_init(&tattr) != 0) {
334         osi_Panic("Unable to create Rx event handling thread (pthread_attr_init)\n");
335     }
336
337     if (pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) != 0) {
338         osi_Panic("Unable to create Rx event handling thread (pthread_attr_setdetachstate)\n");
339     }
340
341     AFS_SIGSET_CLEAR();
342     if (pthread_create(&event_handler_thread, &tattr, event_handler, NULL) !=
343         0) {
344         osi_Panic("Unable to create Rx event handling thread\n");
345     }
346     rx_NewThreadId();
347     AFS_SIGSET_RESTORE();
348
349     MUTEX_ENTER(&listener_mutex);
350     CV_BROADCAST(&rx_listener_cond);
351     listeners_started = 1;
352     MUTEX_EXIT(&listener_mutex);
353
354 }
355
356 /*
357  * Listen on the specified socket.
358  */
359 int
360 rxi_Listen(osi_socket sock)
361 {
362     pthread_t thread;
363     pthread_attr_t tattr;
364     AFS_SIGSET_DECL;
365
366     if (pthread_attr_init(&tattr) != 0) {
367         osi_Panic("Unable to create socket listener thread (pthread_attr_init)\n");
368     }
369
370     if (pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) != 0) {
371         osi_Panic("Unable to create socket listener thread (pthread_attr_setdetachstate)\n");
372     }
373
374     AFS_SIGSET_CLEAR();
375     if (pthread_create(&thread, &tattr, rx_ListenerProc, (void *)(intptr_t)sock) != 0) {
376         osi_Panic("Unable to create socket listener thread\n");
377     }
378     rx_NewThreadId();
379     AFS_SIGSET_RESTORE();
380     return 0;
381 }
382
383
384 /*
385  * Recvmsg.
386  *
387  */
388 int
389 rxi_Recvmsg(osi_socket socket, struct msghdr *msg_p, int flags)
390 {
391     int ret;
392 #if defined(HAVE_LINUX_ERRQUEUE_H) && defined(ADAPT_PMTU)
393     while((rxi_HandleSocketError(socket)) > 0)
394       ;
395 #endif
396     ret = recvmsg(socket, msg_p, flags);
397     return ret;
398 }
399
400 /*
401  * Sendmsg.
402  */
403 int
404 rxi_Sendmsg(osi_socket socket, struct msghdr *msg_p, int flags)
405 {
406     int ret;
407     ret = sendmsg(socket, msg_p, flags);
408 #ifdef AFS_LINUX22_ENV
409     /* linux unfortunately returns ECONNREFUSED if the target port
410      * is no longer in use */
411     /* and EAGAIN if a UDP checksum is incorrect */
412     if (ret == -1 && errno != ECONNREFUSED && errno != EAGAIN) {
413 #else
414     if (ret == -1) {
415 #endif
416         dpf(("rxi_sendmsg failed, error %d\n", errno));
417         fflush(stdout);
418 #ifndef AFS_NT40_ENV
419         if (errno > 0)
420           return -errno;
421 #else
422             if (WSAGetLastError() > 0)
423               return -WSAGetLastError();
424 #endif
425         return -1;
426     }
427     return 0;
428 }
429
430 struct rx_ts_info_t * rx_ts_info_init(void) {
431     struct rx_ts_info_t * rx_ts_info;
432     rx_ts_info = (rx_ts_info_t *) malloc(sizeof(rx_ts_info_t));
433     osi_Assert(rx_ts_info != NULL && pthread_setspecific(rx_ts_info_key, rx_ts_info) == 0);
434     memset(rx_ts_info, 0, sizeof(rx_ts_info_t));
435 #ifdef RX_ENABLE_TSFPQ
436     queue_Init(&rx_ts_info->_FPQ);
437
438     MUTEX_ENTER(&rx_packets_mutex);
439     rx_TSFPQMaxProcs++;
440     RX_TS_FPQ_COMPUTE_LIMITS;
441     MUTEX_EXIT(&rx_packets_mutex);
442 #endif /* RX_ENABLE_TSFPQ */
443     return rx_ts_info;
444 }
445
446 int
447 rx_GetThreadNum(void) {
448     return (intptr_t)pthread_getspecific(rx_thread_id_key);
449 }
450
451 static void
452 rxi_SetThreadNum(int threadID) {
453     osi_Assert(pthread_setspecific(rx_thread_id_key,
454                                    (void *)(intptr_t)threadID) == 0);
455 }
456
457 int
458 rx_SetThreadNum(void) {
459     int threadId;
460
461     threadId = rx_NewThreadId();
462     rxi_SetThreadNum(threadId);
463     return threadId;
464 }
465