rx: add post RPC procedure capability
[openafs.git] / src / rx / rx_pthread.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  *
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /*
11  * An implementation of the rx socket listener for pthreads (not using select).
12  * This assumes that multiple read system calls may be extant at any given
13  * time. Also implements the pthread-specific event handling for rx.
14  *
15  * rx_pthread.c is used for the thread safe RX package.
16  */
17
18 #include <afsconfig.h>
19 #include <afs/param.h>
20
21 #include <roken.h>
22 #include <afs/opr.h>
23
24 #include <assert.h>
25
26 #include "rx.h"
27 #include "rx_globals.h"
28 #include "rx_pthread.h"
29 #include "rx_clock.h"
30 #include "rx_atomic.h"
31
32 static void rxi_SetThreadNum(int threadID);
33
34 /* Set rx_pthread_event_rescheduled if event_handler should just try
35  * again instead of sleeping.
36  *
37  * Protected by event_handler_mutex
38  */
39 static int rx_pthread_event_rescheduled = 0;
40
41 static void *rx_ListenerProc(void *);
42
43 /*
44  * We supply an event handling thread for Rx's event processing.
45  * The condition variable is used to wakeup the thread whenever a new
46  * event is scheduled earlier than the previous earliest event.
47  * This thread is also responsible for keeping time.
48  */
49 static pthread_t event_handler_thread;
50 afs_kcondvar_t rx_event_handler_cond;
51 afs_kmutex_t event_handler_mutex;
52 afs_kcondvar_t rx_listener_cond;
53 afs_kmutex_t listener_mutex;
54 static int listeners_started = 0;
55 afs_kmutex_t rx_clock_mutex;
56 struct clock rxi_clockNow;
57
58 static rx_atomic_t threadHiNum;
59
60 int
61 rx_NewThreadId(void) {
62     return rx_atomic_inc_and_read(&threadHiNum);
63 }
64
65 /*
66  * Delay the current thread the specified number of seconds.
67  */
68 void
69 rxi_Delay(int sec)
70 {
71     sleep(sec);
72 }
73
74 /*
75  * Called from rx_Init()
76  */
77 void
78 rxi_InitializeThreadSupport(void)
79 {
80         /* listeners_started must only be reset if
81          * the listener thread terminates */
82         /* listeners_started = 0; */
83     clock_GetTime(&rxi_clockNow);
84 }
85
86 static void *
87 server_entry(void *argp)
88 {
89     void (*server_proc) (void *) = (void (*)(void *))argp;
90     server_proc(NULL);
91     dpf(("rx_pthread.c: server_entry: Server proc returned unexpectedly\n"));
92     return (void *) -1; /* reused as return value, see pthread(3) */
93 }
94
95 /*
96  * Start an Rx server process.
97  */
98 void
99 rxi_StartServerProc(void *(*proc) (void *), int stacksize)
100 {
101     pthread_t thread;
102     pthread_attr_t tattr;
103     AFS_SIGSET_DECL;
104
105     if (pthread_attr_init(&tattr) != 0) {
106         osi_Panic("Unable to Create Rx server thread (pthread_attr_init)\n");
107     }
108
109     if (pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) != 0) {
110         osi_Panic("Unable to Create Rx server thread (pthread_attr_setdetachstate)\n");
111     }
112
113     /*
114      * NOTE: We are ignoring the stack size parameter, for now.
115      */
116     AFS_SIGSET_CLEAR();
117     if (pthread_create(&thread, &tattr, server_entry, (void *)proc) != 0) {
118         osi_Panic("Unable to Create Rx server thread\n");
119     }
120     AFS_SIGSET_RESTORE();
121 }
122
123 /*
124  * The event handling process.
125  */
126 static void *
127 event_handler(void *argp)
128 {
129     unsigned long rx_pthread_n_event_expired = 0;
130     unsigned long rx_pthread_n_event_waits = 0;
131     long rx_pthread_n_event_woken = 0;
132     unsigned long rx_pthread_n_event_error = 0;
133     struct timespec rx_pthread_next_event_time = { 0, 0 };
134     int error;
135
136     MUTEX_ENTER(&event_handler_mutex);
137
138     for (;;) {
139         struct clock cv;
140         struct clock next;
141
142         MUTEX_EXIT(&event_handler_mutex);
143
144         next.sec = 30;          /* Time to sleep if there are no events scheduled */
145         next.usec = 0;
146         clock_GetTime(&cv);
147         rxevent_RaiseEvents(&next);
148
149         MUTEX_ENTER(&event_handler_mutex);
150         if (rx_pthread_event_rescheduled) {
151             rx_pthread_event_rescheduled = 0;
152             continue;
153         }
154
155         clock_Add(&cv, &next);
156         rx_pthread_next_event_time.tv_sec = cv.sec;
157         rx_pthread_next_event_time.tv_nsec = cv.usec * 1000;
158         rx_pthread_n_event_waits++;
159         error = CV_TIMEDWAIT(&rx_event_handler_cond, &event_handler_mutex, &rx_pthread_next_event_time);
160         if (error == 0) {
161             rx_pthread_n_event_woken++;
162         }
163 #ifdef AFS_NT40_ENV
164         else if (error == ETIMEDOUT) {
165             rx_pthread_n_event_expired++;
166         } else {
167             rx_pthread_n_event_error++;
168         }
169 #else
170         else if (errno == ETIMEDOUT) {
171             rx_pthread_n_event_expired++;
172         } else {
173             rx_pthread_n_event_error++;
174         }
175 #endif
176         rx_pthread_event_rescheduled = 0;
177     }
178     return NULL;
179 }
180
181
182 /*
183  * This routine will get called by the event package whenever a new,
184  * earlier than others, event is posted. */
185 void
186 rxi_ReScheduleEvents(void)
187 {
188     MUTEX_ENTER(&event_handler_mutex);
189     CV_SIGNAL(&rx_event_handler_cond);
190     rx_pthread_event_rescheduled = 1;
191     MUTEX_EXIT(&event_handler_mutex);
192 }
193
194
195 /* Loop to listen on a socket. Return setting *newcallp if this
196  * thread should become a server thread.  */
197 static void
198 rxi_ListenerProc(osi_socket sock, int *tnop, struct rx_call **newcallp)
199 {
200     unsigned int host;
201     u_short port;
202     struct rx_packet *p = (struct rx_packet *)0;
203
204     MUTEX_ENTER(&listener_mutex);
205     while (!listeners_started) {
206         CV_WAIT(&rx_listener_cond, &listener_mutex);
207     }
208     MUTEX_EXIT(&listener_mutex);
209
210     for (;;) {
211         /* See if a check for additional packets was issued */
212         rx_CheckPackets();
213
214         /*
215          * Grab a new packet only if necessary (otherwise re-use the old one)
216          */
217         if (p) {
218             rxi_RestoreDataBufs(p);
219         } else {
220             if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_RECEIVE))) {
221                 /* Could this happen with multiple socket listeners? */
222                 osi_Panic("rxi_Listener: no packets!"); /* Shouldn't happen */
223             }
224         }
225
226         if (rxi_ReadPacket(sock, p, &host, &port)) {
227             clock_NewTime();
228             p = rxi_ReceivePacket(p, sock, host, port, tnop, newcallp);
229             if (newcallp && *newcallp) {
230                 if (p)
231                     rxi_FreePacket(p);
232                 return;
233             }
234         }
235     }
236     /* NOTREACHED */
237 }
238
239 /* This is the listener process request loop. The listener process loop
240  * becomes a server thread when rxi_ListenerProc returns, and stays
241  * server thread until rxi_ServerProc returns. */
242 static void *
243 rx_ListenerProc(void *argp)
244 {
245     int threadID;
246     osi_socket sock = (osi_socket)(intptr_t)argp;
247     struct rx_call *newcall;
248
249     while (1) {
250         newcall = NULL;
251         threadID = -1;
252         rxi_ListenerProc(sock, &threadID, &newcall);
253         /* osi_Assert(threadID != -1); */
254         /* osi_Assert(newcall != NULL); */
255         sock = OSI_NULLSOCKET;
256         rxi_SetThreadNum(threadID);
257         rxi_ServerProc(threadID, newcall, &sock);
258         /* osi_Assert(sock != OSI_NULLSOCKET); */
259     }
260     /* not reached */
261     return NULL;
262 }
263
264 /* This is the server process request loop. The server process loop
265  * becomes a listener thread when rxi_ServerProc returns, and stays
266  * listener thread until rxi_ListenerProc returns. */
267 void *
268 rx_ServerProc(void * dummy)
269 {
270     osi_socket sock;
271     int threadID;
272     struct rx_call *newcall = NULL;
273
274     rxi_MorePackets(rx_maxReceiveWindow + 2);   /* alloc more packets */
275     MUTEX_ENTER(&rx_quota_mutex);
276     rxi_dataQuota += rx_initSendWindow; /* Reserve some pkts for hard times */
277     /* threadID is used for making decisions in GetCall.  Get it by bumping
278      * number of threads handling incoming calls */
279     /* Unique thread ID: used for scheduling purposes *and* as index into
280      * the host hold table (fileserver).
281      * The previously used rxi_availProcs is unsuitable as it
282      * will already go up and down as packets arrive while the server
283      * threads are still initialising! The recently introduced
284      * rxi_pthread_hinum does not necessarily lead to a server
285      * thread with id 0, which is not allowed to hop through the
286      * incoming call queue.
287      * So either introduce yet another counter or flag the FCFS
288      * thread... chose the latter.
289      */
290     MUTEX_ENTER(&rx_pthread_mutex);
291     threadID = rx_NewThreadId();
292     if (rxi_fcfs_thread_num == 0 && rxi_fcfs_thread_num != threadID)
293         rxi_fcfs_thread_num = threadID;
294     MUTEX_EXIT(&rx_pthread_mutex);
295     ++rxi_availProcs;
296     MUTEX_EXIT(&rx_quota_mutex);
297
298     while (1) {
299         sock = OSI_NULLSOCKET;
300         rxi_SetThreadNum(threadID);
301         rxi_ServerProc(threadID, newcall, &sock);
302         /* osi_Assert(sock != OSI_NULLSOCKET); */
303         newcall = NULL;
304         rxi_ListenerProc(sock, &threadID, &newcall);
305         /* osi_Assert(threadID != -1); */
306         /* osi_Assert(newcall != NULL); */
307     }
308     /* not reached */
309     return NULL;
310 }
311
312 /*
313  * Historically used to start the listener process. We now have multiple
314  * listener processes (one for each socket); these are started by GetUdpSocket.
315  *
316  * The event handling process *is* started here (the old listener used
317  * to also handle events). The listener threads can't actually start
318  * listening until rxi_StartListener is called because most of R may not
319  * be initialized when rxi_Listen is called.
320  */
321 void
322 rxi_StartListener(void)
323 {
324     pthread_attr_t tattr;
325     AFS_SIGSET_DECL;
326
327         if (listeners_started)
328                 return;
329
330     if (pthread_attr_init(&tattr) != 0) {
331         osi_Panic("Unable to create Rx event handling thread (pthread_attr_init)\n");
332     }
333
334     if (pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) != 0) {
335         osi_Panic("Unable to create Rx event handling thread (pthread_attr_setdetachstate)\n");
336     }
337
338     AFS_SIGSET_CLEAR();
339     if (pthread_create(&event_handler_thread, &tattr, event_handler, NULL) !=
340         0) {
341         osi_Panic("Unable to create Rx event handling thread\n");
342     }
343     rx_NewThreadId();
344     AFS_SIGSET_RESTORE();
345
346     MUTEX_ENTER(&listener_mutex);
347     CV_BROADCAST(&rx_listener_cond);
348     listeners_started = 1;
349     MUTEX_EXIT(&listener_mutex);
350
351 }
352
353 /*
354  * Listen on the specified socket.
355  */
356 int
357 rxi_Listen(osi_socket sock)
358 {
359     pthread_t thread;
360     pthread_attr_t tattr;
361     AFS_SIGSET_DECL;
362
363     if (pthread_attr_init(&tattr) != 0) {
364         osi_Panic("Unable to create socket listener thread (pthread_attr_init)\n");
365     }
366
367     if (pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) != 0) {
368         osi_Panic("Unable to create socket listener thread (pthread_attr_setdetachstate)\n");
369     }
370
371     AFS_SIGSET_CLEAR();
372     if (pthread_create(&thread, &tattr, rx_ListenerProc, (void *)(intptr_t)sock) != 0) {
373         osi_Panic("Unable to create socket listener thread\n");
374     }
375     rx_NewThreadId();
376     AFS_SIGSET_RESTORE();
377     return 0;
378 }
379
380
381 /*
382  * Recvmsg.
383  *
384  */
385 int
386 rxi_Recvmsg(osi_socket socket, struct msghdr *msg_p, int flags)
387 {
388     int ret;
389 #if defined(HAVE_LINUX_ERRQUEUE_H) && defined(ADAPT_PMTU)
390     while((rxi_HandleSocketError(socket)) > 0)
391       ;
392 #endif
393     ret = recvmsg(socket, msg_p, flags);
394     return ret;
395 }
396
397 /*
398  * Sendmsg.
399  */
400 int
401 rxi_Sendmsg(osi_socket socket, struct msghdr *msg_p, int flags)
402 {
403     int ret;
404     ret = sendmsg(socket, msg_p, flags);
405 #ifdef AFS_LINUX22_ENV
406     /* linux unfortunately returns ECONNREFUSED if the target port
407      * is no longer in use */
408     /* and EAGAIN if a UDP checksum is incorrect */
409     if (ret == -1 && errno != ECONNREFUSED && errno != EAGAIN) {
410 #else
411     if (ret == -1) {
412 #endif
413         dpf(("rxi_sendmsg failed, error %d\n", errno));
414         fflush(stdout);
415 #ifndef AFS_NT40_ENV
416         if (errno > 0)
417           return -errno;
418 #else
419             if (WSAGetLastError() > 0)
420               return -WSAGetLastError();
421 #endif
422         return -1;
423     }
424     return 0;
425 }
426
427 struct rx_ts_info_t * rx_ts_info_init(void) {
428     struct rx_ts_info_t * rx_ts_info;
429     rx_ts_info = (rx_ts_info_t *) malloc(sizeof(rx_ts_info_t));
430     osi_Assert(rx_ts_info != NULL && pthread_setspecific(rx_ts_info_key, rx_ts_info) == 0);
431     memset(rx_ts_info, 0, sizeof(rx_ts_info_t));
432 #ifdef RX_ENABLE_TSFPQ
433     queue_Init(&rx_ts_info->_FPQ);
434
435     MUTEX_ENTER(&rx_packets_mutex);
436     rx_TSFPQMaxProcs++;
437     RX_TS_FPQ_COMPUTE_LIMITS;
438     MUTEX_EXIT(&rx_packets_mutex);
439 #endif /* RX_ENABLE_TSFPQ */
440     return rx_ts_info;
441 }
442
443 int
444 rx_GetThreadNum(void) {
445     return (intptr_t)pthread_getspecific(rx_thread_id_key);
446 }
447
448 static void
449 rxi_SetThreadNum(int threadID) {
450     osi_Assert(pthread_setspecific(rx_thread_id_key,
451                                    (void *)(intptr_t)threadID) == 0);
452 }
453
454 int
455 rx_SetThreadNum(void) {
456     int threadId;
457
458     threadId = rx_NewThreadId();
459     rxi_SetThreadNum(threadId);
460     return threadId;
461 }
462