 * Copyright 2000, International Business Machines Corporation and others.
 * This software has been released under the terms of the IBM Public
 * License. For details, see the LICENSE file in the top-level source
 * directory or online at http://www.openafs.org/dl/license10.html

/* RX: Extended Remote Procedure Call */

#include "../afs/param.h"
#include "../afs/sysincludes.h"
#include "../afs/afsincludes.h"
#include "../h/types.h"
#include "../h/time.h"
#include "../h/stat.h"
#include <net/net_globals.h>
#endif /* AFS_OSF_ENV */
#ifdef AFS_LINUX20_ENV
#include "../h/socket.h"
#include "../netinet/in.h"
#include "../afs/afs_args.h"
#include "../afs/afs_osi.h"
#if (defined(AFS_AUX_ENV) || defined(AFS_AIX_ENV))
#include "../h/systm.h"
#undef RXDEBUG /* turn off debugging */
#if defined(AFS_SGI_ENV)
#include "../sys/debug.h"
#include "../afsint/afsint.h"
#endif /* AFS_ALPHA_ENV */
#include "../afs/sysincludes.h"
#include "../afs/afsincludes.h"
#include "../afs/lock.h"
#include "../rx/rx_kmutex.h"
#include "../rx/rx_kernel.h"
#include "../rx/rx_clock.h"
#include "../rx/rx_queue.h"
#include "../rx/rx_globals.h"
#include "../rx/rx_trace.h"
#define AFSOP_STOP_RXCALLBACK 210 /* Stop CALLBACK process */
#define AFSOP_STOP_AFS 211 /* Stop AFS process */
#define AFSOP_STOP_BKG 212 /* Stop BKG process */
#include "../afsint/afsint.h"
extern afs_int32 afs_termState;
#include "sys/lockl.h"
#include "sys/lock_def.h"
#endif /* AFS_AIX41_ENV */
# include "../afsint/rxgen_consts.h"
# include <afs/param.h>
# include <sys/types.h>
# include <sys/socket.h>
# include <sys/file.h>
# include <sys/stat.h>
# include <netinet/in.h>
# include <sys/time.h>
# include "rx_clock.h"
# include "rx_queue.h"
# include "rx_globals.h"
# include "rx_trace.h"
# include "rx_internal.h"
# include <afs/rxgen_consts.h>
extern afs_uint32 LWP_ThreadId();
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
afs_int32 rxi_start_aborted; /* rxi_start awoke after rxi_Send in error. */
afs_int32 rxi_start_in_error;
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
 * rxi_rpc_peer_stat_cnt counts the total number of peer stat structures
 * currently allocated within rx. This number is used to allocate the
 * memory required to return the statistics when queried.
static unsigned int rxi_rpc_peer_stat_cnt;

 * rxi_rpc_process_stat_cnt counts the total number of local process stat
 * structures currently allocated within rx. The number is used to allocate
 * the memory required to return the statistics when queried.
static unsigned int rxi_rpc_process_stat_cnt;

#if !defined(offsetof)
#include <stddef.h> /* for definition of offsetof() */
#ifdef AFS_PTHREAD_ENV
 * Use procedural initialization of mutexes/condition variables
extern pthread_mutex_t rxkad_stats_mutex;
extern pthread_mutex_t des_init_mutex;
extern pthread_mutex_t des_random_mutex;
extern pthread_mutex_t rx_clock_mutex;
extern pthread_mutex_t rxi_connCacheMutex;
extern pthread_mutex_t rx_event_mutex;
extern pthread_mutex_t osi_malloc_mutex;
extern pthread_mutex_t event_handler_mutex;
extern pthread_mutex_t listener_mutex;
extern pthread_mutex_t rx_if_init_mutex;
extern pthread_mutex_t rx_if_mutex;
extern pthread_mutex_t rxkad_client_uid_mutex;
extern pthread_mutex_t rxkad_random_mutex;
extern pthread_cond_t rx_event_handler_cond;
extern pthread_cond_t rx_listener_cond;
static pthread_mutex_t epoch_mutex;
static pthread_mutex_t rx_init_mutex;
static pthread_mutex_t rx_debug_mutex;

static void rxi_InitPthread(void) {
    assert(pthread_mutex_init(&rx_clock_mutex,
                              (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rxi_connCacheMutex,
                              (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rx_init_mutex,
                              (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&epoch_mutex,
                              (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rx_event_mutex,
                              (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&des_init_mutex,
                              (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&des_random_mutex,
                              (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&osi_malloc_mutex,
                              (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&event_handler_mutex,
                              (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&listener_mutex,
                              (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rx_if_init_mutex,
                              (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rx_if_mutex,
                              (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rxkad_client_uid_mutex,
                              (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rxkad_random_mutex,
                              (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rxkad_stats_mutex,
                              (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rx_debug_mutex,
                              (const pthread_mutexattr_t*)0)==0);
    assert(pthread_cond_init(&rx_event_handler_cond,
                             (const pthread_condattr_t*)0)==0);
    assert(pthread_cond_init(&rx_listener_cond,
                             (const pthread_condattr_t*)0)==0);
    assert(pthread_key_create(&rx_thread_id_key, NULL) == 0);
}

pthread_once_t rx_once_init = PTHREAD_ONCE_INIT;
#define INIT_PTHREAD_LOCKS \
assert(pthread_once(&rx_once_init, rxi_InitPthread)==0);
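/*
 * Editor's illustration (not original source): every pthread-build entry
 * point is expected to expand INIT_PTHREAD_LOCKS before touching any of
 * the statics above, so pthread_once funnels all threads through
 * rxi_InitPthread exactly once. A hypothetical entry point:
 *
 *     void rx_SomeEntryPoint(void)
 *     {
 *         INIT_PTHREAD_LOCKS
 *         ... may now take epoch_mutex, rx_init_mutex, rx_debug_mutex ...
 *     }
 */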
 * The rx_stats_mutex mutex protects the following global variables:
 * rxi_lowConnRefCount
 * rxi_lowPeerRefCount

#define INIT_PTHREAD_LOCKS

extern void rxi_DeleteCachedConnections(void);

/* Variables for handling the minProcs implementation. availProcs gives the
 * number of threads available in the pool at this moment (not counting dudes
 * executing right now). totalMin gives the total number of procs required
 * for handling all minProcs requests. minDeficit is a dynamic variable
 * tracking the # of procs required to satisfy all of the remaining minProcs
 * requests.
 * For fine grain locking to work, the quota check and the reservation of
 * a server thread have to come while rxi_availProcs and rxi_minDeficit
 * are locked. To this end, the code has been modified under #ifdef
 * RX_ENABLE_LOCKS so that quota checks and reservation occur at the
 * same time. A new function, ReturnToServerPool(), returns the allocation.
 *
 * A call can be on several queues (but only one at a time). When
 * rxi_ResetCall wants to remove the call from a queue, it has to ensure
 * that no one else is touching the queue. To this end, we store the address
 * of the queue lock in the call structure (under the call lock) when we
 * put the call on a queue, and we clear the call_queue_lock when the
 * call is removed from a queue (once the call lock has been obtained).
 * This allows rxi_ResetCall to safely synchronize with others wishing
 * to manipulate the queue.

extern void rxi_Delay(int);

static int rxi_ServerThreadSelectingCall;

#ifdef RX_ENABLE_LOCKS
static afs_kmutex_t rx_rpc_stats;
void rxi_StartUnlocked();

/* We keep a "last conn pointer" in rxi_FindConnection. The odds are
** pretty good that the next packet coming in is from the same connection
** as the last packet, since we send multiple packets in a transmit window.
struct rx_connection *rxLastConn = 0;

#ifdef RX_ENABLE_LOCKS
/* The locking hierarchy for rx fine grain locking is composed of five
 * conn_call_lock - used to synchronize rx_EndCall and rx_NewCall
 * call->lock - locks call data fields.
 * Most any other lock - these are all independent of each other.....
 * rx_freeCallQueue_lock
 * rx_connHashTable_lock
 * rx_peerHashTable_lock - locked under rx_connHashTable_lock
 * peer_lock - locks peer data fields.
 * conn_data_lock - ensures that no more than one thread updates a conn data
 * field at the same time.
 * Do we need a lock to protect the peer field in the conn structure?
 * conn->peer was previously a constant for all intents and so has no
 * lock protecting this field. The multihomed client delta introduced
 * an RX code change: change the peer field in the connection structure
 * to that remote interface from which the last packet for this
 * connection was sent out. This may become an issue if further changes
#define SET_CALL_QUEUE_LOCK(C, L) (C)->call_queue_lock = (L)
#define CLEAR_CALL_QUEUE_LOCK(C) (C)->call_queue_lock = NULL
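/*
 * Editor's sketch of the protocol the two macros above implement
 * (abridged; `someQueue` and `qlock` are stand-ins, not names from this
 * file). On enqueue, under the call lock:
 *
 *     queue_Append(&someQueue, call);
 *     SET_CALL_QUEUE_LOCK(call, &qlock);   record which lock guards it
 *
 * On dequeue, once both qlock and call->lock are held:
 *
 *     queue_Remove(call);
 *     CLEAR_CALL_QUEUE_LOCK(call);
 *
 * rxi_ResetCall can then consult call->call_queue_lock to synchronize
 * safely with any other thread manipulating the queue.
 */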
/* rxdb_fileID is used to identify the lock location, along with line#. */
static int rxdb_fileID = RXDB_FILE_RX;
#endif /* RX_LOCKS_DB */
static void rxi_SetAcksInTransmitQueue();
void osirx_AssertMine(afs_kmutex_t *lockaddr, char *msg);
#else /* RX_ENABLE_LOCKS */
#define SET_CALL_QUEUE_LOCK(C, L)
#define CLEAR_CALL_QUEUE_LOCK(C)
#endif /* RX_ENABLE_LOCKS */
static void rxi_DestroyConnectionNoLock();
void rxi_DestroyConnection();
void rxi_CleanupConnection();
struct rx_serverQueueEntry *rx_waitForPacket = 0;

/* ------------Exported Interfaces------------- */

/* This function allows rxkad to set the epoch to a suitably random number
 * which rx_NewConnection will use in the future. The principal purpose is to
 * get rxnull connections to use the same epoch as the rxkad connections do, at
 * least once the first rxkad connection is established. This is important now
 * that the host/port addresses aren't used in FindConnection: the uniqueness
 * of epoch/cid matters and the start time won't do. */

#ifdef AFS_PTHREAD_ENV
 * This mutex protects the following global variables:
#define LOCK_EPOCH assert(pthread_mutex_lock(&epoch_mutex)==0);
#define UNLOCK_EPOCH assert(pthread_mutex_unlock(&epoch_mutex)==0);
#endif /* AFS_PTHREAD_ENV */

void rx_SetEpoch (epoch)
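/*
 * Caller sketch (editor's illustration; `randval` stands in for the
 * security module's random output and is not a name from this file):
 *
 *     afs_uint32 randval = ...some suitably random 32-bit value...;
 *     rx_SetEpoch(randval | 0x80000000);
 *
 * rx_Init below uses the same high-bit convention when it seeds the
 * epoch from the clock.
 */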
/* Initialize rx. A port number may be mentioned, in which case this
 * becomes the default port number for any service installed later.
 * If 0 is provided for the port number, a random port will be chosen
 * by the kernel. Whether this will ever overlap anything in
 * /etc/services is anybody's guess... Returns 0 on success, -1 on
 * error. */
static int rxinit_status = 1;
#ifdef AFS_PTHREAD_ENV
 * This mutex protects the following global variables:
#define LOCK_RX_INIT assert(pthread_mutex_lock(&rx_init_mutex)==0);
#define UNLOCK_RX_INIT assert(pthread_mutex_unlock(&rx_init_mutex)==0);
#define UNLOCK_RX_INIT

int rx_Init(u_int port)
    char *htable, *ptable;

    if (rxinit_status == 0) {
        tmp_status = rxinit_status;
        return tmp_status; /* Already started; return previous error code. */

    if (afs_winsockInit()<0)

     * Initialize anything necessary to provide a non-preemptive threading
    rxi_InitializeThreadSupport();

    /* Allocate and initialize a socket for client and perhaps server
    rx_socket = rxi_GetUDPSocket((u_short)port);
    if (rx_socket == OSI_NULLSOCKET) {

#ifdef RX_ENABLE_LOCKS
#endif /* RX_LOCKS_DB */
    MUTEX_INIT(&rx_stats_mutex, "rx_stats_mutex",MUTEX_DEFAULT,0);
    MUTEX_INIT(&rx_rpc_stats, "rx_rpc_stats",MUTEX_DEFAULT,0);
    MUTEX_INIT(&rx_freePktQ_lock, "rx_freePktQ_lock",MUTEX_DEFAULT,0);
    MUTEX_INIT(&freeSQEList_lock, "freeSQEList lock",MUTEX_DEFAULT,0);
    MUTEX_INIT(&rx_freeCallQueue_lock, "rx_freeCallQueue_lock",
    CV_INIT(&rx_waitingForPackets_cv, "rx_waitingForPackets_cv",CV_DEFAULT, 0);
    MUTEX_INIT(&rx_peerHashTable_lock,"rx_peerHashTable_lock",MUTEX_DEFAULT,0);
    MUTEX_INIT(&rx_connHashTable_lock,"rx_connHashTable_lock",MUTEX_DEFAULT,0);
    MUTEX_INIT(&rx_serverPool_lock, "rx_serverPool_lock", MUTEX_DEFAULT, 0);
    MUTEX_INIT(&rxi_keyCreate_lock, "rxi_keyCreate_lock", MUTEX_DEFAULT, 0);
    CV_INIT(&rx_serverPool_cv, "rx_serverPool_cv",CV_DEFAULT, 0);
#if defined(KERNEL) && defined(AFS_HPUX110_ENV)
    rx_sleepLock = alloc_spinlock(LAST_HELD_ORDER-10, "rx_sleepLock");
#endif /* KERNEL && AFS_HPUX110_ENV */
#else /* RX_ENABLE_LOCKS */
#if defined(KERNEL) && defined(AFS_GLOBAL_SUNLOCK) && !defined(AFS_HPUX_ENV)
    mutex_init(&afs_rxglobal_lock, "afs_rxglobal_lock", MUTEX_DEFAULT, NULL);
#endif /* AFS_GLOBAL_SUNLOCK */
#endif /* RX_ENABLE_LOCKS */

    rx_connDeadTime = 12;
    rx_tranquil = 0; /* reset flag */
    bzero((char *)&rx_stats, sizeof(struct rx_stats));
        osi_Alloc(rx_hashTableSize*sizeof(struct rx_connection *));
    PIN(htable, rx_hashTableSize*sizeof(struct rx_connection *)); /* XXXXX */
    bzero(htable, rx_hashTableSize*sizeof(struct rx_connection *));
    ptable = (char *) osi_Alloc(rx_hashTableSize*sizeof(struct rx_peer *));
    PIN(ptable, rx_hashTableSize*sizeof(struct rx_peer *)); /* XXXXX */
    bzero(ptable, rx_hashTableSize*sizeof(struct rx_peer *));

    /* Malloc up a bunch of packets & buffers */
    rx_nPackets = rx_extraPackets + RX_MAX_QUOTA + 2; /* fudge */
    queue_Init(&rx_freePacketQueue);
    rxi_NeedMorePackets = FALSE;
    rxi_MorePackets(rx_nPackets);

#if defined(AFS_NT40_ENV) && !defined(AFS_PTHREAD_ENV)
    tv.tv_sec = clock_now.sec;
    tv.tv_usec = clock_now.usec;
    srand((unsigned int) tv.tv_usec);

#if defined(KERNEL) && !defined(UKERNEL)
    /* Really, this should never happen in a real kernel */
    struct sockaddr_in addr;
    int addrlen = sizeof(addr);
    if (getsockname((int)rx_socket, (struct sockaddr *) &addr, &addrlen)) {
    rx_port = addr.sin_port;

    rx_stats.minRtt.sec = 9999999;
    rx_SetEpoch (tv.tv_sec | 0x80000000);
    rx_SetEpoch (tv.tv_sec); /* Start time of this package, rxkad
                              * will provide a more random value. */
    MUTEX_ENTER(&rx_stats_mutex);
    rxi_dataQuota += rx_extraQuota; /* + extra pkts caller asked to rsrv */
    MUTEX_EXIT(&rx_stats_mutex);
    /* *Slightly* random start time for the cid. This is just to help
     * out with the hashing function at the peer */
    rx_nextCid = ((tv.tv_sec ^ tv.tv_usec) << RX_CIDSHIFT);
    rx_connHashTable = (struct rx_connection **) htable;
    rx_peerHashTable = (struct rx_peer **) ptable;

    rx_lastAckDelay.sec = 0;
    rx_lastAckDelay.usec = 400000; /* 400 milliseconds */
    rx_hardAckDelay.sec = 0;
    rx_hardAckDelay.usec = 100000; /* 100 milliseconds */
    rx_softAckDelay.sec = 0;
    rx_softAckDelay.usec = 100000; /* 100 milliseconds */

    rxevent_Init(20, rxi_ReScheduleEvents);

    /* Initialize various global queues */
    queue_Init(&rx_idleServerQueue);
    queue_Init(&rx_incomingCallQueue);
    queue_Init(&rx_freeCallQueue);

#if defined(AFS_NT40_ENV) && !defined(KERNEL)
    /* Initialize our list of usable IP addresses. */

    /* Start listener process (exact function is dependent on the
     * implementation environment--kernel or user space) */

    tmp_status = rxinit_status = 0;
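/*
 * Minimal user-space startup sketch (editor's illustration; error
 * handling abridged). Passing 0 lets the kernel choose the port:
 *
 *     if (rx_Init(0) < 0)
 *         exit(1);        either a socket problem or a prior rx_Init error
 *     ... install services with rx_NewService, then rx_StartServer ...
 */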
/* called with unincremented nRequestsRunning to see if it is OK to start
 * a new thread in this service. Could be "no" for two reasons: over the
 * max quota, or would prevent others from reaching their min quota.
#ifdef RX_ENABLE_LOCKS
/* This version of QuotaOK reserves quota if it's ok while the
 * rx_serverPool_lock is held. Return quota using ReturnToServerPool().
static int QuotaOK(aservice)
    register struct rx_service *aservice;
    /* check if over max quota */
    if (aservice->nRequestsRunning >= aservice->maxProcs) {

    /* under min quota, we're OK */
    /* otherwise, can use only if there are enough to allow everyone
     * to go to their min quota after this guy starts.
    MUTEX_ENTER(&rx_stats_mutex);
    if ((aservice->nRequestsRunning < aservice->minProcs) ||
        (rxi_availProcs > rxi_minDeficit)) {
        aservice->nRequestsRunning++;
        /* just started call in minProcs pool, need fewer to maintain
        if (aservice->nRequestsRunning <= aservice->minProcs)
        MUTEX_EXIT(&rx_stats_mutex);
    MUTEX_EXIT(&rx_stats_mutex);

static void ReturnToServerPool(aservice)
    register struct rx_service *aservice;
    aservice->nRequestsRunning--;
    MUTEX_ENTER(&rx_stats_mutex);
    if (aservice->nRequestsRunning < aservice->minProcs) rxi_minDeficit++;
    MUTEX_EXIT(&rx_stats_mutex);

#else /* RX_ENABLE_LOCKS */
static QuotaOK(aservice)
    register struct rx_service *aservice; {
    /* under min quota, we're OK */
    if (aservice->nRequestsRunning < aservice->minProcs) return 1;

    /* check if over max quota */
    if (aservice->nRequestsRunning >= aservice->maxProcs) return 0;

    /* otherwise, can use only if there are enough to allow everyone
     * to go to their min quota after this guy starts.
    if (rxi_availProcs > rxi_minDeficit) rc = 1;
#endif /* RX_ENABLE_LOCKS */
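/*
 * Editor's sketch of how the two primitives above pair up in the
 * fine-grain-locking build (abridged from rx_GetCall below; both the
 * reservation and its return happen under rx_serverPool_lock):
 *
 *     MUTEX_ENTER(&rx_serverPool_lock);
 *     if (QuotaOK(service)) {
 *         ... commit to running the call, or on failure ...
 *         ReturnToServerPool(service);
 *     }
 *     MUTEX_EXIT(&rx_serverPool_lock);
 */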
/* Called by rx_StartServer to start up lwp's to service calls.
   NExistingProcs gives the number of procs already existing, and which
   therefore needn't be created. */
void rxi_StartServerProcs(nExistingProcs)
    register struct rx_service *service;

    /* For each service, reserve N processes, where N is the "minimum"
       number of processes that MUST be able to execute a request in parallel,
       at any time, for that service. Also compute the maximum difference
       between any service's maximum number of processes that can run
       (i.e. the maximum number that ever will be run, and a guarantee
       that this number will run if other services aren't running), and its
       minimum number. The result is the extra number of processes that
       we need in order to provide the latter guarantee */
    for (i=0; i<RX_MAX_SERVICES; i++) {
        service = rx_services[i];
        if (service == (struct rx_service *) 0) break;
        nProcs += service->minProcs;
        diff = service->maxProcs - service->minProcs;
        if (diff > maxdiff) maxdiff = diff;
    nProcs += maxdiff; /* Extra processes needed to allow max number requested to run in any given service, under good conditions */
    nProcs -= nExistingProcs; /* Subtract the number of procs that were previously created for use as server procs */
    for (i = 0; i<nProcs; i++) {
        rxi_StartServerProc(rx_ServerProc, rx_stackSize);
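/*
 * Worked example (editor's note): with two services, one minProcs=2,
 * maxProcs=6 and another minProcs=1, maxProcs=4, the loop yields
 * nProcs = 2+1 = 3 and maxdiff = max(6-2, 4-1) = 4, so 3+4 = 7 procs
 * are wanted; with nExistingProcs = 1, six new server procs start.
 */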
/* This routine must be called if any services are exported. If the
 * donateMe flag is set, the calling process is donated to the server
void rx_StartServer(donateMe)
    register struct rx_service *service;

    /* Start server processes, if necessary (exact function is dependent
     * on the implementation environment--kernel or user space). DonateMe
     * will be 1 if there is 1 pre-existing proc, i.e. this one. In this
     * case, one less new proc will be created by rx_StartServerProcs.
    rxi_StartServerProcs(donateMe);

    /* count up the # of threads in minProcs, and set the min deficit to
     * be that value, too.
    for (i=0; i<RX_MAX_SERVICES; i++) {
        service = rx_services[i];
        if (service == (struct rx_service *) 0) break;
        MUTEX_ENTER(&rx_stats_mutex);
        rxi_totalMin += service->minProcs;
        /* below works even if a thread is running, since minDeficit would
         * still have been decremented and later re-incremented.
        rxi_minDeficit += service->minProcs;
        MUTEX_EXIT(&rx_stats_mutex);

    /* Turn on reaping of idle server connections */
    rxi_ReapConnections();

    if (donateMe) rx_ServerProc(); /* Never returns */

/* Create a new client connection to the specified service, using the
 * specified security object to implement the security model for this
struct rx_connection *
rx_NewConnection(shost, sport, sservice, securityObject, serviceSecurityIndex)
    register afs_uint32 shost; /* Server host */
    u_short sport; /* Server port */
    u_short sservice; /* Server service id */
    register struct rx_securityClass *securityObject;
    int serviceSecurityIndex;
    register struct rx_connection *conn;

    dpf(("rx_NewConnection(host %x, port %u, service %u, securityObject %x, serviceSecurityIndex %d)\n",
         shost, sport, sservice, securityObject, serviceSecurityIndex));

    /* Vasilsi said: "NETPRI protects Cid and Alloc", but can this be true in
     * the case of kmem_alloc? */
    conn = rxi_AllocConnection();
#ifdef RX_ENABLE_LOCKS
    MUTEX_INIT(&conn->conn_call_lock, "conn call lock",MUTEX_DEFAULT,0);
    MUTEX_INIT(&conn->conn_data_lock, "conn data lock",MUTEX_DEFAULT,0);
    CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);

    MUTEX_ENTER(&rx_connHashTable_lock);
    cid = (rx_nextCid += RX_MAXCALLS);
    conn->type = RX_CLIENT_CONNECTION;
    conn->epoch = rx_epoch;
    conn->peer = rxi_FindPeer(shost, sport, 0, 1);
    conn->serviceId = sservice;
    conn->securityObject = securityObject;
    /* This doesn't work in all compilers with void (they're buggy), so fake it
    conn->securityData = (VOID *) 0;
    conn->securityIndex = serviceSecurityIndex;
    rx_SetConnDeadTime(conn, rx_connDeadTime);
    conn->ackRate = RX_FAST_ACK_RATE;
    conn->specific = NULL;
    conn->challengeEvent = (struct rxevent *)0;
    conn->delayedAbortEvent = (struct rxevent *)0;
    conn->abortCount = 0;

    RXS_NewConnection(securityObject, conn);
    hashindex = CONN_HASH(shost, sport, conn->cid, conn->epoch, RX_CLIENT_CONNECTION);
    conn->refCount++; /* no lock required since only this thread knows... */
    conn->next = rx_connHashTable[hashindex];
    rx_connHashTable[hashindex] = conn;
    MUTEX_ENTER(&rx_stats_mutex);
    rx_stats.nClientConns++;
    MUTEX_EXIT(&rx_stats_mutex);
    MUTEX_EXIT(&rx_connHashTable_lock);
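/*
 * Client usage sketch (editor's illustration; the loopback host, port
 * 7000, service id 4, and the rxnull security object are assumptions,
 * not values from this file):
 *
 *     struct rx_securityClass *so = rxnull_NewClientSecurityObject();
 *     struct rx_connection *conn =
 *         rx_NewConnection(htonl(0x7f000001), htons(7000), 4, so, 0);
 */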
void rx_SetConnDeadTime(conn, seconds)
    register struct rx_connection *conn;
    register int seconds;
    /* The idea is to set the dead time to a value that allows several
     * keepalives to be dropped without timing out the connection. */
    conn->secondsUntilDead = MAX(seconds, 6);
    conn->secondsUntilPing = conn->secondsUntilDead/6;
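/*
 * Worked example (editor's note): rx_SetConnDeadTime(conn, 12) yields
 * secondsUntilDead = MAX(12, 6) = 12 and secondsUntilPing = 12/6 = 2,
 * so several (about five) keepalives can be lost before the connection
 * is declared dead.
 */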
int rxi_lowPeerRefCount = 0;
int rxi_lowConnRefCount = 0;

 * Cleanup a connection that was destroyed in rxi_DestroyConnectionNoLock.
 * NOTE: must not be called with rx_connHashTable_lock held.
void rxi_CleanupConnection(conn)
    struct rx_connection *conn;

    /* Notify the service exporter, if requested, that this connection
     * is being destroyed */
    if (conn->type == RX_SERVER_CONNECTION && conn->service->destroyConnProc)
        (*conn->service->destroyConnProc)(conn);

    /* Notify the security module that this connection is being destroyed */
    RXS_DestroyConnection(conn->securityObject, conn);

    /* If this is the last connection using the rx_peer struct, set its
     * idle time to now. rxi_ReapConnections will reap it if it's still
     * idle (refCount == 0) after rx_idlePeerTime (60 seconds) have passed.
    MUTEX_ENTER(&rx_peerHashTable_lock);
    if (--conn->peer->refCount <= 0) {
        conn->peer->idleWhen = clock_Sec();
        if (conn->peer->refCount < 0) {
            conn->peer->refCount = 0;
            MUTEX_ENTER(&rx_stats_mutex);
            rxi_lowPeerRefCount ++;
            MUTEX_EXIT(&rx_stats_mutex);
    MUTEX_EXIT(&rx_peerHashTable_lock);

    MUTEX_ENTER(&rx_stats_mutex);
    if (conn->type == RX_SERVER_CONNECTION)
        rx_stats.nServerConns--;
        rx_stats.nClientConns--;
    MUTEX_EXIT(&rx_stats_mutex);

    if (conn->specific) {
        for (i = 0 ; i < conn->nSpecific ; i++) {
            if (conn->specific[i] && rxi_keyCreate_destructor[i])
                (*rxi_keyCreate_destructor[i])(conn->specific[i]);
            conn->specific[i] = NULL;
        free(conn->specific);
    conn->specific = NULL;

    MUTEX_DESTROY(&conn->conn_call_lock);
    MUTEX_DESTROY(&conn->conn_data_lock);
    CV_DESTROY(&conn->conn_call_cv);

    rxi_FreeConnection(conn);

/* Destroy the specified connection */
void rxi_DestroyConnection(conn)
    register struct rx_connection *conn;
    MUTEX_ENTER(&rx_connHashTable_lock);
    rxi_DestroyConnectionNoLock(conn);
    /* conn should be at the head of the cleanup list */
    if (conn == rx_connCleanup_list) {
        rx_connCleanup_list = rx_connCleanup_list->next;
        MUTEX_EXIT(&rx_connHashTable_lock);
        rxi_CleanupConnection(conn);
#ifdef RX_ENABLE_LOCKS
    MUTEX_EXIT(&rx_connHashTable_lock);
#endif /* RX_ENABLE_LOCKS */

static void rxi_DestroyConnectionNoLock(conn)
    register struct rx_connection *conn;
    register struct rx_connection **conn_ptr;
    register int havecalls = 0;
    struct rx_packet *packet;

    MUTEX_ENTER(&conn->conn_data_lock);
    if (conn->refCount > 0)
        MUTEX_ENTER(&rx_stats_mutex);
        rxi_lowConnRefCount++;
        MUTEX_EXIT(&rx_stats_mutex);

    if (conn->refCount > 0) {
        /* Busy; wait till the last guy before proceeding */
        MUTEX_EXIT(&conn->conn_data_lock);

    /* If the client previously called rx_NewCall, but it is still
     * waiting, treat this as a running call, and wait to destroy the
     * connection later when the call completes. */
    if ((conn->type == RX_CLIENT_CONNECTION) &&
        (conn->flags & RX_CONN_MAKECALL_WAITING)) {
        conn->flags |= RX_CONN_DESTROY_ME;
        MUTEX_EXIT(&conn->conn_data_lock);
    MUTEX_EXIT(&conn->conn_data_lock);

    /* Check for extant references to this connection */
    for (i = 0; i<RX_MAXCALLS; i++) {
        register struct rx_call *call = conn->call[i];
        if (conn->type == RX_CLIENT_CONNECTION) {
            MUTEX_ENTER(&call->lock);
            if (call->delayedAckEvent) {
                /* Push the final acknowledgment out now--there
                 * won't be a subsequent call to acknowledge the
                 * last reply packets */
                rxevent_Cancel(call->delayedAckEvent, call,
                               RX_CALL_REFCOUNT_DELAY);
                rxi_AckAll((struct rxevent *)0, call, 0);
            MUTEX_EXIT(&call->lock);

#ifdef RX_ENABLE_LOCKS
        if (MUTEX_TRYENTER(&conn->conn_data_lock)) {
            MUTEX_EXIT(&conn->conn_data_lock);
            /* Someone is accessing a packet right now. */
#endif /* RX_ENABLE_LOCKS */

        /* Don't destroy the connection if there are any call
         * structures still in use */
        MUTEX_ENTER(&conn->conn_data_lock);
        conn->flags |= RX_CONN_DESTROY_ME;
        MUTEX_EXIT(&conn->conn_data_lock);

    if (conn->delayedAbortEvent) {
        rxevent_Cancel(conn->delayedAbortEvent, (struct rx_call *)0, 0);
        packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
        MUTEX_ENTER(&conn->conn_data_lock);
        rxi_SendConnectionAbort(conn, packet, 0, 1);
        MUTEX_EXIT(&conn->conn_data_lock);
        rxi_FreePacket(packet);

    /* Remove from connection hash table before proceeding */
    conn_ptr = & rx_connHashTable[ CONN_HASH(peer->host, peer->port, conn->cid,
                                             conn->epoch, conn->type) ];
    for ( ; *conn_ptr; conn_ptr = &(*conn_ptr)->next) {
        if (*conn_ptr == conn) {
            *conn_ptr = conn->next;

    /* if the conn that we are destroying was the last connection, then we
     * clear rxLastConn as well */
    if ( rxLastConn == conn )

    /* Make sure the connection is completely reset before deleting it. */
    /* get rid of pending events that could zap us later */
    if (conn->challengeEvent) {
        rxevent_Cancel(conn->challengeEvent, (struct rx_call*)0, 0);

    /* Add the connection to the list of destroyed connections that
     * need to be cleaned up. This is necessary to avoid deadlocks
     * in the routines we call to inform others that this connection is
     * being destroyed. */
    conn->next = rx_connCleanup_list;
    rx_connCleanup_list = conn;

/* Externally available version */
void rx_DestroyConnection(conn)
    register struct rx_connection *conn;

    rxi_DestroyConnection (conn);
/* Start a new rx remote procedure call, on the specified connection.
 * If wait is set to 1, wait for a free call channel; otherwise return
 * 0. Maxtime gives the maximum number of seconds this call may take,
 * after rx_MakeCall returns. After this time interval, a call to any
 * of rx_SendData, rx_ReadData, etc. will fail with RX_CALL_TIMEOUT.
 * For fine grain locking, we hold the conn_call_lock in order
 * to ensure that we don't get signalled after we find a call in an active
 * state and before we go to sleep.
struct rx_call *rx_NewCall(conn)
    register struct rx_connection *conn;
    register struct rx_call *call;
    struct clock queueTime;

    dpf (("rx_NewCall(conn %x)\n", conn));

    clock_GetTime(&queueTime);
    MUTEX_ENTER(&conn->conn_call_lock);
    for (i=0; i<RX_MAXCALLS; i++) {
        call = conn->call[i];
        MUTEX_ENTER(&call->lock);
        if (call->state == RX_STATE_DALLY) {
            rxi_ResetCall(call, 0);
            (*call->callNumber)++;
        MUTEX_EXIT(&call->lock);
    call = rxi_NewCall(conn, i);
    MUTEX_ENTER(&call->lock);

    if (i < RX_MAXCALLS) {
        MUTEX_ENTER(&conn->conn_data_lock);
        conn->flags |= RX_CONN_MAKECALL_WAITING;
        MUTEX_EXIT(&conn->conn_data_lock);
#ifdef RX_ENABLE_LOCKS
        CV_WAIT(&conn->conn_call_cv, &conn->conn_call_lock);

    CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);

    /* Client is initially in send mode */
    call->state = RX_STATE_ACTIVE;
    call->mode = RX_MODE_SENDING;

    /* remember start time for call in case we have hard dead time limit */
    call->queueTime = queueTime;
    clock_GetTime(&call->startTime);
    hzero(call->bytesSent);
    hzero(call->bytesRcvd);

    /* Turn on busy protocol. */
    rxi_KeepAliveOn(call);

    MUTEX_EXIT(&call->lock);
    MUTEX_EXIT(&conn->conn_call_lock);

#ifdef AFS_GLOBAL_RXLOCK_KERNEL
    /* Now, if TQ wasn't cleared earlier, do it now. */
    MUTEX_ENTER(&call->lock);
    while (call->flags & RX_CALL_TQ_BUSY) {
        call->flags |= RX_CALL_TQ_WAIT;
#ifdef RX_ENABLE_LOCKS
        CV_WAIT(&call->cv_tq, &call->lock);
#else /* RX_ENABLE_LOCKS */
        osi_rxSleep(&call->tq);
#endif /* RX_ENABLE_LOCKS */
    if (call->flags & RX_CALL_TQ_CLEARME) {
        rxi_ClearTransmitQueue(call, 0);
        queue_Init(&call->tq);
    MUTEX_EXIT(&call->lock);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
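/*
 * Client call sketch (editor's illustration; error handling and the
 * marshaling of arguments via rx_Write/rx_Read are abridged):
 *
 *     struct rx_call *call = rx_NewCall(conn);
 *     rx_Write(call, request, reqlen);
 *     rx_Read(call, reply, replylen);
 *     code = rx_EndCall(call, 0);
 */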
rxi_HasActiveCalls(aconn)
    register struct rx_connection *aconn; {
    register struct rx_call *tcall;

    for(i=0; i<RX_MAXCALLS; i++) {
        if (tcall = aconn->call[i]) {
            if ((tcall->state == RX_STATE_ACTIVE)
                || (tcall->state == RX_STATE_PRECALL)) {

rxi_GetCallNumberVector(aconn, aint32s)
    register struct rx_connection *aconn;
    register afs_int32 *aint32s; {
    register struct rx_call *tcall;

    for(i=0; i<RX_MAXCALLS; i++) {
        if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
            aint32s[i] = aconn->callNumber[i]+1;
            aint32s[i] = aconn->callNumber[i];

rxi_SetCallNumberVector(aconn, aint32s)
    register struct rx_connection *aconn;
    register afs_int32 *aint32s; {
    register struct rx_call *tcall;

    for(i=0; i<RX_MAXCALLS; i++) {
        if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
            aconn->callNumber[i] = aint32s[i] - 1;
            aconn->callNumber[i] = aint32s[i];

/* Advertise a new service. A service is named locally by a UDP port
 * number plus a 16-bit service id. Returns (struct rx_service *) 0
rx_NewService(port, serviceId, serviceName, securityObjects,
              nSecurityObjects, serviceProc)
    char *serviceName; /* Name for identification purposes (e.g. the
                        * service name might be used for probing for
    struct rx_securityClass **securityObjects;
    int nSecurityObjects;
    afs_int32 (*serviceProc)();
    osi_socket socket = OSI_NULLSOCKET;
    register struct rx_service *tservice;

    if (serviceId == 0) {
        (osi_Msg "rx_NewService: service id for service %s must be non-zero.\n",
        (osi_Msg "rx_NewService: A non-zero port must be specified on this call if a non-zero port was not provided at Rx initialization (service %s).\n", serviceName);

    tservice = rxi_AllocService();

    for (i = 0; i<RX_MAX_SERVICES; i++) {
        register struct rx_service *service = rx_services[i];
        if (port == service->servicePort) {
            if (service->serviceId == serviceId) {
                /* The identical service has already been
                 * installed; if the caller was intending to
                 * change the security classes used by this
                 * service, he/she loses. */
                (osi_Msg "rx_NewService: tried to install service %s with service id %d, which is already in use for service %s\n", serviceName, serviceId, service->serviceName);
                rxi_FreeService(tservice);
            /* Different service, same port: re-use the socket
             * which is bound to the same port */
            socket = service->socket;
        if (socket == OSI_NULLSOCKET) {
            /* If we don't already have a socket (from another
             * service on same port) get a new one */
            socket = rxi_GetUDPSocket(port);
            if (socket == OSI_NULLSOCKET) {
                rxi_FreeService(tservice);
        service->socket = socket;
        service->servicePort = port;
        service->serviceId = serviceId;
        service->serviceName = serviceName;
        service->nSecurityObjects = nSecurityObjects;
        service->securityObjects = securityObjects;
        service->minProcs = 0;
        service->maxProcs = 1;
        service->idleDeadTime = 60;
        service->connDeadTime = rx_connDeadTime;
        service->executeRequestProc = serviceProc;
        rx_services[i] = service; /* not visible until now */

    rxi_FreeService(tservice);
    (osi_Msg "rx_NewService: cannot support > %d services\n", RX_MAX_SERVICES);
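/*
 * Server setup sketch (editor's illustration; the service id, name,
 * security array, and DemoExecuteRequest stub are assumptions). Port 0
 * uses the default port given to rx_Init:
 *
 *     struct rx_service *svc = rx_NewService(0, 4, "demo",
 *                                  secObjs, nSecObjs, DemoExecuteRequest);
 *     if (svc) {
 *         svc->minProcs = 2;
 *         svc->maxProcs = 8;
 *     }
 *     rx_StartServer(1);    donate this thread; never returns
 */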
/* Generic request processing loop. This routine should be called
 * by the implementation dependent rx_ServerProc. If socketp is
 * non-null, it will be set to the file descriptor that this thread
 * is now listening on. If socketp is null, this routine will never
void rxi_ServerProc(threadID, newcall, socketp)
    struct rx_call *newcall;
    osi_socket *socketp;
    register struct rx_call *call;
    register afs_int32 code;
    register struct rx_service *tservice = NULL;

    call = rx_GetCall(threadID, tservice, socketp);
    if (socketp && *socketp != OSI_NULLSOCKET) {
        /* We are now a listener thread */

    /* if server is restarting (typically smooth shutdown) then do not
     * allow any new calls.
    if ( rx_tranquil && (call != NULL) ) {
        MUTEX_ENTER(&call->lock);
        rxi_CallError(call, RX_RESTARTING);
        rxi_SendCallAbort(call, (struct rx_packet *) 0, 0, 0);
        MUTEX_EXIT(&call->lock);

    if (afs_termState == AFSOP_STOP_RXCALLBACK) {
#ifdef RX_ENABLE_LOCKS
#endif /* RX_ENABLE_LOCKS */
        afs_termState = AFSOP_STOP_AFS;
        afs_osi_Wakeup(&afs_termState);
#ifdef RX_ENABLE_LOCKS
#endif /* RX_ENABLE_LOCKS */

    tservice = call->conn->service;
    if (tservice->beforeProc) (*tservice->beforeProc)(call);
    code = call->conn->service->executeRequestProc(call);
    if (tservice->afterProc) (*tservice->afterProc)(call, code);
    rx_EndCall(call, code);
    MUTEX_ENTER(&rx_stats_mutex);
    MUTEX_EXIT(&rx_stats_mutex);

void rx_WakeupServerProcs()
    struct rx_serverQueueEntry *np, *tqp;

    MUTEX_ENTER(&rx_serverPool_lock);
#ifdef RX_ENABLE_LOCKS
    if (rx_waitForPacket)
        CV_BROADCAST(&rx_waitForPacket->cv);
#else /* RX_ENABLE_LOCKS */
    if (rx_waitForPacket)
        osi_rxWakeup(rx_waitForPacket);
#endif /* RX_ENABLE_LOCKS */
    MUTEX_ENTER(&freeSQEList_lock);
    for (np = rx_FreeSQEList; np; np = tqp) {
        tqp = *(struct rx_serverQueueEntry **)np;
#ifdef RX_ENABLE_LOCKS
        CV_BROADCAST(&np->cv);
#else /* RX_ENABLE_LOCKS */
#endif /* RX_ENABLE_LOCKS */
    MUTEX_EXIT(&freeSQEList_lock);
    for (queue_Scan(&rx_idleServerQueue, np, tqp, rx_serverQueueEntry)) {
#ifdef RX_ENABLE_LOCKS
        CV_BROADCAST(&np->cv);
#else /* RX_ENABLE_LOCKS */
#endif /* RX_ENABLE_LOCKS */
    MUTEX_EXIT(&rx_serverPool_lock);
 * One thing that seems to happen is that all the server threads get
 * tied up on some empty or slow call, and then a whole bunch of calls
 * arrive at once, using up the packet pool, so now there are more
 * empty calls. The most critical resources here are server threads
 * and the free packet pool. The "doreclaim" code seems to help in
 * general. I think that eventually we arrive in this state: there
 * are lots of pending calls which do have all their packets present,
 * so they won't be reclaimed, are multi-packet calls, so they won't
 * be scheduled until later, and thus are tying up most of the free
 * packet pool for a very long time.
 * 1. schedule multi-packet calls if all the packets are present.
 * Probably CPU-bound operation, useful to return packets to pool.
 * What to do if there is a full window, but the last packet isn't here?
 * 3. preserve one thread which *only* runs "best" calls, otherwise
 * it sleeps and waits for that type of call.
 * 4. Don't necessarily reserve a whole window for each thread. In fact,
 * the current dataquota business is badly broken. The quota isn't adjusted
 * to reflect how many packets are presently queued for a running call.
 * So, when we schedule a queued call with a full window of packets queued
 * up for it, that *should* free up a window full of packets for other 2d-class
 * calls to be able to use from the packet pool. But it doesn't.
 *
 * NB. Most of the time, this code doesn't run -- since idle server threads
 * sit on the idle server queue and are assigned by "...ReceivePacket" as soon
 * as a new call arrives.

/* Sleep until a call arrives. Returns a pointer to the call, ready
 * for an rx_Read. */
#ifdef RX_ENABLE_LOCKS
rx_GetCall(tno, cur_service, socketp)
    struct rx_service *cur_service;
    osi_socket *socketp;
    struct rx_serverQueueEntry *sq;
    register struct rx_call *call = (struct rx_call *) 0, *choice2;
    struct rx_service *service;

    MUTEX_ENTER(&freeSQEList_lock);
    if (sq = rx_FreeSQEList) {
        rx_FreeSQEList = *(struct rx_serverQueueEntry **)sq;
        MUTEX_EXIT(&freeSQEList_lock);
    } else { /* otherwise allocate a new one and return that */
        MUTEX_EXIT(&freeSQEList_lock);
        sq = (struct rx_serverQueueEntry *) rxi_Alloc(sizeof(struct rx_serverQueueEntry));
        MUTEX_INIT(&sq->lock, "server Queue lock",MUTEX_DEFAULT,0);
        CV_INIT(&sq->cv, "server Queue cv", CV_DEFAULT, 0);
    MUTEX_ENTER(&rx_serverPool_lock);
    if (cur_service != NULL) {
        ReturnToServerPool(cur_service);
    if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
        register struct rx_call *tcall, *ncall;
        choice2 = (struct rx_call *) 0;
        /* Scan for eligible incoming calls. A call is not eligible
         * if the maximum number of calls for its service type are
         * already executing */
        /* One thread will process calls FCFS (to prevent starvation),
         * while the other threads may run ahead looking for calls which
         * have all their input data available immediately. This helps
         * keep threads from blocking, waiting for data from the client. */
        for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
            service = tcall->conn->service;
            if (!QuotaOK(service)) {
            if (!tno || !tcall->queue_item_header.next ) {
                /* If we're thread 0, then we'll just use
                 * this call. If we haven't been able to find an optimal
                 * choice, and we're at the end of the list, then use a
                 * 2d choice if one has been identified. Otherwise... */
                call = (choice2 ? choice2 : tcall);
                service = call->conn->service;
            } else if (!queue_IsEmpty(&tcall->rq)) {
                struct rx_packet *rp;
                rp = queue_First(&tcall->rq, rx_packet);
                if (rp->header.seq == 1) {
                    if (!meltdown_1pkt ||
                        (rp->header.flags & RX_LAST_PACKET)) {
                } else if (rxi_2dchoice && !choice2 &&
                           !(tcall->flags & RX_CALL_CLEARED) &&
                           (tcall->rprev > rxi_HardAckRate)) {
                } else rxi_md2cnt++;
            ReturnToServerPool(service);

        rxi_ServerThreadSelectingCall = 1;
        MUTEX_EXIT(&rx_serverPool_lock);
        MUTEX_ENTER(&call->lock);
        MUTEX_ENTER(&rx_serverPool_lock);

        if (queue_IsEmpty(&call->rq) ||
            queue_First(&call->rq, rx_packet)->header.seq != 1)
            rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);

        CLEAR_CALL_QUEUE_LOCK(call);
        MUTEX_EXIT(&call->lock);
        ReturnToServerPool(service);
        rxi_ServerThreadSelectingCall = 0;
        CV_SIGNAL(&rx_serverPool_cv);
        call = (struct rx_call*)0;
        call->flags &= (~RX_CALL_WAIT_PROC);
        MUTEX_ENTER(&rx_stats_mutex);
        MUTEX_EXIT(&rx_stats_mutex);
        rxi_ServerThreadSelectingCall = 0;
        CV_SIGNAL(&rx_serverPool_cv);
        MUTEX_EXIT(&rx_serverPool_lock);

    /* If there are no eligible incoming calls, add this process
     * to the idle server queue, to wait for one */
        *socketp = OSI_NULLSOCKET;
        sq->socketp = socketp;
        queue_Append(&rx_idleServerQueue, sq);
#ifndef AFS_AIX41_ENV
        rx_waitForPacket = sq;
#endif /* AFS_AIX41_ENV */
        CV_WAIT(&sq->cv, &rx_serverPool_lock);
        if (afs_termState == AFSOP_STOP_RXCALLBACK) {
            MUTEX_EXIT(&rx_serverPool_lock);
            return (struct rx_call *)0;
    } while (!(call = sq->newcall) &&
             !(socketp && *socketp != OSI_NULLSOCKET));
    MUTEX_EXIT(&rx_serverPool_lock);
    MUTEX_ENTER(&call->lock);

    MUTEX_ENTER(&freeSQEList_lock);
    *(struct rx_serverQueueEntry **)sq = rx_FreeSQEList;
    rx_FreeSQEList = sq;
    MUTEX_EXIT(&freeSQEList_lock);

    clock_GetTime(&call->startTime);
    call->state = RX_STATE_ACTIVE;
    call->mode = RX_MODE_RECEIVING;

    rxi_calltrace(RX_CALL_START, call);
    dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n",
         call->conn->service->servicePort,
         call->conn->service->serviceId, call));

    CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
    MUTEX_EXIT(&call->lock);

    dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
#else /* RX_ENABLE_LOCKS */
rx_GetCall(tno, cur_service, socketp)
    struct rx_service *cur_service;
    osi_socket *socketp;
    struct rx_serverQueueEntry *sq;
    register struct rx_call *call = (struct rx_call *) 0, *choice2;
    struct rx_service *service;

    MUTEX_ENTER(&freeSQEList_lock);
    if (sq = rx_FreeSQEList) {
        rx_FreeSQEList = *(struct rx_serverQueueEntry **)sq;
        MUTEX_EXIT(&freeSQEList_lock);
    } else { /* otherwise allocate a new one and return that */
        MUTEX_EXIT(&freeSQEList_lock);
        sq = (struct rx_serverQueueEntry *) rxi_Alloc(sizeof(struct rx_serverQueueEntry));
        MUTEX_INIT(&sq->lock, "server Queue lock",MUTEX_DEFAULT,0);
        CV_INIT(&sq->cv, "server Queue cv", CV_DEFAULT, 0);
    MUTEX_ENTER(&sq->lock);

    if (cur_service != NULL) {
        cur_service->nRequestsRunning--;
        if (cur_service->nRequestsRunning < cur_service->minProcs)

    if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
        register struct rx_call *tcall, *ncall;
        /* Scan for eligible incoming calls. A call is not eligible
         * if the maximum number of calls for its service type are
         * already executing */
        /* One thread will process calls FCFS (to prevent starvation),
         * while the other threads may run ahead looking for calls which
         * have all their input data available immediately. This helps
         * keep threads from blocking, waiting for data from the client. */
        choice2 = (struct rx_call *) 0;
        for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
            service = tcall->conn->service;
            if (QuotaOK(service)) {
                if (!tno || !tcall->queue_item_header.next ) {
                    /* If we're thread 0, then we'll just use
                     * this call. If we haven't been able to find an optimal
                     * choice, and we're at the end of the list, then use a
                     * 2d choice if one has been identified. Otherwise... */
                    call = (choice2 ? choice2 : tcall);
                    service = call->conn->service;
                } else if (!queue_IsEmpty(&tcall->rq)) {
                    struct rx_packet *rp;
                    rp = queue_First(&tcall->rq, rx_packet);
                    if (rp->header.seq == 1
                        && (!meltdown_1pkt ||
                            (rp->header.flags & RX_LAST_PACKET))) {
                    } else if (rxi_2dchoice && !choice2 &&
                               !(tcall->flags & RX_CALL_CLEARED) &&
                               (tcall->rprev > rxi_HardAckRate)) {
                    } else rxi_md2cnt++;

        /* we can't schedule a call if there's no data!!! */
        /* send an ack if there's no data, if we're missing the
         * first packet, or we're missing something between first
         * and last -- there's a "hole" in the incoming data. */
        if (queue_IsEmpty(&call->rq) ||
            queue_First(&call->rq, rx_packet)->header.seq != 1 ||
            call->rprev != queue_Last(&call->rq, rx_packet)->header.seq)
            rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);

        call->flags &= (~RX_CALL_WAIT_PROC);
        service->nRequestsRunning++;
        /* just started call in minProcs pool, need fewer to maintain
        if (service->nRequestsRunning <= service->minProcs)
        /* MUTEX_EXIT(&call->lock); */

    /* If there are no eligible incoming calls, add this process
     * to the idle server queue, to wait for one */
        *socketp = OSI_NULLSOCKET;
        sq->socketp = socketp;
        queue_Append(&rx_idleServerQueue, sq);
        if (afs_termState == AFSOP_STOP_RXCALLBACK) {
            return (struct rx_call *)0;
    } while (!(call = sq->newcall) &&
             !(socketp && *socketp != OSI_NULLSOCKET));
    MUTEX_EXIT(&sq->lock);

    MUTEX_ENTER(&freeSQEList_lock);
    *(struct rx_serverQueueEntry **)sq = rx_FreeSQEList;
    rx_FreeSQEList = sq;
    MUTEX_EXIT(&freeSQEList_lock);

    clock_GetTime(&call->startTime);
    call->state = RX_STATE_ACTIVE;
    call->mode = RX_MODE_RECEIVING;

    rxi_calltrace(RX_CALL_START, call);
    dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n",
         call->conn->service->servicePort,
         call->conn->service->serviceId, call));

    dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
#endif /* RX_ENABLE_LOCKS */
/* Establish a procedure to be called when a packet arrives for a
 * call. This routine will be called at most once after each call,
 * and will also be called if there is an error condition on the call or
 * the call is complete. Used by multi rx to build a selection
 * function which determines which of several calls is likely to be a
 * good one to read from.
 * NOTE: the way this is currently implemented it is probably only a
 * good idea to (1) use it immediately after a newcall (clients only)
 * and (2) only use it once. Other uses currently void your warranty
void rx_SetArrivalProc(call, proc, handle, arg)
    register struct rx_call *call;
    register VOID (*proc)();
    register VOID *handle;
    call->arrivalProc = proc;
    call->arrivalProcHandle = handle;
    call->arrivalProcArg = arg;
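/*
 * Editor's sketch of the intended client-side, use-once pattern;
 * `Arrived`, `myHandle`, and `myArg` are hypothetical:
 *
 *     call = rx_NewCall(conn);
 *     rx_SetArrivalProc(call, Arrived, (VOID *)myHandle, (VOID *)myArg);
 *     ... multi rx then picks whichever call fires Arrived first ...
 */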
/* Call is finished (possibly prematurely). Return rc to the peer, if
 * appropriate, and return the final error code from the conversation
afs_int32 rx_EndCall(call, rc)
    register struct rx_call *call;
    register struct rx_connection *conn = call->conn;
    register struct rx_service *service;
    register struct rx_packet *tp; /* Temporary packet pointer */
    register struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */

    dpf(("rx_EndCall(call %x)\n", call));

    MUTEX_ENTER(&call->lock);

    if (rc == 0 && call->error == 0) {
        call->abortCode = 0;
        call->abortCount = 0;

    call->arrivalProc = (VOID (*)()) 0;
    if (rc && call->error == 0) {
        rxi_CallError(call, rc);
        /* Send an abort message to the peer if this error code has
         * only just been set. If it was set previously, assume the
         * peer has already been sent the error code or will request it
        rxi_SendCallAbort(call, (struct rx_packet *) 0, 0, 0);
    if (conn->type == RX_SERVER_CONNECTION) {
        /* Make sure reply or at least dummy reply is sent */
        if (call->mode == RX_MODE_RECEIVING) {
            rxi_WriteProc(call, 0, 0);
        if (call->mode == RX_MODE_SENDING) {
            rxi_FlushWrite(call);
        service = conn->service;
        rxi_calltrace(RX_CALL_END, call);
        /* Call goes to hold state until reply packets are acknowledged */
        if (call->tfirst + call->nSoftAcked < call->tnext) {
            call->state = RX_STATE_HOLD;
            call->state = RX_STATE_DALLY;
            rxi_ClearTransmitQueue(call, 0);
            rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
            rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
    else { /* Client connection */
        /* Make sure server receives input packets, in the case where
         * no reply arguments are expected */
        if ((call->mode == RX_MODE_SENDING)
            || (call->mode == RX_MODE_RECEIVING && call->rnext == 1)) {
            (void) rxi_ReadProc(call, &dummy, 1);
        /* We need to release the call lock since it's lower than the
         * conn_call_lock and we don't want to hold the conn_call_lock
         * over the rx_ReadProc call. The conn_call_lock needs to be held
         * here for the case where rx_NewCall is perusing the calls on
         * the connection structure. We don't want to signal until
         * rx_NewCall is in a stable state. Otherwise, rx_NewCall may
         * have checked this call, found it active and by the time it
         * goes to sleep, will have missed the signal.
        MUTEX_EXIT(&call->lock);
        MUTEX_ENTER(&conn->conn_call_lock);
        MUTEX_ENTER(&call->lock);
        MUTEX_ENTER(&conn->conn_data_lock);
        if (conn->flags & RX_CONN_MAKECALL_WAITING) {
            conn->flags &= (~RX_CONN_MAKECALL_WAITING);
            MUTEX_EXIT(&conn->conn_data_lock);
#ifdef RX_ENABLE_LOCKS
            CV_BROADCAST(&conn->conn_call_cv);
#ifdef RX_ENABLE_LOCKS
            MUTEX_EXIT(&conn->conn_data_lock);
#endif /* RX_ENABLE_LOCKS */
        call->state = RX_STATE_DALLY;
    error = call->error;

    /* currentPacket, nLeft, and nFree must be zeroed here, because
     * ResetCall cannot: ResetCall may be called at splnet(), in the
     * kernel version, and may interrupt the macros rx_Read or
     * rx_Write, which run at normal priority for efficiency. */
    if (call->currentPacket) {
        rxi_FreePacket(call->currentPacket);
        call->currentPacket = (struct rx_packet *) 0;
        call->nLeft = call->nFree = call->curlen = 0;
    call->nLeft = call->nFree = call->curlen = 0;

    /* Free any packets from the last call to ReadvProc/WritevProc */
    for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {

    CALL_RELE(call, RX_CALL_REFCOUNT_BEGIN);
    MUTEX_EXIT(&call->lock);
    if (conn->type == RX_CLIENT_CONNECTION)
        MUTEX_EXIT(&conn->conn_call_lock);

     * Map errors to the local host's errno.h format.
    error = ntoh_syserr_conv(error);
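/*
 * Usage note (editor's illustration): the value handed back is the
 * call's final error after ntoh_syserr_conv, so a typical client does
 *
 *     error = rx_EndCall(call, code);
 *     if (error) ...the conversation failed; error is in local errno form...
 */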
#if !defined(KERNEL)

/* Call this routine when shutting down a server or client (especially
 * clients). This will allow Rx to gracefully garbage collect server
 * connections, and reduce the number of retries that a server might
 * make to a dead client.
 * This is not quite right, since some calls may still be ongoing and
 * we can't lock them to destroy them. */
void rx_Finalize() {
    register struct rx_connection **conn_ptr, **conn_end;

    if (rxinit_status == 1) {
        return; /* Already shutdown. */
    rxi_DeleteCachedConnections();
    if (rx_connHashTable) {
        MUTEX_ENTER(&rx_connHashTable_lock);
        for (conn_ptr = &rx_connHashTable[0],
             conn_end = &rx_connHashTable[rx_hashTableSize];
             conn_ptr < conn_end; conn_ptr++) {
            struct rx_connection *conn, *next;
            for (conn = *conn_ptr; conn; conn = next) {
                if (conn->type == RX_CLIENT_CONNECTION) {
                    /* MUTEX_ENTER(&conn->conn_data_lock); when used in kernel */
                    /* MUTEX_EXIT(&conn->conn_data_lock); when used in kernel */
#ifdef RX_ENABLE_LOCKS
                    rxi_DestroyConnectionNoLock(conn);
#else /* RX_ENABLE_LOCKS */
                    rxi_DestroyConnection(conn);
#endif /* RX_ENABLE_LOCKS */
#ifdef RX_ENABLE_LOCKS
        while (rx_connCleanup_list) {
            struct rx_connection *conn;
            conn = rx_connCleanup_list;
            rx_connCleanup_list = rx_connCleanup_list->next;
            MUTEX_EXIT(&rx_connHashTable_lock);
            rxi_CleanupConnection(conn);
            MUTEX_ENTER(&rx_connHashTable_lock);
        MUTEX_EXIT(&rx_connHashTable_lock);
#endif /* RX_ENABLE_LOCKS */

/* if we wakeup packet waiter too often, can get in loop with two
   AllocSendPackets each waking each other up (from ReclaimPacket calls) */
rxi_PacketsUnWait() {
    if (!rx_waitingForPackets) {
    if (rxi_OverQuota(RX_PACKET_CLASS_SEND)) {
        return; /* still over quota */
    rx_waitingForPackets = 0;
#ifdef RX_ENABLE_LOCKS
    CV_BROADCAST(&rx_waitingForPackets_cv);
    osi_rxWakeup(&rx_waitingForPackets);
/* ------------------Internal interfaces------------------------- */

/* Return this process's service structure for the
 * specified socket and service */
struct rx_service *rxi_FindService(socket, serviceId)
    register osi_socket socket;
    register u_short serviceId;
    register struct rx_service **sp;
    for (sp = &rx_services[0]; *sp; sp++) {
        if ((*sp)->serviceId == serviceId && (*sp)->socket == socket)

/* Allocate a call structure, for the indicated channel of the
 * supplied connection. The mode and state of the call must be set by
struct rx_call *rxi_NewCall(conn, channel)
    register struct rx_connection *conn;
    register int channel;
    register struct rx_call *call;
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
    register struct rx_call *cp; /* Call pointer temp */
    register struct rx_call *nxp; /* Next call pointer, for queue_Scan */
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */

    /* Grab an existing call structure, or allocate a new one.
     * Existing call structures are assumed to have been left reset by
    MUTEX_ENTER(&rx_freeCallQueue_lock);

#ifdef AFS_GLOBAL_RXLOCK_KERNEL
     * EXCEPT that the TQ might not yet be cleared out.
     * Skip over those with in-use TQs.
    for (queue_Scan(&rx_freeCallQueue, cp, nxp, rx_call)) {
        if (!(cp->flags & RX_CALL_TQ_BUSY)) {
#else /* AFS_GLOBAL_RXLOCK_KERNEL */
    if (queue_IsNotEmpty(&rx_freeCallQueue)) {
        call = queue_First(&rx_freeCallQueue, rx_call);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
        MUTEX_ENTER(&rx_stats_mutex);
        rx_stats.nFreeCallStructs--;
        MUTEX_EXIT(&rx_stats_mutex);
        MUTEX_EXIT(&rx_freeCallQueue_lock);
        MUTEX_ENTER(&call->lock);
        CLEAR_CALL_QUEUE_LOCK(call);
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
        /* Now, if TQ wasn't cleared earlier, do it now. */
        if (call->flags & RX_CALL_TQ_CLEARME) {
            rxi_ClearTransmitQueue(call, 0);
            queue_Init(&call->tq);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
        /* Bind the call to its connection structure */
        rxi_ResetCall(call, 1);
        call = (struct rx_call *) rxi_Alloc(sizeof(struct rx_call));
        MUTEX_EXIT(&rx_freeCallQueue_lock);
        MUTEX_INIT(&call->lock, "call lock", MUTEX_DEFAULT, NULL);
        MUTEX_ENTER(&call->lock);
        CV_INIT(&call->cv_twind, "call twind", CV_DEFAULT, 0);
        CV_INIT(&call->cv_rq, "call rq", CV_DEFAULT, 0);
        CV_INIT(&call->cv_tq, "call tq", CV_DEFAULT, 0);
        MUTEX_ENTER(&rx_stats_mutex);
        rx_stats.nCallStructs++;
        MUTEX_EXIT(&rx_stats_mutex);
        /* Initialize once-only items */
        queue_Init(&call->tq);
        queue_Init(&call->rq);
        queue_Init(&call->iovq);
        /* Bind the call to its connection structure (prereq for reset) */
        rxi_ResetCall(call, 1);
    call->channel = channel;
    call->callNumber = &conn->callNumber[channel];
    /* Note that the next expected call number is retained (in
     * conn->callNumber[i]), even if we reallocate the call structure
    conn->call[channel] = call;
    /* if the channel's never been used (== 0), we should start at 1, otherwise
       the call number is valid from the last time this channel was used */
    if (*call->callNumber == 0) *call->callNumber = 1;

    MUTEX_EXIT(&call->lock);
1973 /* A call has been inactive long enough that so we can throw away
1974 * state, including the call structure, which is placed on the call
1976 * Call is locked upon entry.
1978 #ifdef RX_ENABLE_LOCKS
1979 void rxi_FreeCall(call, haveCTLock)
1980 int haveCTLock; /* Set if called from rxi_ReapConnections */
1981 #else /* RX_ENABLE_LOCKS */
1982 void rxi_FreeCall(call)
1983 #endif /* RX_ENABLE_LOCKS */
1984 register struct rx_call *call;
1986 register int channel = call->channel;
1987 register struct rx_connection *conn = call->conn;
1990 if (call->state == RX_STATE_DALLY || call->state == RX_STATE_HOLD)
1991 (*call->callNumber)++;
1992 rxi_ResetCall(call, 0);
1993 call->conn->call[channel] = (struct rx_call *) 0;
1995 MUTEX_ENTER(&rx_freeCallQueue_lock);
1996 SET_CALL_QUEUE_LOCK(call, &rx_freeCallQueue_lock);
1997 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1998 /* A call may be free even though its transmit queue is still in use.
1999 * Since we search the call list from head to tail, put busy calls at
2000 * the head of the list, and idle calls at the tail.
2002 if (call->flags & RX_CALL_TQ_BUSY)
2003 queue_Prepend(&rx_freeCallQueue, call);
2005 queue_Append(&rx_freeCallQueue, call);
2006 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
2007 queue_Append(&rx_freeCallQueue, call);
2008 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2009 MUTEX_ENTER(&rx_stats_mutex);
2010 rx_stats.nFreeCallStructs++;
2011 MUTEX_EXIT(&rx_stats_mutex);
2013 MUTEX_EXIT(&rx_freeCallQueue_lock);
2015 /* Destroy the connection if it was previously slated for
2016 * destruction, i.e. the Rx client code previously called
2017 * rx_DestroyConnection (client connections), or
2018 * rxi_ReapConnections called the same routine (server
2019 * connections). Only do this, however, if there are no
2020 * outstanding calls. Note that for fine grain locking, there appears
2021 * to be a deadlock in that rxi_FreeCall has a call locked and
2022 * DestroyConnectionNoLock locks each call in the conn. But note a
2023 * few lines up where we have removed this call from the conn.
2024 * If someone else destroys a connection, they either have no
2025 * call lock held or are going through this section of code. */
2027 if (conn->flags & RX_CONN_DESTROY_ME) {
2028 MUTEX_ENTER(&conn->conn_data_lock);
2030 MUTEX_EXIT(&conn->conn_data_lock);
2031 #ifdef RX_ENABLE_LOCKS
2033 rxi_DestroyConnectionNoLock(conn);
2035 rxi_DestroyConnection(conn);
2036 #else /* RX_ENABLE_LOCKS */
2037 rxi_DestroyConnection(conn);
2038 #endif /* RX_ENABLE_LOCKS */
2042 afs_int32 rxi_Alloccnt = 0, rxi_Allocsize = 0;
2043 char *rxi_Alloc(size)
2044 register size_t size;
2048 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2049 /* Grab the AFS filesystem lock. See afs/osi.h for the lock
2052 int glockOwner = ISAFS_GLOCK();
2056 MUTEX_ENTER(&rx_stats_mutex);
2057 rxi_Alloccnt++; rxi_Allocsize += size;
2058 MUTEX_EXIT(&rx_stats_mutex);
2059 #if (defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && !defined(AFS_HPUX100_ENV) && defined(KERNEL)
2060 if (size > AFS_SMALLOCSIZ) {
2061 p = (char *) osi_AllocMediumSpace(size);
2063 p = (char *) osi_AllocSmall(size, 1);
2064 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2069 p = (char *) osi_Alloc(size);
2071 if (!p) osi_Panic("rxi_Alloc error");
2076 void rxi_Free(addr, size)
2078 register size_t size;
2080 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2081 /* Grab the AFS filesystem lock. See afs/osi.h for the lock
2084 int glockOwner = ISAFS_GLOCK();
2088 MUTEX_ENTER(&rx_stats_mutex);
2089 rxi_Alloccnt--; rxi_Allocsize -= size;
2090 MUTEX_EXIT(&rx_stats_mutex);
2091 #if (defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && !defined(AFS_HPUX100_ENV) && defined(KERNEL)
2092 if (size > AFS_SMALLOCSIZ)
2093 osi_FreeMediumSpace(addr);
2095 osi_FreeSmall(addr);
2096 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2101 osi_Free(addr, size);
2105 /* Find the peer process represented by the supplied (host,port)
2106 * combination. If there is no appropriate active peer structure, a
2107 * new one will be allocated and initialized
2108 * The origPeer, if set, is a pointer to a peer structure on which the
2109 * refcount will be decremented. This is used to replace the peer
2110 * structure hanging off a connection structure */
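/* Usage sketch (added for illustration, not in the original source): when a
 * connection's remote interface changes, the old peer reference is dropped
 * and a new one obtained in a single call, as rxi_FindConnection does below:
 *
 *   peer = rxi_FindPeer(host, port, conn->peer, 1);
 */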
2111 struct rx_peer *rxi_FindPeer(host, port, origPeer, create)
2112 register afs_uint32 host;
2113 register u_short port;
2114 struct rx_peer *origPeer;
2117 register struct rx_peer *pp;
2119 hashIndex = PEER_HASH(host, port);
2120 MUTEX_ENTER(&rx_peerHashTable_lock);
2121 for (pp = rx_peerHashTable[hashIndex]; pp; pp = pp->next) {
2122 if ((pp->host == host) && (pp->port == port)) break;
2126 pp = rxi_AllocPeer(); /* This bzero's *pp */
2127 pp->host = host; /* must be set here; otherwise InitPeerParams sees zero */
2129 MUTEX_INIT(&pp->peer_lock, "peer_lock", MUTEX_DEFAULT, 0);
2130 queue_Init(&pp->congestionQueue);
2131 queue_Init(&pp->rpcStats);
2132 pp->next = rx_peerHashTable[hashIndex];
2133 rx_peerHashTable[hashIndex] = pp;
2134 rxi_InitPeerParams(pp);
2135 MUTEX_ENTER(&rx_stats_mutex);
2136 rx_stats.nPeerStructs++;
2137 MUTEX_EXIT(&rx_stats_mutex);
2144 origPeer->refCount--;
2145 MUTEX_EXIT(&rx_peerHashTable_lock);
2150 /* Find the connection at (host, port) started at epoch, and with the
2151 * given connection id. Creates the server connection if necessary.
2152 * The type specifies whether a client connection or a server
2153 * connection is desired. In both cases, (host, port) specify the
2154 * peer's (host, port) pair. Client connections are not made
2155 * automatically by this routine. The parameter socket gives the
2156 * socket descriptor on which the packet was received. This is used,
2157 * in the case of server connections, to check that *new* connections
2158 * come via a valid (port, serviceId). Finally, the securityIndex
2159 * parameter must match the existing index for the connection. If a
2160 * server connection is created, it will be created using the supplied
2161 * index, if the index is valid for this service */
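/* Usage sketch (illustrative only): this is how the packet dispatcher below
 * resolves an incoming packet's header fields to a connection; a null return
 * means the packet is simply ignored:
 *
 *   conn = rxi_FindConnection(socket, host, port, np->header.serviceId,
 *                             np->header.cid, np->header.epoch, type,
 *                             np->header.securityIndex);
 */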
2162 struct rx_connection *
2163 rxi_FindConnection(socket, host, port, serviceId, cid,
2164 epoch, type, securityIndex)
2166 register afs_int32 host;
2167 register u_short port;
2172 u_int securityIndex;
2174 int hashindex, flag;
2175 register struct rx_connection *conn;
2176 struct rx_peer *peer;
2177 hashindex = CONN_HASH(host, port, cid, epoch, type);
2178 MUTEX_ENTER(&rx_connHashTable_lock);
2179 rxLastConn ? (conn = rxLastConn, flag = 0) :
2180 (conn = rx_connHashTable[hashindex], flag = 1);
2182 if ((conn->type == type) && ((cid&RX_CIDMASK) == conn->cid)
2183 && (epoch == conn->epoch)) {
2184 register struct rx_peer *pp = conn->peer;
2185 if (securityIndex != conn->securityIndex) {
2186 /* this isn't supposed to happen, but someone could forge a packet
2187 like this, and there seems to be some CM bug that makes this
2188 happen from time to time -- in which case, the fileserver asserts. */
2190 MUTEX_EXIT(&rx_connHashTable_lock);
2191 return (struct rx_connection *) 0;
2193 /* If the epoch's high-order bit is set, route for security reasons
2194 * only on the cid, not on the host and port fields. */
2196 if (conn->epoch & 0x80000000) break;
2197 if (((type == RX_CLIENT_CONNECTION)
2198 || (pp->host == host)) && (pp->port == port))
2203 /* the connection rxLastConn that was used the last time is not the
2204 ** one we are looking for now. Hence, start searching in the hash table */
2206 conn = rx_connHashTable[hashindex];
2212 struct rx_service *service;
2213 if (type == RX_CLIENT_CONNECTION) {
2214 MUTEX_EXIT(&rx_connHashTable_lock);
2215 return (struct rx_connection *) 0;
2217 service = rxi_FindService(socket, serviceId);
2218 if (!service || (securityIndex >= service->nSecurityObjects)
2219 || (service->securityObjects[securityIndex] == 0)) {
2220 MUTEX_EXIT(&rx_connHashTable_lock);
2221 return (struct rx_connection *) 0;
2223 conn = rxi_AllocConnection(); /* This bzero's the connection */
2224 MUTEX_INIT(&conn->conn_call_lock, "conn call lock",
2226 MUTEX_INIT(&conn->conn_data_lock, "conn data lock",
2228 CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
2229 conn->next = rx_connHashTable[hashindex];
2230 rx_connHashTable[hashindex] = conn;
2231 peer = conn->peer = rxi_FindPeer(host, port, 0, 1);
2232 conn->type = RX_SERVER_CONNECTION;
2233 conn->lastSendTime = clock_Sec(); /* don't GC immediately */
2234 conn->epoch = epoch;
2235 conn->cid = cid & RX_CIDMASK;
2236 /* conn->serial = conn->lastSerial = 0; */
2237 /* conn->timeout = 0; */
2238 conn->ackRate = RX_FAST_ACK_RATE;
2239 conn->service = service;
2240 conn->serviceId = serviceId;
2241 conn->securityIndex = securityIndex;
2242 conn->securityObject = service->securityObjects[securityIndex];
2243 conn->nSpecific = 0;
2244 conn->specific = NULL;
2245 rx_SetConnDeadTime(conn, service->connDeadTime);
2246 /* Notify security object of the new connection */
2247 RXS_NewConnection(conn->securityObject, conn);
2248 /* XXXX Connection timeout? */
2249 if (service->newConnProc) (*service->newConnProc)(conn);
2250 MUTEX_ENTER(&rx_stats_mutex);
2251 rx_stats.nServerConns++;
2252 MUTEX_EXIT(&rx_stats_mutex);
2256 /* Ensure that the peer structure is set up in such a way that
2257 ** replies in this connection go back to that remote interface
2258 ** from which the last packet was sent out. If this packet's
2259 ** source IP address does not match the peer struct for this conn,
2260 ** then drop the refCount on conn->peer and get a new peer structure.
2261 ** We can check the host,port field in the peer structure without the
2262 ** rx_peerHashTable_lock because the peer structure has its refCount
2263 ** incremented and the only time the host,port in the peer struct gets
2264 ** updated is when the peer structure is created.
2266 if (conn->peer->host == host )
2267 peer = conn->peer; /* no change to the peer structure */
2269 peer = rxi_FindPeer(host, port, conn->peer, 1);
2272 MUTEX_ENTER(&conn->conn_data_lock);
2275 MUTEX_EXIT(&conn->conn_data_lock);
2277 rxLastConn = conn; /* store this connection as the last conn used */
2278 MUTEX_EXIT(&rx_connHashTable_lock);
2282 /* There are two packet tracing routines available for testing and monitoring
2283 * Rx. One is called just after every packet is received and the other is
2284 * called just before every packet is sent. Received packets, have had their
2285 * headers decoded, and packets to be sent have not yet had their headers
2286 * encoded. Both take two parameters: a pointer to the packet and a sockaddr
2287 * containing the network address. Both can be modified. The return value, if
2288 * non-zero, indicates that the packet should be dropped. */
2290 int (*rx_justReceived)() = 0;
2291 int (*rx_almostSent)() = 0;
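/* A minimal sketch (assumption: the tracer is installed before packets start
 * flowing) of a receive-side tracer; returning non-zero tells rx to drop the
 * packet, and both arguments may be modified in place:
 *
 *   static int myTracer(np, addr)
 *       struct rx_packet *np;
 *       struct sockaddr_in *addr;
 *   {
 *       return 0;   -- inspect or rewrite np and addr here; 0 keeps the packet
 *   }
 *
 *   rx_justReceived = myTracer;
 */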
2293 /* A packet has been received off the interface. Np is the packet, socket is
2294 * the socket number it was received from (useful in determining which service
2295 * this packet corresponds to), and (host, port) reflect the host,port of the
2296 * sender. This call returns the packet to the caller if it is finished with
2297 * it, rather than de-allocating it, just as a small performance hack */
2299 struct rx_packet *rxi_ReceivePacket(np, socket, host, port, tnop, newcallp)
2300 register struct rx_packet *np;
2305 struct rx_call **newcallp;
2307 register struct rx_call *call;
2308 register struct rx_connection *conn;
2310 afs_uint32 currentCallNumber;
2316 struct rx_packet *tnp;
2319 /* We don't print out the packet until now because (1) the time may not be
2320 * accurate enough in the lwp implementation (rx_Listener only gets
2321 * the time after the packet is read) and (2) from a protocol point of view,
2322 * this is the first time the packet has been seen */
2323 packetType = (np->header.type > 0 && np->header.type < RX_N_PACKET_TYPES)
2324 ? rx_packetTypes[np->header.type-1]: "*UNKNOWN*";
2325 dpf(("R %d %s: %x.%d.%d.%d.%d.%d.%d flags %d, packet %x",
2326 np->header.serial, packetType, host, port, np->header.serviceId,
2327 np->header.epoch, np->header.cid, np->header.callNumber,
2328 np->header.seq, np->header.flags, np));
2331 if(np->header.type == RX_PACKET_TYPE_VERSION) {
2332 return rxi_ReceiveVersionPacket(np,socket,host,port, 1);
2335 if (np->header.type == RX_PACKET_TYPE_DEBUG) {
2336 return rxi_ReceiveDebugPacket(np, socket, host, port, 1);
2339 /* If an input tracer function is defined, call it with the packet and
2340 * network address. Note this function may modify its arguments. */
2341 if (rx_justReceived) {
2342 struct sockaddr_in addr;
2344 addr.sin_family = AF_INET;
2345 addr.sin_port = port;
2346 addr.sin_addr.s_addr = host;
2347 #if defined(AFS_OSF_ENV) && defined(_KERNEL)
2348 addr.sin_len = sizeof(addr);
2349 #endif /* AFS_OSF_ENV */
2350 drop = (*rx_justReceived) (np, &addr);
2351 /* drop packet if return value is non-zero */
2352 if (drop) return np;
2353 port = addr.sin_port; /* in case fcn changed addr */
2354 host = addr.sin_addr.s_addr;
2358 /* If packet was not sent by the client, then *we* must be the client */
2359 type = ((np->header.flags&RX_CLIENT_INITIATED) != RX_CLIENT_INITIATED)
2360 ? RX_CLIENT_CONNECTION : RX_SERVER_CONNECTION;
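/* e.g. a packet whose header lacks RX_CLIENT_INITIATED was sent by a server,
 * so it must belong to one of our client connections */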
2362 /* Find the connection (or fabricate one, if we're the server & if
2363 * necessary) associated with this packet */
2364 conn = rxi_FindConnection(socket, host, port, np->header.serviceId,
2365 np->header.cid, np->header.epoch, type,
2366 np->header.securityIndex);
2369 /* If no connection found or fabricated, just ignore the packet.
2370 * (An argument could be made for sending an abort packet for the conn) */
2375 MUTEX_ENTER(&conn->conn_data_lock);
2376 if (conn->maxSerial < np->header.serial)
2377 conn->maxSerial = np->header.serial;
2378 MUTEX_EXIT(&conn->conn_data_lock);
2380 /* If the connection is in an error state, send an abort packet and ignore
2381 * the incoming packet */
2383 /* Don't respond to an abort packet--we don't want loops! */
2384 MUTEX_ENTER(&conn->conn_data_lock);
2385 if (np->header.type != RX_PACKET_TYPE_ABORT)
2386 np = rxi_SendConnectionAbort(conn, np, 1, 0);
2388 MUTEX_EXIT(&conn->conn_data_lock);
2392 /* Check for connection-only requests (i.e. not call specific). */
2393 if (np->header.callNumber == 0) {
2394 switch (np->header.type) {
2395 case RX_PACKET_TYPE_ABORT:
2396 /* What if the supplied error is zero? */
2397 rxi_ConnectionError(conn, ntohl(rx_GetInt32(np,0)));
2398 MUTEX_ENTER(&conn->conn_data_lock);
2400 MUTEX_EXIT(&conn->conn_data_lock);
2402 case RX_PACKET_TYPE_CHALLENGE:
2403 tnp = rxi_ReceiveChallengePacket(conn, np, 1);
2404 MUTEX_ENTER(&conn->conn_data_lock);
2406 MUTEX_EXIT(&conn->conn_data_lock);
2408 case RX_PACKET_TYPE_RESPONSE:
2409 tnp = rxi_ReceiveResponsePacket(conn, np, 1);
2410 MUTEX_ENTER(&conn->conn_data_lock);
2412 MUTEX_EXIT(&conn->conn_data_lock);
2414 case RX_PACKET_TYPE_PARAMS:
2415 case RX_PACKET_TYPE_PARAMS+1:
2416 case RX_PACKET_TYPE_PARAMS+2:
2417 /* ignore these packet types for now */
2418 MUTEX_ENTER(&conn->conn_data_lock);
2420 MUTEX_EXIT(&conn->conn_data_lock);
2425 /* Should not reach here, unless the peer is broken: send an abort packet */
2427 rxi_ConnectionError(conn, RX_PROTOCOL_ERROR);
2428 MUTEX_ENTER(&conn->conn_data_lock);
2429 tnp = rxi_SendConnectionAbort(conn, np, 1, 0);
2431 MUTEX_EXIT(&conn->conn_data_lock);
2436 channel = np->header.cid & RX_CHANNELMASK;
2437 call = conn->call[channel];
2438 #ifdef RX_ENABLE_LOCKS
2440 MUTEX_ENTER(&call->lock);
2441 /* Test to see if call struct is still attached to conn. */
2442 if (call != conn->call[channel]) {
2444 MUTEX_EXIT(&call->lock);
2445 if (type == RX_SERVER_CONNECTION) {
2446 call = conn->call[channel];
2447 /* If we started with no call attached and there is one now,
2448 * another thread is also running this routine and has gotten
2449 * the connection channel. We should drop this packet in the tests
2450 * below. If there was a call on this connection and it's now
2451 * gone, then we'll be making a new call below.
2452 * If there was previously a call and it's now different then
2453 * the old call was freed and another thread running this routine
2454 * has created a call on this channel. One of these two threads
2455 * has a packet for the old call and the code below handles those cases. */
2459 MUTEX_ENTER(&call->lock);
2462 /* This packet can't be for this call. If the new call address is
2463 * 0 then no call is running on this channel. If there is a call
2464 * then, since this is a client connection we're getting data for,
2465 * it must be for the previous call. */
2467 MUTEX_ENTER(&rx_stats_mutex);
2468 rx_stats.spuriousPacketsRead++;
2469 MUTEX_EXIT(&rx_stats_mutex);
2470 MUTEX_ENTER(&conn->conn_data_lock);
2472 MUTEX_EXIT(&conn->conn_data_lock);
2477 currentCallNumber = conn->callNumber[channel];
2479 if (type == RX_SERVER_CONNECTION) { /* We're the server */
2480 if (np->header.callNumber < currentCallNumber) {
2481 MUTEX_ENTER(&rx_stats_mutex);
2482 rx_stats.spuriousPacketsRead++;
2483 MUTEX_EXIT(&rx_stats_mutex);
2484 #ifdef RX_ENABLE_LOCKS
2486 MUTEX_EXIT(&call->lock);
2488 MUTEX_ENTER(&conn->conn_data_lock);
2490 MUTEX_EXIT(&conn->conn_data_lock);
2494 call = rxi_NewCall(conn, channel);
2495 MUTEX_ENTER(&call->lock);
2496 *call->callNumber = np->header.callNumber;
2497 call->state = RX_STATE_PRECALL;
2498 clock_GetTime(&call->queueTime);
2499 hzero(call->bytesSent);
2500 hzero(call->bytesRcvd);
2501 rxi_KeepAliveOn(call);
2503 else if (np->header.callNumber != currentCallNumber) {
2504 /* Wait until the transmit queue is idle before deciding
2505 * whether to reset the current call. Chances are that the
2506 * call will be in either DALLY or HOLD state once the TQ_BUSY flag is cleared. */
2509 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2510 while ((call->state == RX_STATE_ACTIVE) &&
2511 (call->flags & RX_CALL_TQ_BUSY)) {
2512 call->flags |= RX_CALL_TQ_WAIT;
2513 #ifdef RX_ENABLE_LOCKS
2514 CV_WAIT(&call->cv_tq, &call->lock);
2515 #else /* RX_ENABLE_LOCKS */
2516 osi_rxSleep(&call->tq);
2517 #endif /* RX_ENABLE_LOCKS */
2519 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2520 /* If the new call cannot be taken right now, send a busy packet and set
2521 * the error condition in this call, so that it terminates as
2522 * quickly as possible */
2523 if (call->state == RX_STATE_ACTIVE) {
2524 struct rx_packet *tp;
2526 rxi_CallError(call, RX_CALL_DEAD);
2527 tp = rxi_SendSpecial(call, conn, np, RX_PACKET_TYPE_BUSY, (char *) 0, 0, 1);
2528 MUTEX_EXIT(&call->lock);
2529 MUTEX_ENTER(&conn->conn_data_lock);
2531 MUTEX_EXIT(&conn->conn_data_lock);
2534 rxi_ResetCall(call, 0);
2535 *call->callNumber = np->header.callNumber;
2536 call->state = RX_STATE_PRECALL;
2537 clock_GetTime(&call->queueTime);
2538 hzero(call->bytesSent);
2539 hzero(call->bytesRcvd);
2541 /* If the number of queued calls exceeds the overload
2542 * threshold then abort this call. */
2544 if ((rx_BusyThreshold > 0) && (rx_nWaiting > rx_BusyThreshold)) {
2545 struct rx_packet *tp;
2547 rxi_CallError(call, rx_BusyError);
2548 tp = rxi_SendCallAbort(call, np, 1, 0);
2549 MUTEX_EXIT(&call->lock);
2550 MUTEX_ENTER(&conn->conn_data_lock);
2552 MUTEX_EXIT(&conn->conn_data_lock);
2555 rxi_KeepAliveOn(call);
2558 /* Continuing call; do nothing here. */
2560 } else { /* we're the client */
2561 /* Ignore all incoming acknowledgements for calls in DALLY state */
2562 if ( call && (call->state == RX_STATE_DALLY)
2563 && (np->header.type == RX_PACKET_TYPE_ACK)) {
2564 MUTEX_ENTER(&rx_stats_mutex);
2565 rx_stats.ignorePacketDally++;
2566 MUTEX_EXIT(&rx_stats_mutex);
2567 #ifdef RX_ENABLE_LOCKS
2569 MUTEX_EXIT(&call->lock);
2572 MUTEX_ENTER(&conn->conn_data_lock);
2574 MUTEX_EXIT(&conn->conn_data_lock);
2578 /* Ignore anything that's not relevant to the current call. If there
2579 * isn't a current call, then no packet is relevant. */
2580 if (!call || (np->header.callNumber != currentCallNumber)) {
2581 MUTEX_ENTER(&rx_stats_mutex);
2582 rx_stats.spuriousPacketsRead++;
2583 MUTEX_EXIT(&rx_stats_mutex);
2584 #ifdef RX_ENABLE_LOCKS
2586 MUTEX_EXIT(&call->lock);
2589 MUTEX_ENTER(&conn->conn_data_lock);
2591 MUTEX_EXIT(&conn->conn_data_lock);
2594 /* If the service security object index stamped in the packet does not
2595 * match the connection's security index, ignore the packet */
2596 if (np->header.securityIndex != conn->securityIndex) {
2597 #ifdef RX_ENABLE_LOCKS
2598 MUTEX_EXIT(&call->lock);
2600 MUTEX_ENTER(&conn->conn_data_lock);
2602 MUTEX_EXIT(&conn->conn_data_lock);
2606 /* If we're receiving the response, then all transmit packets are
2607 * implicitly acknowledged. Get rid of them. */
2608 if (np->header.type == RX_PACKET_TYPE_DATA) {
2609 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2610 /* XXX Hack. Because we must release the global rx lock when
2611 * sending packets (osi_NetSend) we drop all acks while we're
2612 * traversing the tq in rxi_Start sending packets out because
2613 * packets may move to the freePacketQueue as a result of being here!
2614 * So we drop these packets until we're safely out of the
2615 * traversing. Really ugly!
2616 * For fine grain RX locking, we set the acked field in the
2617 * packets and let rxi_Start remove them from the transmit queue. */
2619 if (call->flags & RX_CALL_TQ_BUSY) {
2620 #ifdef RX_ENABLE_LOCKS
2621 rxi_SetAcksInTransmitQueue(call);
2624 return np; /* xmitting; drop packet */
2628 rxi_ClearTransmitQueue(call, 0);
2630 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
2631 rxi_ClearTransmitQueue(call, 0);
2632 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2634 if (np->header.type == RX_PACKET_TYPE_ACK) {
2635 /* now check to see if this is an ack packet acknowledging that the
2636 * server actually *lost* some hard-acked data. If this happens we
2637 * ignore this packet, as it may indicate that the server restarted in
2638 * the middle of a call. It is also possible that this is an old ack
2639 * packet. We don't abort the connection in this case, because this
2640 * *might* just be an old ack packet. The right way to detect a server
2641 * restart in the midst of a call is to notice that the server epoch changed. */
2643 /* XXX I'm not sure this is exactly right, since tfirst **IS**
2644 * XXX unacknowledged. I think that this is off-by-one, but
2645 * XXX I don't dare change it just yet, since it will
2646 * XXX interact badly with the server-restart detection
2647 * XXX code in receiveackpacket. */
2648 if (ntohl(rx_GetInt32(np, FIRSTACKOFFSET)) < call->tfirst) {
2649 MUTEX_ENTER(&rx_stats_mutex);
2650 rx_stats.spuriousPacketsRead++;
2651 MUTEX_EXIT(&rx_stats_mutex);
2652 MUTEX_EXIT(&call->lock);
2653 MUTEX_ENTER(&conn->conn_data_lock);
2655 MUTEX_EXIT(&conn->conn_data_lock);
2659 } /* else not a data packet */
2662 osirx_AssertMine(&call->lock, "rxi_ReceivePacket middle");
2663 /* Set remote user defined status from packet */
2664 call->remoteStatus = np->header.userStatus;
2666 /* Note the gap between the expected next packet and the actual
2667 * packet that arrived, when the new packet has a smaller serial number
2668 * than expected. Rioses frequently reorder packets all by themselves,
2669 * so this will be quite important with very large window sizes.
2670 * Skew is checked against 0 here to avoid any dependence on the type of
2671 * inPacketSkew (which may be unsigned). In C, -1 > (unsigned) 0 is always true.
2673 * The inPacketSkew should be a smoothed running value, not just a maximum. MTUXXX
2674 * see CalculateRoundTripTime for an example of how to keep smoothed values.
2675 * I think using a beta of 1/8 is probably appropriate. 93.04.21 */
2677 MUTEX_ENTER(&conn->conn_data_lock);
2678 skew = conn->lastSerial - np->header.serial;
2679 conn->lastSerial = np->header.serial;
2680 MUTEX_EXIT(&conn->conn_data_lock);
2682 register struct rx_peer *peer;
2684 if (skew > peer->inPacketSkew) {
2685 dpf (("*** In skew changed from %d to %d\n", peer->inPacketSkew, skew));
2686 peer->inPacketSkew = skew;
2690 /* Now do packet type-specific processing */
2691 switch (np->header.type) {
2692 case RX_PACKET_TYPE_DATA:
2693 np = rxi_ReceiveDataPacket(call, np, 1, socket, host, port,
2696 case RX_PACKET_TYPE_ACK:
2697 /* Respond immediately to ack packets requesting acknowledgement (ping packets) */
2699 if (np->header.flags & RX_REQUEST_ACK) {
2700 if (call->error) (void) rxi_SendCallAbort(call, 0, 1, 0);
2701 else (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_PING_RESPONSE, 1);
2703 np = rxi_ReceiveAckPacket(call, np, 1);
2705 case RX_PACKET_TYPE_ABORT:
2706 /* An abort packet: reset the call, passing the error up to the user */
2708 /* What if error is zero? */
2709 rxi_CallError(call, ntohl(*(afs_int32 *)rx_DataOf(np)));
2711 case RX_PACKET_TYPE_BUSY:
2714 case RX_PACKET_TYPE_ACKALL:
2715 /* All packets acknowledged, so we can drop all packets previously
2716 * readied for sending */
2717 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2718 /* XXX Hack. Because we can't release the global rx lock when
2719 * sending packets (osi_NetSend) we drop all ack pkts while we're
2720 * traversing the tq in rxi_Start sending packets out because
2721 * packets may move to the freePacketQueue as a result of being
2722 * here! So we drop these packets until we're safely out of the
2723 * traversing. Really ugly!
2724 * For fine grain RX locking, we set the acked field in the packets
2725 * and let rxi_Start remove the packets from the transmit queue. */
2727 if (call->flags & RX_CALL_TQ_BUSY) {
2728 #ifdef RX_ENABLE_LOCKS
2729 rxi_SetAcksInTransmitQueue(call);
2731 #else /* RX_ENABLE_LOCKS */
2733 return np; /* xmitting; drop packet */
2734 #endif /* RX_ENABLE_LOCKS */
2736 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2737 rxi_ClearTransmitQueue(call, 0);
2740 /* Should not reach here, unless the peer is broken: send an abort packet */
2742 rxi_CallError(call, RX_PROTOCOL_ERROR);
2743 np = rxi_SendCallAbort(call, np, 1, 0);
2746 /* Note when this last legitimate packet was received, for keep-alive
2747 * processing. Note, we delay getting the time until now in the hope that
2748 * the packet will be delivered to the user before any get time is required
2749 * (if not, then the time won't actually be re-evaluated here). */
2750 call->lastReceiveTime = clock_Sec();
2751 MUTEX_EXIT(&call->lock);
2752 MUTEX_ENTER(&conn->conn_data_lock);
2754 MUTEX_EXIT(&conn->conn_data_lock);
2758 /* return true if this is an "interesting" connection from the point of view
2759 of someone trying to debug the system */
2760 int rxi_IsConnInteresting(struct rx_connection *aconn)
2763 register struct rx_call *tcall;
2765 if (aconn->flags & (RX_CONN_MAKECALL_WAITING | RX_CONN_DESTROY_ME))
2767 for(i=0;i<RX_MAXCALLS;i++) {
2768 tcall = aconn->call[i];
2770 if ((tcall->state == RX_STATE_PRECALL) || (tcall->state == RX_STATE_ACTIVE))
2772 if ((tcall->mode == RX_MODE_SENDING) || (tcall->mode == RX_MODE_RECEIVING))
2780 /* if this is one of the last few packets AND it wouldn't be used by the
2781 receiving call to immediately satisfy a read request, then drop it on
2782 the floor, since accepting it might prevent a lock-holding thread from
2783 making progress in its reading. If a call has been cleared while in
2784 the precall state then ignore all subsequent packets until the call
2785 is assigned to a thread. */
2787 static int TooLow(ap, acall)
2788 struct rx_call *acall;
2789 struct rx_packet *ap; {
2791 MUTEX_ENTER(&rx_stats_mutex);
2792 if (((ap->header.seq != 1) &&
2793 (acall->flags & RX_CALL_CLEARED) &&
2794 (acall->state == RX_STATE_PRECALL)) ||
2795 ((rx_nFreePackets < rxi_dataQuota+2) &&
2796 !( (ap->header.seq < acall->rnext+rx_initSendWindow)
2797 && (acall->flags & RX_CALL_READER_WAIT)))) {
2800 MUTEX_EXIT(&rx_stats_mutex);
2805 /* try to attach call, if authentication is complete */
2806 static void TryAttach(acall, socket, tnop, newcallp)
2807 register struct rx_call *acall;
2808 register osi_socket socket;
2810 register struct rx_call **newcallp; {
2811 register struct rx_connection *conn;
2813 if ((conn->type==RX_SERVER_CONNECTION) && (acall->state == RX_STATE_PRECALL)) {
2814 /* Don't attach until we have the required authentication. */
2815 if (RXS_CheckAuthentication(conn->securityObject, conn) == 0) {
2816 rxi_AttachServerProc(acall, socket, tnop, newcallp);
2817 /* Note: this does not necessarily succeed; there
2818 may not be any proc available */
2821 rxi_ChallengeOn(acall->conn);
2826 /* A data packet has been received off the interface. This packet is
2827 * appropriate to the call (the call is in the right state, etc.). This
2828 * routine can return a packet to the caller, for re-use */
2830 struct rx_packet *rxi_ReceiveDataPacket(call, np, istack, socket, host,
2831 port, tnop, newcallp)
2832 register struct rx_call *call;
2833 register struct rx_packet *np;
2839 struct rx_call **newcallp;
2845 afs_uint32 seq, serial, flags;
2847 struct rx_packet *tnp;
2849 MUTEX_ENTER(&rx_stats_mutex);
2850 rx_stats.dataPacketsRead++;
2851 MUTEX_EXIT(&rx_stats_mutex);
2854 /* If there are no packet buffers, drop this new packet, unless we can find
2855 * packet buffers from inactive calls */
2857 (rxi_OverQuota(RX_PACKET_CLASS_RECEIVE) || TooLow(np, call))) {
2858 MUTEX_ENTER(&rx_freePktQ_lock);
2859 rxi_NeedMorePackets = TRUE;
2860 MUTEX_EXIT(&rx_freePktQ_lock);
2861 MUTEX_ENTER(&rx_stats_mutex);
2862 rx_stats.noPacketBuffersOnRead++;
2863 MUTEX_EXIT(&rx_stats_mutex);
2864 call->rprev = np->header.serial;
2865 rxi_calltrace(RX_TRACE_DROP, call);
2866 dpf (("packet %x dropped on receipt - quota problems", np));
2868 rxi_ClearReceiveQueue(call);
2869 clock_GetTime(&when);
2870 clock_Add(&when, &rx_softAckDelay);
2871 if (!call->delayedAckEvent ||
2872 clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
2873 rxevent_Cancel(call->delayedAckEvent, call,
2874 RX_CALL_REFCOUNT_DELAY);
2875 CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
2876 call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
2879 /* we've damaged this call already, might as well do it in. */
2885 /* New in AFS 3.5, if the RX_JUMBO_PACKET flag is set then this
2886 * packet is one of several packets transmitted as a single
2887 * datagram. Do not send any soft or hard acks until all packets
2888 * in a jumbogram have been processed. Send negative acks right away. */
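/* Framing sketch (illustrative): a jumbogram packs several fixed size
 * packets into one UDP datagram, and every packet except the last carries
 * the RX_JUMBO_PACKET header flag:
 *
 *   [hdr+data, RX_JUMBO_PACKET][hdr+data, RX_JUMBO_PACKET][hdr+data]
 *
 * The loop below keeps calling rxi_SplitJumboPacket until no trailing
 * packet remains. */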
2890 for (isFirst = 1 , tnp = NULL ; isFirst || tnp ; isFirst = 0 ) {
2891 /* tnp is non-null when there are more packets in the
2892 * current jumbogram */
2899 seq = np->header.seq;
2900 serial = np->header.serial;
2901 flags = np->header.flags;
2903 /* If the call is in an error state, send an abort message */
2905 return rxi_SendCallAbort(call, np, istack, 0);
2907 /* The RX_JUMBO_PACKET flag is set in all but the last packet in each
2908 * AFS 3.5 jumbogram. */
2909 if (flags & RX_JUMBO_PACKET) {
2910 tnp = rxi_SplitJumboPacket(np, host, port, isFirst);
2915 if (np->header.spare != 0) {
2916 MUTEX_ENTER(&call->conn->conn_data_lock);
2917 call->conn->flags |= RX_CONN_USING_PACKET_CKSUM;
2918 MUTEX_EXIT(&call->conn->conn_data_lock);
2921 /* The usual case is that this is the expected next packet */
2922 if (seq == call->rnext) {
2924 /* Check to make sure it is not a duplicate of one already queued */
2925 if (queue_IsNotEmpty(&call->rq)
2926 && queue_First(&call->rq, rx_packet)->header.seq == seq) {
2927 MUTEX_ENTER(&rx_stats_mutex);
2928 rx_stats.dupPacketsRead++;
2929 MUTEX_EXIT(&rx_stats_mutex);
2930 dpf (("packet %x dropped on receipt - duplicate", np));
2931 rxevent_Cancel(call->delayedAckEvent, call,
2932 RX_CALL_REFCOUNT_DELAY);
2933 np = rxi_SendAck(call, np, seq, serial,
2934 flags, RX_ACK_DUPLICATE, istack);
2940 /* It's the next packet. Stick it on the receive queue
2941 * for this call. Set newPackets to make sure we wake
2942 * the reader once all packets have been processed */
2943 queue_Prepend(&call->rq, np);
2945 np = NULL; /* We can't use this anymore */
2948 /* If an ack is requested then set a flag to make sure we
2949 * send an acknowledgement for this packet */
2950 if (flags & RX_REQUEST_ACK) {
2954 /* Keep track of whether we have received the last packet */
2955 if (flags & RX_LAST_PACKET) {
2956 call->flags |= RX_CALL_HAVE_LAST;
2960 /* Check whether we have all of the packets for this call */
2961 if (call->flags & RX_CALL_HAVE_LAST) {
2962 afs_uint32 tseq; /* temporary sequence number */
2963 struct rx_packet *tp; /* Temporary packet pointer */
2964 struct rx_packet *nxp; /* Next pointer, for queue_Scan */
2966 for (tseq = seq, queue_Scan(&call->rq, tp, nxp, rx_packet)) {
2967 if (tseq != tp->header.seq)
2969 if (tp->header.flags & RX_LAST_PACKET) {
2970 call->flags |= RX_CALL_RECEIVE_DONE;
2977 /* Provide asynchronous notification for those who want it
2978 * (e.g. multi rx) */
2979 if (call->arrivalProc) {
2980 (*call->arrivalProc)(call, call->arrivalProcHandle,
2981 call->arrivalProcArg);
2982 call->arrivalProc = (VOID (*)()) 0;
2985 /* Update last packet received */
2988 /* If there is no server process serving this call, grab
2989 * one, if available. We only need to do this once. If a
2990 * server thread is available, this thread becomes a server
2991 * thread and the server thread becomes a listener thread. */
2993 TryAttach(call, socket, tnop, newcallp);
2996 /* This is not the expected next packet. */
2998 /* Determine whether this is a new or old packet, and if it's
2999 * a new one, whether it fits into the current receive window.
3000 * Also figure out whether the packet was delivered in sequence.
3001 * We use the prev variable to determine whether the new packet
3002 * is the successor of its immediate predecessor in the
3003 * receive queue, and the missing flag to determine whether
3004 * any of this packet's predecessors are missing. */
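/* Classification sketch for a packet with sequence number seq, mirroring
 * the tests below:
 *
 *   seq <  call->rnext                -- already delivered: duplicate
 *   seq >= call->rnext + call->rwind  -- beyond window: RX_ACK_EXCEEDS_WINDOW
 *   otherwise                         -- insert into rq in sequence order
 */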
3006 afs_uint32 prev; /* "Previous packet" sequence number */
3007 struct rx_packet *tp; /* Temporary packet pointer */
3008 struct rx_packet *nxp; /* Next pointer, for queue_Scan */
3009 int missing; /* Are any predecessors missing? */
3011 /* If the new packet's sequence number has been sent to the
3012 * application already, then this is a duplicate */
3013 if (seq < call->rnext) {
3014 MUTEX_ENTER(&rx_stats_mutex);
3015 rx_stats.dupPacketsRead++;
3016 MUTEX_EXIT(&rx_stats_mutex);
3017 rxevent_Cancel(call->delayedAckEvent, call,
3018 RX_CALL_REFCOUNT_DELAY);
3019 np = rxi_SendAck(call, np, seq, serial,
3020 flags, RX_ACK_DUPLICATE, istack);
3026 /* If the sequence number is greater than what can be
3027 * accommodated by the current window, then send a negative
3028 * acknowledge and drop the packet */
3029 if ((call->rnext + call->rwind) <= seq) {
3030 rxevent_Cancel(call->delayedAckEvent, call,
3031 RX_CALL_REFCOUNT_DELAY);
3032 np = rxi_SendAck(call, np, seq, serial,
3033 flags, RX_ACK_EXCEEDS_WINDOW, istack);
3039 /* Look for the packet in the queue of old received packets */
3040 for (prev = call->rnext - 1, missing = 0,
3041 queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3042 /* Check for duplicate packet */
3043 if (seq == tp->header.seq) {
3044 MUTEX_ENTER(&rx_stats_mutex);
3045 rx_stats.dupPacketsRead++;
3046 MUTEX_EXIT(&rx_stats_mutex);
3047 rxevent_Cancel(call->delayedAckEvent, call,
3048 RX_CALL_REFCOUNT_DELAY);
3049 np = rxi_SendAck(call, np, seq, serial,
3050 flags, RX_ACK_DUPLICATE, istack);
3055 /* If we find a higher sequence packet, break out and
3056 * insert the new packet here. */
3057 if (seq < tp->header.seq) break;
3058 /* Check for missing packet */
3059 if (tp->header.seq != prev+1) {
3063 prev = tp->header.seq;
3066 /* Keep track of whether we have received the last packet. */
3067 if (flags & RX_LAST_PACKET) {
3068 call->flags |= RX_CALL_HAVE_LAST;
3071 /* It's within the window: add it to the receive queue.
3072 * tp is left by the previous loop either pointing at the
3073 * packet before which to insert the new packet, or at the
3074 * queue head if the queue is empty or the packet should be appended. */
3076 queue_InsertBefore(tp, np);
3080 /* Check whether we have all of the packets for this call */
3081 if ((call->flags & RX_CALL_HAVE_LAST)
3082 && !(call->flags & RX_CALL_RECEIVE_DONE)) {
3083 afs_uint32 tseq; /* temporary sequence number */
3085 for (tseq = call->rnext,
3086 queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3087 if (tseq != tp->header.seq)
3089 if (tp->header.flags & RX_LAST_PACKET) {
3090 call->flags |= RX_CALL_RECEIVE_DONE;
3097 /* We need to send an ack if the packet is out of sequence,
3098 * or if an ack was requested by the peer. */
3099 if (seq != prev+1 || missing || (flags & RX_REQUEST_ACK)) {
3103 /* Acknowledge the last packet for each call */
3104 if (flags & RX_LAST_PACKET) {
3115 /* If the receiver is waiting for an iovec, fill the iovec
3116 * using the data from the receive queue */
3117 if (call->flags & RX_CALL_IOVEC_WAIT) {
3118 didHardAck = rxi_FillReadVec(call, seq, serial, flags);
3119 /* the call may have been aborted */
3128 /* Wake up the reader, if any */
3129 if ((call->flags & RX_CALL_READER_WAIT) &&
3130 (!(call->flags & RX_CALL_IOVEC_WAIT) || !(call->iovNBytes) ||
3131 (call->iovNext >= call->iovMax) ||
3132 (call->flags & RX_CALL_RECEIVE_DONE))) {
3133 call->flags &= ~RX_CALL_READER_WAIT;
3134 #ifdef RX_ENABLE_LOCKS
3135 CV_BROADCAST(&call->cv_rq);
3137 osi_rxWakeup(&call->rq);
3143 /* Send an ack when requested by the peer, or once every
3144 * rxi_SoftAckRate packets until the last packet has been
3145 * received. Always send a soft ack for the last packet in
3146 * the server's reply. */
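/* Decision ladder implemented below (a summary, not new behavior):
 *
 *   RX_REQUEST_ACK set in flags   -> ack immediately (RX_ACK_REQUESTED)
 *   nSoftAcks > rxi_SoftAckRate   -> ack immediately (RX_ACK_DELAY)
 *   nSoftAcks > 0                 -> schedule a delayed ack event
 *   RX_CALL_RECEIVE_DONE          -> cancel any pending delayed ack
 */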
3148 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3149 np = rxi_SendAck(call, np, seq, serial, flags,
3150 RX_ACK_REQUESTED, istack);
3151 } else if (call->nSoftAcks > (u_short)rxi_SoftAckRate) {
3152 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3153 np = rxi_SendAck(call, np, seq, serial, flags,
3154 RX_ACK_DELAY, istack);
3155 } else if (call->nSoftAcks) {
3156 clock_GetTime(&when);
3157 if (haveLast && !(flags & RX_CLIENT_INITIATED)) {
3158 clock_Add(&when, &rx_lastAckDelay);
3160 clock_Add(&when, &rx_softAckDelay);
3162 if (!call->delayedAckEvent ||
3163 clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
3164 rxevent_Cancel(call->delayedAckEvent, call,
3165 RX_CALL_REFCOUNT_DELAY);
3166 CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
3167 call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
3170 } else if (call->flags & RX_CALL_RECEIVE_DONE) {
3171 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3178 static void rxi_ComputeRate();
3181 /* The real smarts of the whole thing. */
3182 struct rx_packet *rxi_ReceiveAckPacket(call, np, istack)
3183 register struct rx_call *call;
3184 struct rx_packet *np;
3187 struct rx_ackPacket *ap;
3189 register struct rx_packet *tp;
3190 register struct rx_packet *nxp; /* Next packet pointer for queue_Scan */
3191 register struct rx_connection *conn = call->conn;
3192 struct rx_peer *peer = conn->peer;
3195 /* because there are CMs that are bogus, sending weird values for this. */
3196 afs_uint32 skew = 0;
3197 int needRxStart = 0;
3202 int newAckCount = 0;
3203 u_short maxMTU = 0; /* Set if peer supports AFS 3.4a jumbo datagrams */
3204 int maxDgramPackets = 0; /* Set if peer supports AFS 3.5 jumbo datagrams */
3206 MUTEX_ENTER(&rx_stats_mutex);
3207 rx_stats.ackPacketsRead++;
3208 MUTEX_EXIT(&rx_stats_mutex);
3209 ap = (struct rx_ackPacket *) rx_DataOf(np);
3210 nbytes = rx_Contiguous(np) - ((ap->acks) - (u_char *)ap);
3212 return np; /* truncated ack packet */
3214 /* depends on ack packet struct */
3215 nAcks = MIN((unsigned)nbytes, (unsigned) ap->nAcks);
3216 first = ntohl(ap->firstPacket);
3217 serial = ntohl(ap->serial);
3218 /* temporarily disabled -- needs to degrade over time
3219 skew = ntohs(ap->maxSkew); */
3221 /* Ignore ack packets received out of order */
3222 if (first < call->tfirst) {
3226 if (np->header.flags & RX_SLOW_START_OK) {
3227 call->flags |= RX_CALL_SLOW_START_OK;
3233 "RACK: reason %x previous %u seq %u serial %u skew %d first %u",
3234 ap->reason, ntohl(ap->previousPacket), np->header.seq, serial,
3235 skew, ntohl(ap->firstPacket));
3238 for (offset = 0; offset < nAcks; offset++)
3239 putc(ap->acks[offset] == RX_ACK_TYPE_NACK? '-' : '*', rx_Log);
3245 /* if a server connection has been re-created, it doesn't remember what
3246 serial # it was up to. An ack will tell us, since the serial field
3247 contains the largest serial received by the other side */
3248 MUTEX_ENTER(&conn->conn_data_lock);
3249 if ((conn->type == RX_SERVER_CONNECTION) && (conn->serial < serial)) {
3250 conn->serial = serial+1;
3252 MUTEX_EXIT(&conn->conn_data_lock);
3254 /* Update the outgoing packet skew value to the latest value of
3255 * the peer's incoming packet skew value. The ack packet, of
3256 * course, could arrive out of order, but that won't affect things much. */
3258 MUTEX_ENTER(&peer->peer_lock);
3259 peer->outPacketSkew = skew;
3261 /* Check for packets that no longer need to be transmitted, and
3262 * discard them. This only applies to packets positively
3263 * acknowledged as having been sent to the peer's upper level.
3264 * All other packets must be retained. So only packets with
3265 * sequence numbers < ap->firstPacket are candidates. */
3266 for (queue_Scan(&call->tq, tp, nxp, rx_packet)) {
3267 if (tp->header.seq >= first) break;
3268 call->tfirst = tp->header.seq + 1;
3269 if (tp->header.serial == serial) {
3270 rxi_ComputeRoundTripTime(tp, &tp->timeSent, peer);
3272 rxi_ComputeRate(peer, call, tp, np, ap->reason);
3275 else if ((tp->firstSerial == serial)) {
3276 rxi_ComputeRoundTripTime(tp, &tp->firstSent, peer);
3278 rxi_ComputeRate(peer, call, tp, np, ap->reason);
3281 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3282 /* XXX Hack. Because we have to release the global rx lock when sending
3283 * packets (osi_NetSend) we drop all acks while we're traversing the tq
3284 * in rxi_Start sending packets out because packets may move to the
3285 * freePacketQueue as a result of being here! So we drop these packets until
3286 * we're safely out of the traversing. Really ugly!
3287 * To make it even uglier, if we're using fine grain locking, we can
3288 * set the ack bits in the packets and have rxi_Start remove the packets
3289 * when it's done transmitting. */
3294 if (call->flags & RX_CALL_TQ_BUSY) {
3295 #ifdef RX_ENABLE_LOCKS
3297 call->flags |= RX_CALL_TQ_SOME_ACKED;
3298 #else /* RX_ENABLE_LOCKS */
3300 #endif /* RX_ENABLE_LOCKS */
3302 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3305 rxi_FreePacket(tp); /* rxi_FreePacket mustn't wake up anyone, preemptively. */
3310 /* Give rate detector a chance to respond to ping requests */
3311 if (ap->reason == RX_ACK_PING_RESPONSE) {
3312 rxi_ComputeRate(peer, call, 0, np, ap->reason);
3316 /* N.B. we don't turn off any timers here. They'll go away by themselves, anyway */
3318 /* Now go through explicit acks/nacks and record the results in
3319 * the waiting packets. These are packets that can't be released
3320 * yet, even with a positive acknowledge. This positive
3321 * acknowledge only means the packet has been received by the
3322 * peer, not that it will be retained long enough to be sent to
3323 * the peer's upper level. In addition, reset the transmit timers
3324 * of any missing packets (those packets that must be missing
3325 * because this packet was out of sequence) */
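/* Interpretation sketch for the ack fields used below, with
 * first = ntohl(ap->firstPacket) and nAcks entries in ap->acks[]:
 *
 *   seq < first                 -- implicitly (hard) acknowledged
 *   first <= seq < first+nAcks  -- soft-acked if ap->acks[seq-first] ==
 *                                  RX_ACK_TYPE_ACK, otherwise nacked
 *   seq >= first+nAcks          -- not acknowledged by this packet
 */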
3327 call->nSoftAcked = 0;
3328 for (missing = 0, queue_Scan(&call->tq, tp, nxp, rx_packet)) {
3329 /* Update round trip time if the ack was stimulated on receipt of this packet */
3331 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3332 #ifdef RX_ENABLE_LOCKS
3333 if (tp->header.seq >= first) {
3334 #endif /* RX_ENABLE_LOCKS */
3335 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3336 if (tp->header.serial == serial) {
3337 rxi_ComputeRoundTripTime(tp, &tp->timeSent, peer);
3339 rxi_ComputeRate(peer, call, tp, np, ap->reason);
3342 else if ((tp->firstSerial == serial)) {
3343 rxi_ComputeRoundTripTime(tp, &tp->firstSent, peer);
3345 rxi_ComputeRate(peer, call, tp, np, ap->reason);
3348 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3349 #ifdef RX_ENABLE_LOCKS
3351 #endif /* RX_ENABLE_LOCKS */
3352 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3354 /* Set the acknowledge flag per packet based on the
3355 * information in the ack packet. An acknowledged packet can
3356 * be downgraded when the server has discarded a packet it
3357 * soft-acked previously, or when an ack packet is received
3358 * out of sequence. */
3359 if (tp->header.seq < first) {
3360 /* Implicit ack information */
3366 else if (tp->header.seq < first + nAcks) {
3367 /* Explicit ack information: set it in the packet appropriately */
3368 if (ap->acks[tp->header.seq - first] == RX_ACK_TYPE_ACK) {
3388 /* If packet isn't yet acked, and it has been transmitted at least
3389 * once, reset retransmit time using latest timeout
3390 * i.e., this should readjust the retransmit timer for all outstanding
3391 * packets... So we don't just retransmit when we should know better */
3393 if (!tp->acked && !clock_IsZero(&tp->retryTime)) {
3394 tp->retryTime = tp->timeSent;
3395 clock_Add(&tp->retryTime, &peer->timeout);
3396 /* shift by eight because one quarter-sec ~ 256 milliseconds */
3397 clock_Addmsec(&(tp->retryTime), ((afs_uint32) tp->backoff) << 8);
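/* e.g. a backoff of 2 adds 2 << 8 == 512 msec on top of peer->timeout */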
3401 /* If the window has been extended by this acknowledge packet,
3402 * then wake up a sender waiting in alloc for window space, or try
3403 * sending packets now, if he's been sitting on packets due to
3404 * lack of window space */
3405 if (call->tnext < (call->tfirst + call->twind)) {
3406 #ifdef RX_ENABLE_LOCKS
3407 CV_SIGNAL(&call->cv_twind);
3409 if (call->flags & RX_CALL_WAIT_WINDOW_ALLOC) {
3410 call->flags &= ~RX_CALL_WAIT_WINDOW_ALLOC;
3411 osi_rxWakeup(&call->twind);
3414 if (call->flags & RX_CALL_WAIT_WINDOW_SEND) {
3415 call->flags &= ~RX_CALL_WAIT_WINDOW_SEND;
3419 /* if the ack packet has a receivelen field hanging off it,
3420 * update our state */
3421 if ( np->length >= rx_AckDataSize(ap->nAcks) +sizeof(afs_int32)) {
3424 /* If the ack packet has a "recommended" size that is less than
3425 * what I am using now, reduce my size to match */
3426 rx_packetread(np, rx_AckDataSize(ap->nAcks)+sizeof(afs_int32),
3427 sizeof(afs_int32), &tSize);
3428 tSize = (afs_uint32) ntohl(tSize);
3429 peer->natMTU = rxi_AdjustIfMTU(MIN(tSize, peer->ifMTU));
3431 /* Get the maximum packet size to send to this peer */
3432 rx_packetread(np, rx_AckDataSize(ap->nAcks), sizeof(afs_int32),
3434 tSize = (afs_uint32)ntohl(tSize);
3435 tSize = (afs_uint32)MIN(tSize, rx_MyMaxSendSize);
3436 tSize = rxi_AdjustMaxMTU(peer->natMTU, tSize);
3438 /* sanity check - peer might have restarted with different params.
3439 * If peer says "send less", dammit, send less... Peer should never
3440 * be unable to accept packets of the size that prior AFS versions would
3441 * send without asking. */
3442 if (peer->maxMTU != tSize) {
3443 peer->maxMTU = tSize;
3444 peer->MTU = MIN(tSize, peer->MTU);
3445 call->MTU = MIN(call->MTU, tSize);
3449 if ( np->length == rx_AckDataSize(ap->nAcks) +3*sizeof(afs_int32)) {
3451 rx_packetread(np, rx_AckDataSize(ap->nAcks)+2*sizeof(afs_int32),
3452 sizeof(afs_int32), &tSize);
3453 tSize = (afs_uint32) ntohl(tSize); /* peer's receive window, if it's */
3454 if (tSize < call->twind) { /* smaller than our send */
3455 call->twind = tSize; /* window, we must send less... */
3456 call->ssthresh = MIN(call->twind, call->ssthresh);
3459 /* Only send jumbograms to 3.4a fileservers. 3.3a RX gets the
3460 * network MTU confused with the loopback MTU. Calculate the
3461 * maximum MTU here for use in the slow start code below. */
3463 maxMTU = peer->maxMTU;
3464 /* Did peer restart with older RX version? */
3465 if (peer->maxDgramPackets > 1) {
3466 peer->maxDgramPackets = 1;
3468 } else if ( np->length >= rx_AckDataSize(ap->nAcks) +4*sizeof(afs_int32)) {
3470 rx_packetread(np, rx_AckDataSize(ap->nAcks)+2*sizeof(afs_int32),
3471 sizeof(afs_int32), &tSize);
3472 tSize = (afs_uint32) ntohl(tSize);
3474 /* As of AFS 3.5 we set the send window to match the receive window. */
3476 if (tSize < call->twind) {
3477 call->twind = tSize;
3478 call->ssthresh = MIN(call->twind, call->ssthresh);
3479 } else if (tSize > call->twind) {
3480 call->twind = tSize;
3484 /* As of AFS 3.5, a jumbogram is more than one fixed size
3485 * packet transmitted in a single UDP datagram. If the remote
3486 * MTU is smaller than our local MTU then never send a datagram
3487 * larger than the natural MTU. */
3489 rx_packetread(np, rx_AckDataSize(ap->nAcks)+3*sizeof(afs_int32),
3490 sizeof(afs_int32), &tSize);
3491 maxDgramPackets = (afs_uint32) ntohl(tSize);
3492 maxDgramPackets = MIN(maxDgramPackets, rxi_nDgramPackets);
3493 maxDgramPackets = MIN(maxDgramPackets,
3494 (int)(peer->ifDgramPackets));
3495 maxDgramPackets = MIN(maxDgramPackets, tSize);
3496 if (maxDgramPackets > 1) {
3497 peer->maxDgramPackets = maxDgramPackets;
3498 call->MTU = RX_JUMBOBUFFERSIZE+RX_HEADER_SIZE;
3500 peer->maxDgramPackets = 1;
3501 call->MTU = peer->natMTU;
3503 } else if (peer->maxDgramPackets > 1) {
3504 /* Restarted with lower version of RX */
3505 peer->maxDgramPackets = 1;
3507 } else if (peer->maxDgramPackets > 1 ||
3508 peer->maxMTU != OLD_MAX_PACKET_SIZE) {
3509 /* Restarted with lower version of RX */
3510 peer->maxMTU = OLD_MAX_PACKET_SIZE;
3511 peer->natMTU = OLD_MAX_PACKET_SIZE;
3512 peer->MTU = OLD_MAX_PACKET_SIZE;
3513 peer->maxDgramPackets = 1;
3514 peer->nDgramPackets = 1;
3516 call->MTU = OLD_MAX_PACKET_SIZE;
3521 /* Calculate how many datagrams were successfully received after
3522 * the first missing packet and adjust the negative ack counter accordingly. */
3527 nNacked = (nNacked + call->nDgramPackets - 1) / call->nDgramPackets;
3528 if (call->nNacks < nNacked) {
3529 call->nNacks = nNacked;
3538 if (call->flags & RX_CALL_FAST_RECOVER) {
3540 call->cwind = MIN((int)(call->cwind + 1), rx_maxSendWindow);
3542 call->flags &= ~RX_CALL_FAST_RECOVER;
3543 call->cwind = call->nextCwind;
3544 call->nextCwind = 0;
3547 call->nCwindAcks = 0;
3549 else if (nNacked && call->nNacks >= (u_short)rx_nackThreshold) {
3550 /* Three negative acks in a row trigger congestion recovery */
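/* Recovery arithmetic applied below (a summary of the assignments that
 * follow, not a change in behavior):
 *
 *   ssthresh      = MAX(4, MIN(cwind, twind)) / 2
 *   cwind         = MIN(ssthresh + rx_nackThreshold, rx_maxSendWindow)
 *   nDgramPackets = MAX(2, nDgramPackets) / 2
 *   nextCwind     = ssthresh, restored when recovery completes
 */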
3551 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3552 MUTEX_EXIT(&peer->peer_lock);
3553 if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
3554 /* someone else is waiting to start recovery */
3557 call->flags |= RX_CALL_FAST_RECOVER_WAIT;
3558 while (call->flags & RX_CALL_TQ_BUSY) {
3559 call->flags |= RX_CALL_TQ_WAIT;
3560 #ifdef RX_ENABLE_LOCKS
3561 CV_WAIT(&call->cv_tq, &call->lock);
3562 #else /* RX_ENABLE_LOCKS */
3563 osi_rxSleep(&call->tq);
3564 #endif /* RX_ENABLE_LOCKS */
3566 MUTEX_ENTER(&peer->peer_lock);
3567 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3568 call->flags &= ~RX_CALL_FAST_RECOVER_WAIT;
3569 call->flags |= RX_CALL_FAST_RECOVER;
3570 call->ssthresh = MAX(4, MIN((int)call->cwind, (int)call->twind))>>1;
3571 call->cwind = MIN((int)(call->ssthresh + rx_nackThreshold),
3573 call->nDgramPackets = MAX(2, (int)call->nDgramPackets)>>1;
3574 call->nextCwind = call->ssthresh;
3577 peer->MTU = call->MTU;
3578 peer->cwind = call->nextCwind;
3579 peer->nDgramPackets = call->nDgramPackets;
3581 call->congestSeq = peer->congestSeq;
3582 /* Reset the resend times on the packets that were nacked
3583 * so we will retransmit as soon as the window permits */
3584 for(acked = 0, queue_ScanBackwards(&call->tq, tp, nxp, rx_packet)) {
3587 clock_Zero(&tp->retryTime);
3589 } else if (tp->acked) {
3594 /* If cwind is smaller than ssthresh, then increase
3595 * the window one packet for each ack we receive (exponential growth).
3597 * If cwind is greater than or equal to ssthresh then increase
3598 * the congestion window by one packet for each cwind acks we
3599 * receive (linear growth). */
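/* Growth sketch, with newAckCount fresh acks counted from this ack packet:
 *
 *   slow start:           cwind = MIN(ssthresh, cwind + newAckCount);
 *   congestion avoidance: every cwind acks, cwind = MIN(cwind + 1,
 *                         rx_maxSendWindow);
 */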
3600 if (call->cwind < call->ssthresh) {
3601 call->cwind = MIN((int)call->ssthresh,
3602 (int)(call->cwind + newAckCount));
3603 call->nCwindAcks = 0;
3605 call->nCwindAcks += newAckCount;
3606 if (call->nCwindAcks >= call->cwind) {
3607 call->nCwindAcks = 0;
3608 call->cwind = MIN((int)(call->cwind + 1), rx_maxSendWindow);
3612 /* If we have received several acknowledgements in a row then
3613 * it is time to increase the size of our datagrams */
3615 if ((int)call->nAcks > rx_nDgramThreshold) {
3616 if (peer->maxDgramPackets > 1) {
3617 if (call->nDgramPackets < peer->maxDgramPackets) {
3618 call->nDgramPackets++;
3620 call->MTU = RX_HEADER_SIZE + RX_JUMBOBUFFERSIZE;
3621 } else if (call->MTU < peer->maxMTU) {
3622 call->MTU += peer->natMTU;
3623 call->MTU = MIN(call->MTU, peer->maxMTU);
3629 MUTEX_EXIT(&peer->peer_lock); /* rxi_Start will lock peer. */
3631 /* Servers need to hold the call until all response packets have
3632 * been acknowledged. Soft acks are good enough since clients
3633 * are not allowed to clear their receive queues. */
3634 if (call->state == RX_STATE_HOLD &&
3635 call->tfirst + call->nSoftAcked >= call->tnext) {
3636 call->state = RX_STATE_DALLY;
3637 rxi_ClearTransmitQueue(call, 0);
3638 } else if (!queue_IsEmpty(&call->tq)) {
3639 rxi_Start(0, call, istack);
3644 /* Received a response to a challenge packet */
3645 struct rx_packet *rxi_ReceiveResponsePacket(conn, np, istack)
3646 register struct rx_connection *conn;
3647 register struct rx_packet *np;
3652 /* Ignore the packet if we're the client */
3653 if (conn->type == RX_CLIENT_CONNECTION) return np;
3655 /* If already authenticated, ignore the packet (it's probably a retry) */
3656 if (RXS_CheckAuthentication(conn->securityObject, conn) == 0)
3659 /* Otherwise, have the security object evaluate the response packet */
3660 error = RXS_CheckResponse(conn->securityObject, conn, np);
3662 /* If the response is invalid, reset the connection, sending
3663 * an abort to the peer */
3667 rxi_ConnectionError(conn, error);
3668 MUTEX_ENTER(&conn->conn_data_lock);
3669 np = rxi_SendConnectionAbort(conn, np, istack, 0);
3670 MUTEX_EXIT(&conn->conn_data_lock);
3674 /* If the response is valid, any calls waiting to attach
3675 * servers can now do so */
3677 for (i=0; i<RX_MAXCALLS; i++) {
3678 struct rx_call *call = conn->call[i];
3680 MUTEX_ENTER(&call->lock);
3681 if (call->state == RX_STATE_PRECALL)
3682 rxi_AttachServerProc(call, -1, NULL, NULL);
3683 MUTEX_EXIT(&call->lock);
3690 /* A client has received an authentication challenge: the security
3691 * object is asked to cough up a respectable response packet to send
3692 * back to the server. The server is responsible for retrying the
3693 * challenge if it fails to get a response. */
3696 rxi_ReceiveChallengePacket(conn, np, istack)
3697 register struct rx_connection *conn;
3698 register struct rx_packet *np;
3703 /* Ignore the challenge if we're the server */
3704 if (conn->type == RX_SERVER_CONNECTION) return np;
3706 /* Ignore the challenge if the connection is otherwise idle; someone's
3707 * trying to use us as an oracle. */
3708 if (!rxi_HasActiveCalls(conn)) return np;
3710 /* Send the security object the challenge packet. It is expected to fill
3711 * in the response. */
3712 error = RXS_GetResponse(conn->securityObject, conn, np);
3714 /* If the security object is unable to return a valid response, reset the
3715 * connection and send an abort to the peer. Otherwise send the response
3716 * packet to the peer connection. */
3718 rxi_ConnectionError(conn, error);
3719 MUTEX_ENTER(&conn->conn_data_lock);
3720 np = rxi_SendConnectionAbort(conn, np, istack, 0);
3721 MUTEX_EXIT(&conn->conn_data_lock);
3724 np = rxi_SendSpecial((struct rx_call *)0, conn, np,
3725 RX_PACKET_TYPE_RESPONSE, (char *) 0, -1, istack);
3731 /* Find an available server process to service the current request in
3732 * the given call structure. If one isn't available, queue up this
3733 * call so it eventually gets one */
3735 rxi_AttachServerProc(call, socket, tnop, newcallp)
3736 register struct rx_call *call;
3737 register osi_socket socket;
3739 register struct rx_call **newcallp;
3741 register struct rx_serverQueueEntry *sq;
3742 register struct rx_service *service = call->conn->service;
3743 #ifdef RX_ENABLE_LOCKS
3744 register int haveQuota = 0;
3745 #endif /* RX_ENABLE_LOCKS */
3746 /* May already be attached */
3747 if (call->state == RX_STATE_ACTIVE) return;
3749 MUTEX_ENTER(&rx_serverPool_lock);
3750 #ifdef RX_ENABLE_LOCKS
3751 while(rxi_ServerThreadSelectingCall) {
3752 MUTEX_EXIT(&call->lock);
3753 CV_WAIT(&rx_serverPool_cv, &rx_serverPool_lock);
3754 MUTEX_EXIT(&rx_serverPool_lock);
3755 MUTEX_ENTER(&call->lock);
3756 MUTEX_ENTER(&rx_serverPool_lock);
3757 /* Call may have been attached */
3758 if (call->state == RX_STATE_ACTIVE) return;
3761 haveQuota = QuotaOK(service);
3762 if ((!haveQuota) || queue_IsEmpty(&rx_idleServerQueue)) {
3763 /* If there are no processes available to service this call,
3764 * put the call on the incoming call queue (unless it's
3765 * already on the queue). */
3768 ReturnToServerPool(service);
3769 if (!(call->flags & RX_CALL_WAIT_PROC)) {
3770 call->flags |= RX_CALL_WAIT_PROC;
3771 MUTEX_ENTER(&rx_stats_mutex);
3773 MUTEX_EXIT(&rx_stats_mutex);
3774 rxi_calltrace(RX_CALL_ARRIVAL, call);
3775 SET_CALL_QUEUE_LOCK(call, &rx_serverPool_lock);
3776 queue_Append(&rx_incomingCallQueue, call);
3779 #else /* RX_ENABLE_LOCKS */
3780 if (!QuotaOK(service) || queue_IsEmpty(&rx_idleServerQueue)) {
3781 /* If there are no processes available to service this call,
3782 * put the call on the incoming call queue (unless it's
3783 * already on the queue). */
3785 if (!(call->flags & RX_CALL_WAIT_PROC)) {
3786 call->flags |= RX_CALL_WAIT_PROC;
3788 rxi_calltrace(RX_CALL_ARRIVAL, call);
3789 queue_Append(&rx_incomingCallQueue, call);
3792 #endif /* RX_ENABLE_LOCKS */
3794 sq = queue_First(&rx_idleServerQueue, rx_serverQueueEntry);
3796 /* If hot threads are enabled, and both newcallp and sq->socketp
3797 * are non-null, then this thread will process the call, and the
3798 * idle server thread will start listening on this thread's socket. */
3801 if (rx_enable_hot_thread && newcallp && sq->socketp) {
3804 *sq->socketp = socket;
3805 clock_GetTime(&call->startTime);
3806 CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
3810 if (call->flags & RX_CALL_WAIT_PROC) {
3811 /* Conservative: I don't think this should happen */
3812 call->flags &= ~RX_CALL_WAIT_PROC;
3813 MUTEX_ENTER(&rx_stats_mutex);
3815 MUTEX_EXIT(&rx_stats_mutex);
3818 call->state = RX_STATE_ACTIVE;
3819 call->mode = RX_MODE_RECEIVING;
3820 if (call->flags & RX_CALL_CLEARED) {
3821 /* send an ack now to start the packet flow up again */
3822 call->flags &= ~RX_CALL_CLEARED;
3823 rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
3825 #ifdef RX_ENABLE_LOCKS
3828 service->nRequestsRunning++;
3829 if (service->nRequestsRunning <= service->minProcs)
3835 MUTEX_EXIT(&rx_serverPool_lock);
3838 /* Delay the sending of an acknowledge event for a short while, while
3839 * a new call is being prepared (in the case of a client) or a reply
3840 * is being prepared (in the case of a server). Rather than sending
3841 * an ack packet, an ACKALL packet is sent. */
3842 void rxi_AckAll(event, call, dummy)
3843 struct rxevent *event;
3844 register struct rx_call *call;
3847 #ifdef RX_ENABLE_LOCKS
3849 MUTEX_ENTER(&call->lock);
3850 call->delayedAckEvent = (struct rxevent *) 0;
3851 CALL_RELE(call, RX_CALL_REFCOUNT_ACKALL);
3853 rxi_SendSpecial(call, call->conn, (struct rx_packet *) 0,
3854 RX_PACKET_TYPE_ACKALL, (char *) 0, 0, 0);
3856 MUTEX_EXIT(&call->lock);
3857 #else /* RX_ENABLE_LOCKS */
3858 if (event) call->delayedAckEvent = (struct rxevent *) 0;
3859 rxi_SendSpecial(call, call->conn, (struct rx_packet *) 0,
3860 RX_PACKET_TYPE_ACKALL, (char *) 0, 0, 0);
3861 #endif /* RX_ENABLE_LOCKS */
3864 void rxi_SendDelayedAck(event, call, dummy)
3865 struct rxevent *event;
3866 register struct rx_call *call;
3869 #ifdef RX_ENABLE_LOCKS
3871 MUTEX_ENTER(&call->lock);
3872 if (event == call->delayedAckEvent)
3873 call->delayedAckEvent = (struct rxevent *) 0;
3874 CALL_RELE(call, RX_CALL_REFCOUNT_DELAY);
3876 (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
3878 MUTEX_EXIT(&call->lock);
3879 #else /* RX_ENABLE_LOCKS */
3880 if (event) call->delayedAckEvent = (struct rxevent *) 0;
3881 (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
3882 #endif /* RX_ENABLE_LOCKS */
3886 #ifdef RX_ENABLE_LOCKS
3887 /* Set the acked flag on all packets in the transmit queue. rxi_Start will deal with
3888 * clearing them out.
3890 static void rxi_SetAcksInTransmitQueue(call)
3891 register struct rx_call *call;
3893 register struct rx_packet *p, *tp;
3896 for (queue_Scan(&call->tq, p, tp, rx_packet)) {
3903 call->flags |= RX_CALL_TQ_CLEARME;
3904 call->flags |= RX_CALL_TQ_SOME_ACKED;
3907 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
3908 rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
3909 call->tfirst = call->tnext;
3910 call->nSoftAcked = 0;
3912 if (call->flags & RX_CALL_FAST_RECOVER) {
3913 call->flags &= ~RX_CALL_FAST_RECOVER;
3914 call->cwind = call->nextCwind;
3915 call->nextCwind = 0;
3918 CV_SIGNAL(&call->cv_twind);
3920 #endif /* RX_ENABLE_LOCKS */
3922 /* Clear out the transmit queue for the current call (all packets have
3923 * been received by peer) */
3924 void rxi_ClearTransmitQueue(call, force)
3925 register struct rx_call *call;
3928 register struct rx_packet *p, *tp;
3930 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3931 if (!force && (call->flags & RX_CALL_TQ_BUSY)) {
3933 for (queue_Scan(&call->tq, p, tp, rx_packet)) {
3940 call->flags |= RX_CALL_TQ_CLEARME;
3941 call->flags |= RX_CALL_TQ_SOME_ACKED;
3944 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3945 for (queue_Scan(&call->tq, p, tp, rx_packet)) {
3951 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3952 call->flags &= ~RX_CALL_TQ_CLEARME;
3954 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3956 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
3957 rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
3958 call->tfirst = call->tnext; /* implicitly acknowledge all data already sent */
3959 call->nSoftAcked = 0;
3961 if (call->flags & RX_CALL_FAST_RECOVER) {
3962 call->flags &= ~RX_CALL_FAST_RECOVER;
3963 call->cwind = call->nextCwind;
3966 #ifdef RX_ENABLE_LOCKS
3967 CV_SIGNAL(&call->cv_twind);
3969 osi_rxWakeup(&call->twind);
3973 void rxi_ClearReceiveQueue(call)
3974 register struct rx_call *call;
3976 register struct rx_packet *p, *tp;
3977 if (queue_IsNotEmpty(&call->rq)) {
3978 for (queue_Scan(&call->rq, p, tp, rx_packet)) {
3983 rx_packetReclaims++;
3985 call->flags &= ~(RX_CALL_RECEIVE_DONE|RX_CALL_HAVE_LAST);
3987 if (call->state == RX_STATE_PRECALL) {
3988 call->flags |= RX_CALL_CLEARED;
3992 /* Send an abort packet for the specified call */
3993 struct rx_packet *rxi_SendCallAbort(call, packet, istack, force)
3994 register struct rx_call *call;
3995 struct rx_packet *packet;
4005 /* Clients should never delay abort messages */
4006 if (rx_IsClientConn(call->conn))
4007 force = 1;
4009 if (call->abortCode != call->error) {
4010 call->abortCode = call->error;
4011 call->abortCount = 0;
4014 if (force || rxi_callAbortThreshhold == 0 ||
4015 call->abortCount < rxi_callAbortThreshhold) {
4016 if (call->delayedAbortEvent) {
4017 rxevent_Cancel(call->delayedAbortEvent, call, RX_CALL_REFCOUNT_ABORT);
4019 error = htonl(call->error);
4021 packet = rxi_SendSpecial(call, call->conn, packet,
4022 RX_PACKET_TYPE_ABORT, (char *)&error,
4023 sizeof(error), istack);
4024 } else if (!call->delayedAbortEvent) {
4025 clock_GetTime(&when);
4026 clock_Addmsec(&when, rxi_callAbortDelay);
4027 CALL_HOLD(call, RX_CALL_REFCOUNT_ABORT);
4028 call->delayedAbortEvent = rxevent_Post(&when, rxi_SendDelayedCallAbort,
4034 /* Send an abort packet for the specified connection. Packet is an
4035 * optional pointer to a packet that can be used to send the abort.
4036 * Once the number of abort messages reaches the threshold, an
4037 * event is scheduled to send the abort. Setting the force flag
4038 * overrides sending delayed abort messages.
4040 * NOTE: Called with conn_data_lock held. conn_data_lock is dropped
4041 * to send the abort packet.
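/* Throttling sketch (illustrative note, not part of the original
 * source; the numbers are invented): with rxi_connAbortThreshhold == 10
 * and rxi_connAbortDelay == 3000, the first ten aborts on a connection
 * go out immediately; once abortCount reaches the threshold, at most
 * one abort is sent per 3-second delayed event, so a looping client
 * cannot induce an abort flood. A threshold of zero disables the
 * throttle entirely. */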
4043 struct rx_packet *rxi_SendConnectionAbort(conn, packet, istack, force)
4044 register struct rx_connection *conn;
4045 struct rx_packet *packet;
4055 /* Clients should never delay abort messages */
4056 if (rx_IsClientConn(conn))
4057 force = 1;
4059 if (force || rxi_connAbortThreshhold == 0 ||
4060 conn->abortCount < rxi_connAbortThreshhold) {
4061 if (conn->delayedAbortEvent) {
4062 rxevent_Cancel(conn->delayedAbortEvent, (struct rx_call*)0, 0);
4064 error = htonl(conn->error);
4066 MUTEX_EXIT(&conn->conn_data_lock);
4067 packet = rxi_SendSpecial((struct rx_call *)0, conn, packet,
4068 RX_PACKET_TYPE_ABORT, (char *)&error,
4069 sizeof(error), istack);
4070 MUTEX_ENTER(&conn->conn_data_lock);
4071 } else if (!conn->delayedAbortEvent) {
4072 clock_GetTime(&when);
4073 clock_Addmsec(&when, rxi_connAbortDelay);
4074 conn->delayedAbortEvent = rxevent_Post(&when, rxi_SendDelayedConnAbort,
4080 /* Associate an error with all of the calls owned by a connection. Called
4081 * with error non-zero. This is only for really fatal things, like
4082 * bad authentication responses. The connection itself is set in
4083 * error at this point, so that future packets received will be
4085 void rxi_ConnectionError(conn, error)
4086 register struct rx_connection *conn;
4087 register afs_int32 error;
4091 if (conn->challengeEvent)
4092 rxevent_Cancel(conn->challengeEvent, (struct rx_call*)0, 0);
4093 for (i=0; i<RX_MAXCALLS; i++) {
4094 struct rx_call *call = conn->call[i];
4096 MUTEX_ENTER(&call->lock);
4097 rxi_CallError(call, error);
4098 MUTEX_EXIT(&call->lock);
4101 conn->error = error;
4102 MUTEX_ENTER(&rx_stats_mutex);
4103 rx_stats.fatalErrors++;
4104 MUTEX_EXIT(&rx_stats_mutex);
4108 void rxi_CallError(call, error)
4109 register struct rx_call *call;
4112 if (call->error) error = call->error;
4113 #ifdef RX_GLOBAL_RXLOCK_KERNEL
4114 if (!(call->flags & RX_CALL_TQ_BUSY)) {
4115 rxi_ResetCall(call, 0);
4118 rxi_ResetCall(call, 0);
4120 call->error = error;
4121 call->mode = RX_MODE_ERROR;
4124 /* Reset various fields in a call structure, and wakeup waiting
4125 * processes. Some fields aren't changed: state & mode are not
4126 * touched (these must be set by the caller), and bufptr, nLeft, and
4127 * nFree are not reset, since these fields are manipulated by
4128 * unprotected macros, and may only be reset by non-interrupting code.
4131 /* this code requires that call->conn be set properly as a pre-condition. */
4132 #endif /* ADAPT_WINDOW */
4134 void rxi_ResetCall(call, newcall)
4135 register struct rx_call *call;
4136 register int newcall;
4139 register struct rx_peer *peer;
4140 struct rx_packet *packet;
4142 /* Notify anyone who is waiting for asynchronous packet arrival */
4143 if (call->arrivalProc) {
4144 (*call->arrivalProc)(call, call->arrivalProcHandle, call->arrivalProcArg);
4145 call->arrivalProc = (VOID (*)()) 0;
4148 if (call->delayedAbortEvent) {
4149 rxevent_Cancel(call->delayedAbortEvent, call, RX_CALL_REFCOUNT_ABORT);
4150 packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
4152 rxi_SendCallAbort(call, packet, 0, 1);
4153 rxi_FreePacket(packet);
4158 * Update the peer with the congestion information in this call
4159 * so other calls on this connection can pick up where this call
4160 * left off. If the congestion sequence numbers don't match then
4161 * another call experienced a retransmission.
4163 peer = call->conn->peer;
4164 MUTEX_ENTER(&peer->peer_lock);
4166 if (call->congestSeq == peer->congestSeq) {
4167 peer->cwind = MAX(peer->cwind, call->cwind);
4168 peer->MTU = MAX(peer->MTU, call->MTU);
4169 peer->nDgramPackets = MAX(peer->nDgramPackets, call->nDgramPackets);
4172 call->abortCode = 0;
4173 call->abortCount = 0;
4175 if (peer->maxDgramPackets > 1) {
4176 call->MTU = RX_HEADER_SIZE + RX_JUMBOBUFFERSIZE;
4178 call->MTU = peer->MTU;
4180 call->cwind = MIN((int)peer->cwind, (int)peer->nDgramPackets);
4181 call->ssthresh = rx_maxSendWindow;
4182 call->nDgramPackets = peer->nDgramPackets;
4183 call->congestSeq = peer->congestSeq;
4184 MUTEX_EXIT(&peer->peer_lock);
4186 flags = call->flags;
4187 rxi_ClearReceiveQueue(call);
4188 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4189 if (call->flags & RX_CALL_TQ_BUSY) {
4190 call->flags = RX_CALL_TQ_CLEARME | RX_CALL_TQ_BUSY;
4191 call->flags |= (flags & RX_CALL_TQ_WAIT);
4193 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4195 rxi_ClearTransmitQueue(call, 0);
4196 queue_Init(&call->tq);
4199 queue_Init(&call->rq);
4201 call->rwind = rx_initReceiveWindow;
4202 call->twind = rx_initSendWindow;
4203 call->nSoftAcked = 0;
4204 call->nextCwind = 0;
4207 call->nCwindAcks = 0;
4208 call->nSoftAcks = 0;
4209 call->nHardAcks = 0;
4211 call->tfirst = call->rnext = call->tnext = 1;
4213 call->lastAcked = 0;
4214 call->localStatus = call->remoteStatus = 0;
4216 if (flags & RX_CALL_READER_WAIT) {
4217 #ifdef RX_ENABLE_LOCKS
4218 CV_BROADCAST(&call->cv_rq);
4220 osi_rxWakeup(&call->rq);
4223 if (flags & RX_CALL_WAIT_PACKETS) {
4224 MUTEX_ENTER(&rx_freePktQ_lock);
4225 rxi_PacketsUnWait(); /* XXX */
4226 MUTEX_EXIT(&rx_freePktQ_lock);
4229 #ifdef RX_ENABLE_LOCKS
4230 CV_SIGNAL(&call->cv_twind);
4232 if (flags & RX_CALL_WAIT_WINDOW_ALLOC)
4233 osi_rxWakeup(&call->twind);
4236 #ifdef RX_ENABLE_LOCKS
4237 /* The following ensures that we don't mess with any queue while some
4238 * other thread might also be doing so. The call_queue_lock field
4239 * is only modified under the call lock. If the call is in the process
4240 * of being removed from a queue, the call is not locked until the
4241 * queue lock is dropped and only then is the call_queue_lock field
4242 * zero'd out. So it's safe to lock the queue if call_queue_lock is set.
4243 * Note that any other routine which removes a call from a queue has to
4244 * obtain the queue lock before examining the queue and removing the call.
4246 if (call->call_queue_lock) {
4247 MUTEX_ENTER(call->call_queue_lock);
4248 if (queue_IsOnQueue(call)) {
4250 if (flags & RX_CALL_WAIT_PROC) {
4251 MUTEX_ENTER(&rx_stats_mutex);
4253 MUTEX_EXIT(&rx_stats_mutex);
4256 MUTEX_EXIT(call->call_queue_lock);
4257 CLEAR_CALL_QUEUE_LOCK(call);
4259 #else /* RX_ENABLE_LOCKS */
4260 if (queue_IsOnQueue(call)) {
4262 if (flags & RX_CALL_WAIT_PROC)
4265 #endif /* RX_ENABLE_LOCKS */
4267 rxi_KeepAliveOff(call);
4268 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
4271 /* Send an acknowledge for the indicated packet (seq,serial) of the
4272 * indicated call, for the indicated reason (reason). This
4273 * acknowledge will specifically acknowledge receiving the packet, and
4274 * will also specify which other packets for this call have been
4275 * received. This routine returns, to the caller, the packet that was
4276 * used. The caller is responsible for freeing it or re-using it.
4277 * This acknowledgement also returns the highest sequence number
4278 * actually read out by the higher level to the sender; the sender
4279 * promises to keep around packets that have not been read by the
4280 * higher level yet (unless, of course, the sender decides to abort
4281 * the call altogether). Any of p, seq, serial, pflags, or reason may
4282 * be set to zero without ill effect. That is, if they are zero, they
4283 * will not convey any information.
4284 * NOW there is a trailer field, after the ack where it will safely be
4285 * ignored by mundanes, which indicates the maximum size packet this
4286 * host can swallow. */
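/* Trailer sketch (illustrative, derived from the writes in the body
 * below): after the rx_ackPacket header and its acks[] array, four
 * afs_int32 words are appended at rx_AckDataSize(offset) +
 * n*sizeof(afs_int32), all in network byte order:
 *   n=0 (AFS 3.3): rxi_AdjustMaxMTU(peer ifMTU, rx_maxReceiveSize)
 *   n=1 (AFS 3.3): the peer interface MTU
 *   n=2 (AFS 3.4): the advertised receive window, call->rwind
 *   n=3 (AFS 3.5): the peer's ifDgramPackets
 * and p->length accounts for them as rx_AckDataSize(offset) +
 * 4*sizeof(afs_int32). */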
4287 struct rx_packet *rxi_SendAck(call, optionalPacket, seq, serial, pflags, reason, istack)
4288 register struct rx_call *call;
4289 register struct rx_packet *optionalPacket; /* use to send ack (or null) */
4290 int seq; /* Sequence number of the packet we are acking */
4291 int serial; /* Serial number of the packet */
4292 int pflags; /* Flags field from packet header */
4293 int reason; /* Reason an acknowledge was prompted */
4296 struct rx_ackPacket *ap;
4297 register struct rx_packet *rqp;
4298 register struct rx_packet *nxp; /* For queue_Scan */
4299 register struct rx_packet *p;
4304 * Open the receive window once a thread starts reading packets
4306 if (call->rnext > 1) {
4307 call->rwind = rx_maxReceiveWindow;
4310 call->nHardAcks = 0;
4311 call->nSoftAcks = 0;
4312 if (call->rnext > call->lastAcked)
4313 call->lastAcked = call->rnext;
4317 rx_computelen(p, p->length); /* reset length, you never know */
4318 } /* where that's been... */
4320 if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL))) {
4321 /* We won't send the ack, but don't panic. */
4322 return optionalPacket;
4325 templ = rx_AckDataSize(call->rwind)+4*sizeof(afs_int32) - rx_GetDataSize(p);
4327 if (rxi_AllocDataBuf(p, templ, RX_PACKET_CLASS_SPECIAL)) {
4328 if (!optionalPacket) rxi_FreePacket(p);
4329 return optionalPacket;
4331 templ = rx_AckDataSize(call->rwind)+2*sizeof(afs_int32);
4332 if (rx_Contiguous(p)<templ) {
4333 if (!optionalPacket) rxi_FreePacket(p);
4334 return optionalPacket;
4336 } /* MTUXXX failing to send an ack is very serious. We should */
4337 /* try as hard as possible to send even a partial ack; it's */
4338 /* better than nothing. */
4340 ap = (struct rx_ackPacket *) rx_DataOf(p);
4341 ap->bufferSpace = htonl(0); /* Something should go here, sometime */
4342 ap->reason = reason;
4344 /* The skew computation used to be bogus, I think it's better now. */
4345 /* We should start paying attention to skew. XXX */
4346 ap->serial = htonl(call->conn->maxSerial);
4347 ap->maxSkew = 0; /* used to be peer->inPacketSkew */
4349 ap->firstPacket = htonl(call->rnext); /* First packet not yet forwarded to reader */
4350 ap->previousPacket = htonl(call->rprev); /* Previous packet received */
4352 /* No fear of running out of ack packet here because there can only be at most
4353 * one window full of unacknowledged packets. The window size must be constrained
4354 * to be less than the maximum ack size, of course. Also, an ack should always
4355 * fit into a single packet -- it should not ever be fragmented. */
4356 for (offset = 0, queue_Scan(&call->rq, rqp, nxp, rx_packet)) {
4357 if (!rqp || !call->rq.next
4358 || (rqp->header.seq > (call->rnext + call->rwind))) {
4359 if (!optionalPacket) rxi_FreePacket(p);
4360 rxi_CallError(call, RX_CALL_DEAD);
4361 return optionalPacket;
4364 while (rqp->header.seq > call->rnext + offset)
4365 ap->acks[offset++] = RX_ACK_TYPE_NACK;
4366 ap->acks[offset++] = RX_ACK_TYPE_ACK;
4368 if ((offset > (u_char)rx_maxReceiveWindow) || (offset > call->rwind)) {
4369 if (!optionalPacket) rxi_FreePacket(p);
4370 rxi_CallError(call, RX_CALL_DEAD);
4371 return optionalPacket;
4376 p->length = rx_AckDataSize(offset)+4*sizeof(afs_int32);
4378 /* these are new for AFS 3.3 */
4379 templ = rxi_AdjustMaxMTU(call->conn->peer->ifMTU, rx_maxReceiveSize);
4380 templ = htonl(templ);
4381 rx_packetwrite(p, rx_AckDataSize(offset), sizeof(afs_int32), &templ);
4382 templ = htonl(call->conn->peer->ifMTU);
4383 rx_packetwrite(p, rx_AckDataSize(offset)+sizeof(afs_int32), sizeof(afs_int32), &templ);
4385 /* new for AFS 3.4 */
4386 templ = htonl(call->rwind);
4387 rx_packetwrite(p, rx_AckDataSize(offset)+2*sizeof(afs_int32), sizeof(afs_int32), &templ);
4389 /* new for AFS 3.5 */
4390 templ = htonl(call->conn->peer->ifDgramPackets);
4391 rx_packetwrite(p, rx_AckDataSize(offset)+3*sizeof(afs_int32), sizeof(afs_int32), &templ);
4393 p->header.serviceId = call->conn->serviceId;
4394 p->header.cid = (call->conn->cid | call->channel);
4395 p->header.callNumber = *call->callNumber;
4396 p->header.seq = seq;
4397 p->header.securityIndex = call->conn->securityIndex;
4398 p->header.epoch = call->conn->epoch;
4399 p->header.type = RX_PACKET_TYPE_ACK;
4400 p->header.flags = RX_SLOW_START_OK;
4401 if (reason == RX_ACK_PING) {
4402 p->header.flags |= RX_REQUEST_ACK;
4404 clock_GetTime(&call->pingRequestTime);
4407 if (call->conn->type == RX_CLIENT_CONNECTION)
4408 p->header.flags |= RX_CLIENT_INITIATED;
4412 fprintf(rx_Log, "SACK: reason %x previous %u seq %u first %u",
4413 ap->reason, ntohl(ap->previousPacket), p->header.seq,
4414 ntohl(ap->firstPacket));
4416 for (offset = 0; offset < ap->nAcks; offset++)
4417 putc(ap->acks[offset] == RX_ACK_TYPE_NACK? '-' : '*', rx_Log);
4424 register int i, nbytes = p->length;
4426 for (i=1; i < p->niovecs; i++) { /* vec 0 is ALWAYS header */
4427 if (nbytes <= p->wirevec[i].iov_len) {
4428 register int savelen, saven;
4430 savelen = p->wirevec[i].iov_len;
4432 p->wirevec[i].iov_len = nbytes;
4434 rxi_Send(call, p, istack);
4435 p->wirevec[i].iov_len = savelen;
4439 else nbytes -= p->wirevec[i].iov_len;
4442 MUTEX_ENTER(&rx_stats_mutex);
4443 rx_stats.ackPacketsSent++;
4444 MUTEX_EXIT(&rx_stats_mutex);
4445 if (!optionalPacket) rxi_FreePacket(p);
4446 return optionalPacket; /* Return packet for re-use by caller */
4449 /* Send all of the packets in the list in single datagram */
4450 static void rxi_SendList(call, list, len, istack, moreFlag, now, retryTime)
4451 struct rx_call *call;
4452 struct rx_packet **list;
4457 struct clock *retryTime;
4462 struct rx_connection *conn = call->conn;
4463 struct rx_peer *peer = conn->peer;
4465 MUTEX_ENTER(&peer->peer_lock);
4467 MUTEX_ENTER(&rx_stats_mutex);
4468 rx_stats.dataPacketsSent += len;
4469 MUTEX_EXIT(&rx_stats_mutex);
4470 MUTEX_EXIT(&peer->peer_lock);
4472 if (list[len-1]->header.flags & RX_LAST_PACKET) {
4476 /* Set the packet flags and schedule the resend events */
4477 /* Only request an ack for the last packet in the list */
4478 for (i = 0 ; i < len ; i++) {
4479 list[i]->retryTime = *retryTime;
4480 if (list[i]->header.serial) {
4481 /* Exponentially backoff retry times */
4482 if (list[i]->backoff < MAXBACKOFF) {
4483 /* so it can't stay == 0 */
4484 list[i]->backoff = (list[i]->backoff << 1) +1;
4486 else list[i]->backoff++;
4487 clock_Addmsec(&(list[i]->retryTime),
4488 ((afs_uint32) list[i]->backoff) << 8);
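/* Worked example (illustrative): backoff starts at 0, so successive
 * retransmissions of a packet step it 0 -> 1 -> 3 -> 7 -> 15 ... and
 * the extra delay added above is backoff << 8 ms, i.e. 256ms, 768ms,
 * 1792ms, 3840ms, ...; once backoff reaches MAXBACKOFF it grows only
 * linearly. */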
4491 /* Wait a little extra for the ack on the last packet */
4492 if (lastPacket && !(list[i]->header.flags & RX_CLIENT_INITIATED)) {
4493 clock_Addmsec(&(list[i]->retryTime), 400);
4496 /* Record the time sent */
4497 list[i]->timeSent = *now;
4499 /* Ask for an ack on retransmitted packets, on every other packet
4500 * if the peer doesn't support slow start. Ask for an ack on every
4501 * packet until the congestion window reaches the ack rate. */
4502 if (list[i]->header.serial) {
4504 MUTEX_ENTER(&rx_stats_mutex);
4505 rx_stats.dataPacketsReSent++;
4506 MUTEX_EXIT(&rx_stats_mutex);
4508 /* improved RTO calculation- not Karn */
4509 list[i]->firstSent = *now;
4511 && (call->cwind <= (u_short)(conn->ackRate+1)
4512 || (!(call->flags & RX_CALL_SLOW_START_OK)
4513 && (list[i]->header.seq & 1)))) {
4518 MUTEX_ENTER(&peer->peer_lock);
4520 MUTEX_ENTER(&rx_stats_mutex);
4521 rx_stats.dataPacketsSent++;
4522 MUTEX_EXIT(&rx_stats_mutex);
4523 MUTEX_EXIT(&peer->peer_lock);
4525 /* Tag this packet as not being the last in this group,
4526 * for the receiver's benefit */
4527 if (i < len-1 || moreFlag) {
4528 list[i]->header.flags |= RX_MORE_PACKETS;
4531 /* Install the new retransmit time for the packet, and
4532 * record the time sent */
4533 list[i]->timeSent = *now;
4537 list[len-1]->header.flags |= RX_REQUEST_ACK;
4540 /* Since we're about to send a data packet to the peer, it's
4541 * safe to nuke any scheduled end-of-packets ack */
4542 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
4544 CALL_HOLD(call, RX_CALL_REFCOUNT_SEND);
4545 MUTEX_EXIT(&call->lock);
4547 rxi_SendPacketList(conn, list, len, istack);
4549 rxi_SendPacket(conn, list[0], istack);
4551 MUTEX_ENTER(&call->lock);
4552 CALL_RELE(call, RX_CALL_REFCOUNT_SEND);
4554 /* Update last send time for this call (for keep-alive
4555 * processing), and for the connection (so that we can discover
4556 * idle connections) */
4557 conn->lastSendTime = call->lastSendTime = clock_Sec();
4560 /* When sending packets we need to follow these rules:
4561 * 1. Never send more than maxDgramPackets in a jumbogram.
4562 * 2. Never send a packet with more than two iovecs in a jumbogram.
4563 * 3. Never send a retransmitted packet in a jumbogram.
4564 * 4. Never send more than cwind/4 packets in a jumbogram
4565 * We always keep the last list we should have sent so we
4566 * can set the RX_MORE_PACKETS flags correctly.
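/* Illustrative restatement (not part of the original source): the
 * rules above boil down to a flush predicate like the sketch below,
 * which mirrors the checks inside rxi_SendXmitList; the helper name
 * is hypothetical and the block is compiled out. */
#if 0 /* sketch only */
static int rxi_MustFlushList(struct rx_call *call, struct rx_peer *peer,
                             struct rx_packet *p, int cnt)
{
    return cnt >= (int)peer->maxDgramPackets      /* rule 1 */
        || cnt >= (int)call->nDgramPackets        /* per-call dgram cap */
        || cnt >= (int)call->cwind                /* rule 4 analogue */
        || p->header.serial                       /* rule 3: a resend */
        || p->length != RX_JUMBOBUFFERSIZE;       /* rule 2/size limit */
}
#endif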
4568 static void rxi_SendXmitList(call, list, len, istack, now, retryTime)
4569 struct rx_call *call;
4570 struct rx_packet **list;
4574 struct clock *retryTime;
4576 int i, cnt, lastCnt = 0;
4577 struct rx_packet **listP, **lastP = 0;
4578 struct rx_peer *peer = call->conn->peer;
4579 int morePackets = 0;
4581 for (cnt = 0, listP = &list[0], i = 0 ; i < len ; i++) {
4582 /* Does the current packet force us to flush the current list? */
4584 && (list[i]->header.serial
4586 || list[i]->length > RX_JUMBOBUFFERSIZE)) {
4588 rxi_SendList(call, lastP, lastCnt, istack, 1, now, retryTime);
4589 /* If the call enters an error state stop sending, or if
4590 * we entered congestion recovery mode, stop sending */
4591 if (call->error || (call->flags & RX_CALL_FAST_RECOVER_WAIT))
4599 /* Add the current packet to the list if it hasn't been acked.
4600 * Otherwise adjust the list pointer to skip the current packet. */
4601 if (!list[i]->acked) {
4603 /* Do we need to flush the list? */
4604 if (cnt >= (int)peer->maxDgramPackets
4605 || cnt >= (int)call->nDgramPackets
4606 || cnt >= (int)call->cwind
4607 || list[i]->header.serial
4608 || list[i]->length != RX_JUMBOBUFFERSIZE) {
4610 rxi_SendList(call, lastP, lastCnt, istack, 1,
4612 /* If the call enters an error state stop sending, or if
4613 * we entered congestion recovery mode, stop sending */
4614 if (call->error || (call->flags&RX_CALL_FAST_RECOVER_WAIT))
4624 osi_Panic("rxi_SendList error");
4630 /* Send the whole list when the call is in receive mode, when
4631 * the call is in eof mode, when we are in fast recovery mode,
4632 * and when we have the last packet */
4633 if ((list[len-1]->header.flags & RX_LAST_PACKET)
4634 || call->mode == RX_MODE_RECEIVING
4635 || call->mode == RX_MODE_EOF
4636 || (call->flags & RX_CALL_FAST_RECOVER)) {
4637 /* Check for the case where the current list contains
4638 * an acked packet. Since we always send retransmissions
4639 * in a separate packet, we only need to check the first
4640 * packet in the list */
4641 if (cnt > 0 && !listP[0]->acked) {
4645 rxi_SendList(call, lastP, lastCnt, istack, morePackets,
4647 /* If the call enters an error state stop sending, or if
4648 * we entered congestion recovery mode, stop sending */
4649 if (call->error || (call->flags & RX_CALL_FAST_RECOVER_WAIT))
4653 rxi_SendList(call, listP, cnt, istack, 0, now, retryTime);
4655 } else if (lastCnt > 0) {
4656 rxi_SendList(call, lastP, lastCnt, istack, 0, now, retryTime);
4660 #ifdef RX_ENABLE_LOCKS
4661 /* Call rxi_Start, below, but with the call lock held. */
4662 void rxi_StartUnlocked(event, call, istack)
4663 struct rxevent *event;
4664 register struct rx_call *call;
4667 MUTEX_ENTER(&call->lock);
4668 rxi_Start(event, call, istack);
4669 MUTEX_EXIT(&call->lock);
4671 #endif /* RX_ENABLE_LOCKS */
4673 /* This routine is called when new packets are readied for
4674 * transmission and when retransmission may be necessary, or when the
4675 * transmission window or burst count are favourable. This should be
4676 * better optimized for new packets, the usual case, now that we've
4677 * got rid of queues of send packets. XXXXXXXXXXX */
4678 void rxi_Start(event, call, istack)
4679 struct rxevent *event;
4680 register struct rx_call *call;
4683 struct rx_packet *p;
4684 register struct rx_packet *nxp; /* Next pointer for queue_Scan */
4685 struct rx_peer *peer = call->conn->peer;
4686 struct clock now, retryTime;
4690 struct rx_packet **xmitList;
4692 /* If rxi_Start is being called as a result of a resend event,
4693 * then make sure that the event pointer is removed from the call
4694 * structure, since there is no longer a per-call retransmission
4696 if (event && event == call->resendEvent) {
4697 CALL_RELE(call, RX_CALL_REFCOUNT_RESEND);
4698 call->resendEvent = NULL;
4699 if (queue_IsEmpty(&call->tq)) {
4703 /* Timeouts trigger congestion recovery */
4704 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4705 if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
4706 /* someone else is waiting to start recovery */
4709 call->flags |= RX_CALL_FAST_RECOVER_WAIT;
4710 while (call->flags & RX_CALL_TQ_BUSY) {
4711 call->flags |= RX_CALL_TQ_WAIT;
4712 #ifdef RX_ENABLE_LOCKS
4713 CV_WAIT(&call->cv_tq, &call->lock);
4714 #else /* RX_ENABLE_LOCKS */
4715 osi_rxSleep(&call->tq);
4716 #endif /* RX_ENABLE_LOCKS */
4718 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4719 call->flags &= ~RX_CALL_FAST_RECOVER_WAIT;
4720 call->flags |= RX_CALL_FAST_RECOVER;
4721 if (peer->maxDgramPackets > 1) {
4722 call->MTU = RX_JUMBOBUFFERSIZE+RX_HEADER_SIZE;
4724 call->MTU = MIN(peer->natMTU, peer->maxMTU);
4726 call->ssthresh = MAX(4, MIN((int)call->cwind, (int)call->twind))>>1;
4727 call->nDgramPackets = 1;
4729 call->nextCwind = 1;
4732 MUTEX_ENTER(&peer->peer_lock);
4733 peer->MTU = call->MTU;
4734 peer->cwind = call->cwind;
4735 peer->nDgramPackets = 1;
4737 call->congestSeq = peer->congestSeq;
4738 MUTEX_EXIT(&peer->peer_lock);
4739 /* Clear retry times on packets. Otherwise, it's possible for
4740 * some packets in the queue to force resends at rates faster
4741 * than recovery rates.
4743 for(queue_Scan(&call->tq, p, nxp, rx_packet)) {
4745 clock_Zero(&p->retryTime);
4750 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4751 MUTEX_ENTER(&rx_stats_mutex);
4752 rx_tq_debug.rxi_start_in_error ++;
4753 MUTEX_EXIT(&rx_stats_mutex);
4758 if (queue_IsNotEmpty(&call->tq)) { /* If we have anything to send */
4759 /* Get clock to compute the re-transmit time for any packets
4760 * in this burst. Note, if we back off, it's reasonable to
4761 * back off all of the packets in the same manner, even if
4762 * some of them have been retransmitted more times than more
4763 * recent additions */
4764 clock_GetTime(&now);
4765 retryTime = now; /* initialize before use */
4766 MUTEX_ENTER(&peer->peer_lock);
4767 clock_Add(&retryTime, &peer->timeout);
4768 MUTEX_EXIT(&peer->peer_lock);
4770 /* Send (or resend) any packets that need it, subject to
4771 * window restrictions and congestion burst control
4772 * restrictions. Ask for an ack on the last packet sent in
4773 * this burst. For now, we're relying upon the window being
4774 * considerably bigger than the largest number of packets that
4775 * are typically sent at once by one initial call to
4776 * rxi_Start. This is probably bogus (perhaps we should ask
4777 * for an ack when we're half way through the current
4778 * window?). Also, for non file transfer applications, this
4779 * may end up asking for an ack for every packet. Bogus. XXXX
4782 * But check whether we're here recursively, and let the other guy
4785 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4786 if (!(call->flags & RX_CALL_TQ_BUSY)) {
4787 call->flags |= RX_CALL_TQ_BUSY;
4789 call->flags &= ~RX_CALL_NEED_START;
4790 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4792 maxXmitPackets = MIN(call->twind, call->cwind);
4793 xmitList = (struct rx_packet **)
4794 osi_Alloc(maxXmitPackets * sizeof(struct rx_packet *));
4795 if (xmitList == NULL)
4796 osi_Panic("rxi_Start, failed to allocate xmit list");
4797 for (queue_Scan(&call->tq, p, nxp, rx_packet)) {
4798 if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
4799 /* We shouldn't be sending packets if a thread is waiting
4800 * to initiate congestion recovery */
4803 if ((nXmitPackets) && (call->flags & RX_CALL_FAST_RECOVER)) {
4804 /* Only send one packet during fast recovery */
4807 if ((p->header.flags == RX_FREE_PACKET) ||
4808 (!queue_IsEnd(&call->tq, nxp)
4809 && (nxp->header.flags == RX_FREE_PACKET)) ||
4810 (p == (struct rx_packet *)&rx_freePacketQueue) ||
4811 (nxp == (struct rx_packet *)&rx_freePacketQueue)) {
4812 osi_Panic("rxi_Start: xmit queue clobbered");
4815 MUTEX_ENTER(&rx_stats_mutex);
4816 rx_stats.ignoreAckedPacket++;
4817 MUTEX_EXIT(&rx_stats_mutex);
4818 continue; /* Ignore this packet if it has been acknowledged */
4821 /* Turn off all flags except these ones, which are the same
4822 * on each transmission */
4823 p->header.flags &= RX_PRESET_FLAGS;
4825 if (p->header.seq >= call->tfirst +
4826 MIN((int)call->twind, (int)(call->nSoftAcked+call->cwind))) {
4827 call->flags |= RX_CALL_WAIT_WINDOW_SEND; /* Wait for transmit window */
4828 /* Note: if we're waiting for more window space, we can
4829 * still send retransmits; hence we don't return here, but
4830 * break out to schedule a retransmit event */
4831 dpf(("call %d waiting for window", *(call->callNumber)));
4835 /* Transmit the packet if it needs to be sent. */
4836 if (!clock_Lt(&now, &p->retryTime)) {
4837 if (nXmitPackets == maxXmitPackets) {
4838 osi_Panic("rxi_Start: xmit list overflowed");
4840 xmitList[nXmitPackets++] = p;
4844 /* xmitList now hold pointers to all of the packets that are
4845 * ready to send. Now we loop to send the packets */
4846 if (nXmitPackets > 0) {
4847 rxi_SendXmitList(call, xmitList, nXmitPackets, istack,
4850 osi_Free(xmitList, maxXmitPackets * sizeof(struct rx_packet *));
4852 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4854 * TQ references no longer protected by this flag; they must remain
4855 * protected by the global lock.
4857 if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
4858 call->flags &= ~RX_CALL_TQ_BUSY;
4859 if (call->flags & RX_CALL_TQ_WAIT) {
4860 call->flags &= ~RX_CALL_TQ_WAIT;
4861 #ifdef RX_ENABLE_LOCKS
4862 CV_BROADCAST(&call->cv_tq);
4863 #else /* RX_ENABLE_LOCKS */
4864 osi_rxWakeup(&call->tq);
4865 #endif /* RX_ENABLE_LOCKS */
4870 /* We went into the error state while sending packets. Now is
4871 * the time to reset the call. This will also inform the using
4872 * process that the call is in an error state.
4874 MUTEX_ENTER(&rx_stats_mutex);
4875 rx_tq_debug.rxi_start_aborted ++;
4876 MUTEX_EXIT(&rx_stats_mutex);
4877 call->flags &= ~RX_CALL_TQ_BUSY;
4878 if (call->flags & RX_CALL_TQ_WAIT) {
4879 call->flags &= ~RX_CALL_TQ_WAIT;
4880 #ifdef RX_ENABLE_LOCKS
4881 CV_BROADCAST(&call->cv_tq);
4882 #else /* RX_ENABLE_LOCKS */
4883 osi_rxWakeup(&call->tq);
4884 #endif /* RX_ENABLE_LOCKS */
4886 rxi_CallError(call, call->error);
4889 #ifdef RX_ENABLE_LOCKS
4890 if (call->flags & RX_CALL_TQ_SOME_ACKED) {
4891 register int missing;
4892 call->flags &= ~RX_CALL_TQ_SOME_ACKED;
4893 /* Some packets have received acks. If they all have, we can clear
4894 * the transmit queue.
4896 for (missing = 0, queue_Scan(&call->tq, p, nxp, rx_packet)) {
4897 if (p->header.seq < call->tfirst && p->acked) {
4905 call->flags |= RX_CALL_TQ_CLEARME;
4907 #endif /* RX_ENABLE_LOCKS */
4908 /* Don't bother doing retransmits if the TQ is cleared. */
4909 if (call->flags & RX_CALL_TQ_CLEARME) {
4910 rxi_ClearTransmitQueue(call, 1);
4912 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4915 /* Always post a resend event, if there is anything in the
4916 * queue, and resend is possible. There should be at least
4917 * one unacknowledged packet in the queue ... otherwise none
4918 * of these packets should be on the queue in the first place.
4920 if (call->resendEvent) {
4921 /* Cancel the existing event and post a new one */
4922 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
4925 /* The retry time is the retry time on the first unacknowledged
4926 * packet inside the current window */
4927 for (haveEvent = 0, queue_Scan(&call->tq, p, nxp, rx_packet)) {
4928 /* Don't set timers for packets outside the window */
4929 if (p->header.seq >= call->tfirst + call->twind) {
4933 if (!p->acked && !clock_IsZero(&p->retryTime)) {
4935 retryTime = p->retryTime;
4940 /* Post a new event to re-run rxi_Start when retries may be needed */
4941 if (haveEvent && !(call->flags & RX_CALL_NEED_START)) {
4942 #ifdef RX_ENABLE_LOCKS
4943 CALL_HOLD(call, RX_CALL_REFCOUNT_RESEND);
4944 call->resendEvent = rxevent_Post(&retryTime,
4946 (char *)call, istack);
4947 #else /* RX_ENABLE_LOCKS */
4948 call->resendEvent = rxevent_Post(&retryTime, rxi_Start,
4949 (char *)call, (void*)(long)istack);
4950 #endif /* RX_ENABLE_LOCKS */
4953 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4954 } while (call->flags & RX_CALL_NEED_START);
4956 * TQ references no longer protected by this flag; they must remain
4957 * protected by the global lock.
4959 call->flags &= ~RX_CALL_TQ_BUSY;
4960 if (call->flags & RX_CALL_TQ_WAIT) {
4961 call->flags &= ~RX_CALL_TQ_WAIT;
4962 #ifdef RX_ENABLE_LOCKS
4963 CV_BROADCAST(&call->cv_tq);
4964 #else /* RX_ENABLE_LOCKS */
4965 osi_rxWakeup(&call->tq);
4966 #endif /* RX_ENABLE_LOCKS */
4969 call->flags |= RX_CALL_NEED_START;
4971 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4973 if (call->resendEvent) {
4974 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
4979 /* Also adjusts the keep alive parameters for the call, to reflect
4980 * that we have just sent a packet (so keep alives aren't sent
4982 void rxi_Send(call, p, istack)
4983 register struct rx_call *call;
4984 register struct rx_packet *p;
4987 register struct rx_connection *conn = call->conn;
4989 /* Stamp each packet with the user supplied status */
4990 p->header.userStatus = call->localStatus;
4992 /* Allow the security object controlling this call's security to
4993 * make any last-minute changes to the packet */
4994 RXS_SendPacket(conn->securityObject, call, p);
4996 /* Since we're about to send SOME sort of packet to the peer, it's
4997 * safe to nuke any scheduled end-of-packets ack */
4998 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
5000 /* Actually send the packet, filling in more connection-specific fields */
5001 CALL_HOLD(call, RX_CALL_REFCOUNT_SEND);
5002 MUTEX_EXIT(&call->lock);
5003 rxi_SendPacket(conn, p, istack);
5004 MUTEX_ENTER(&call->lock);
5005 CALL_RELE(call, RX_CALL_REFCOUNT_SEND);
5007 /* Update last send time for this call (for keep-alive
5008 * processing), and for the connection (so that we can discover
5009 * idle connections) */
5010 conn->lastSendTime = call->lastSendTime = clock_Sec();
5014 /* Check if a call needs to be destroyed. Called by keep-alive code to ensure
5015 * that things are fine. Also called periodically to guarantee that nothing
5016 * falls through the cracks (e.g. (error + dally) connections have keepalive
5017 * turned off). Returns 0 if conn is well, -1 otherwise. If otherwise, call
5020 #ifdef RX_ENABLE_LOCKS
5021 int rxi_CheckCall(call, haveCTLock)
5022 int haveCTLock; /* Set if calling from rxi_ReapConnections */
5023 #else /* RX_ENABLE_LOCKS */
5024 int rxi_CheckCall(call)
5025 #endif /* RX_ENABLE_LOCKS */
5026 register struct rx_call *call;
5028 register struct rx_connection *conn = call->conn;
5029 register struct rx_service *tservice;
5031 afs_uint32 deadTime;
5033 #ifdef RX_GLOBAL_RXLOCK_KERNEL
5034 if (call->flags & RX_CALL_TQ_BUSY) {
5035 /* Call is active and will be reset by rxi_Start if it's
5036 * in an error state.
5041 /* dead time + RTT + 8*MDEV, rounded up to next second. */
5042 deadTime = (((afs_uint32)conn->secondsUntilDead << 10) +
5043 ((afs_uint32)conn->peer->rtt >> 3) +
5044 ((afs_uint32)conn->peer->rtt_dev << 1) + 1023) >> 10;
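/* Worked example (illustrative): with conn->secondsUntilDead == 12,
 * peer->rtt == 820 (102.5ms in 1/8-ms fixed point) and
 * peer->rtt_dev == 50 (12.5ms, scaled by 4):
 *   (12 << 10) + (820 >> 3) + (50 << 1) + 1023
 *     = 12288 + 102 + 100 + 1023 = 13513, and 13513 >> 10 = 13 sec.
 * The +1023 makes the final >> 10 round up instead of truncate. */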
5046 /* These are computed to the second (+- 1 second). But that's
5047 * good enough for these values, which should be a significant
5048 * number of seconds. */
5049 if (now > (call->lastReceiveTime + deadTime)) {
5050 if (call->state == RX_STATE_ACTIVE) {
5051 rxi_CallError(call, RX_CALL_DEAD);
5055 #ifdef RX_ENABLE_LOCKS
5056 /* Cancel pending events */
5057 rxevent_Cancel(call->delayedAckEvent, call,
5058 RX_CALL_REFCOUNT_DELAY);
5059 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
5060 rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
5061 if (call->refCount == 0) {
5062 rxi_FreeCall(call, haveCTLock);
5066 #else /* RX_ENABLE_LOCKS */
5069 #endif /* RX_ENABLE_LOCKS */
5071 /* Non-active calls are destroyed if they are not responding
5072 * to pings; active calls are simply flagged in error, so the
5073 * attached process can die reasonably gracefully. */
5075 /* see if we have a non-activity timeout */
5076 tservice = conn->service;
5077 if ((conn->type == RX_SERVER_CONNECTION) && call->startWait
5078 && tservice->idleDeadTime
5079 && ((call->startWait + tservice->idleDeadTime) < now)) {
5080 if (call->state == RX_STATE_ACTIVE) {
5081 rxi_CallError(call, RX_CALL_TIMEOUT);
5085 /* see if we have a hard timeout */
5086 if (conn->hardDeadTime && (now > (conn->hardDeadTime + call->startTime.sec))) {
5087 if (call->state == RX_STATE_ACTIVE)
5088 rxi_CallError(call, RX_CALL_TIMEOUT);
5095 /* When a call is in progress, this routine is called occasionally to
5096 * make sure that some traffic has arrived from (or been sent to) the peer.
5097 * If nothing has arrived in a reasonable amount of time, the call is
5098 * declared dead; if nothing has been sent for a while, we send a
5099 * keep-alive packet (if we're actually trying to keep the call alive)
5101 void rxi_KeepAliveEvent(event, call, dummy)
5102 struct rxevent *event;
5103 register struct rx_call *call;
5105 struct rx_connection *conn;
5108 MUTEX_ENTER(&call->lock);
5109 CALL_RELE(call, RX_CALL_REFCOUNT_ALIVE);
5110 if (event == call->keepAliveEvent)
5111 call->keepAliveEvent = (struct rxevent *) 0;
5114 #ifdef RX_ENABLE_LOCKS
5115 if(rxi_CheckCall(call, 0)) {
5116 MUTEX_EXIT(&call->lock);
5119 #else /* RX_ENABLE_LOCKS */
5120 if (rxi_CheckCall(call)) return;
5121 #endif /* RX_ENABLE_LOCKS */
5123 /* Don't try to keep alive dallying calls */
5124 if (call->state == RX_STATE_DALLY) {
5125 MUTEX_EXIT(&call->lock);
5130 if ((now - call->lastSendTime) > conn->secondsUntilPing) {
5131 /* Don't try to send keepalives if there is unacknowledged data */
5132 /* the rexmit code should be good enough, this little hack
5133 * doesn't quite work XXX */
5134 (void) rxi_SendAck(call, NULL, 0, 0, 0, RX_ACK_PING, 0);
5136 rxi_ScheduleKeepAliveEvent(call);
5137 MUTEX_EXIT(&call->lock);
5141 void rxi_ScheduleKeepAliveEvent(call)
5142 register struct rx_call *call;
5144 if (!call->keepAliveEvent) {
5146 clock_GetTime(&when);
5147 when.sec += call->conn->secondsUntilPing;
5148 CALL_HOLD(call, RX_CALL_REFCOUNT_ALIVE);
5149 call->keepAliveEvent = rxevent_Post(&when, rxi_KeepAliveEvent, call, 0);
5153 /* N.B. rxi_KeepAliveOff: is defined earlier as a macro */
5154 void rxi_KeepAliveOn(call)
5155 register struct rx_call *call;
5157 /* Pretend last packet received was received now--i.e. if another
5158 * packet isn't received within the keep alive time, then the call
5159 * will die; initialize last send time to the current time--even
5160 * if a packet hasn't been sent yet. This will guarantee that a
5161 * keep-alive is sent within the ping time */
5162 call->lastReceiveTime = call->lastSendTime = clock_Sec();
5163 rxi_ScheduleKeepAliveEvent(call);
5166 /* This routine is called to send connection abort messages
5167 * that have been delayed to throttle looping clients. */
5168 void rxi_SendDelayedConnAbort(event, conn, dummy)
5169 struct rxevent *event;
5170 register struct rx_connection *conn;
5174 struct rx_packet *packet;
5176 MUTEX_ENTER(&conn->conn_data_lock);
5177 conn->delayedAbortEvent = (struct rxevent *) 0;
5178 error = htonl(conn->error);
5180 MUTEX_EXIT(&conn->conn_data_lock);
5181 packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
5183 packet = rxi_SendSpecial((struct rx_call *)0, conn, packet,
5184 RX_PACKET_TYPE_ABORT, (char *)&error,
5186 rxi_FreePacket(packet);
5190 /* This routine is called to send call abort messages
5191 * that have been delayed to throttle looping clients. */
5192 void rxi_SendDelayedCallAbort(event, call, dummy)
5193 struct rxevent *event;
5194 register struct rx_call *call;
5198 struct rx_packet *packet;
5200 MUTEX_ENTER(&call->lock);
5201 call->delayedAbortEvent = (struct rxevent *) 0;
5202 error = htonl(call->error);
5204 packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
5206 packet = rxi_SendSpecial(call, call->conn, packet,
5207 RX_PACKET_TYPE_ABORT, (char *)&error,
5209 rxi_FreePacket(packet);
5211 MUTEX_EXIT(&call->lock);
5214 /* This routine is called periodically (every RX_AUTH_REQUEST_TIMEOUT
5215 * seconds) to ask the client to authenticate itself. The routine
5216 * issues a challenge to the client, which is obtained from the
5217 * security object associated with the connection */
5218 void rxi_ChallengeEvent(event, conn, dummy)
5219 struct rxevent *event;
5220 register struct rx_connection *conn;
5223 conn->challengeEvent = (struct rxevent *) 0;
5224 if (RXS_CheckAuthentication(conn->securityObject, conn) != 0) {
5225 register struct rx_packet *packet;
5227 packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
5229 /* If there's no packet available, do this later. */
5230 RXS_GetChallenge(conn->securityObject, conn, packet);
5231 rxi_SendSpecial((struct rx_call *) 0, conn, packet,
5232 RX_PACKET_TYPE_CHALLENGE, (char *) 0, -1, 0);
5233 rxi_FreePacket(packet);
5235 clock_GetTime(&when);
5236 when.sec += RX_CHALLENGE_TIMEOUT;
5237 conn->challengeEvent = rxevent_Post(&when, rxi_ChallengeEvent, conn, 0);
5241 /* Call this routine to start requesting the client to authenticate
5242 * itself. This will continue until authentication is established,
5243 * the call times out, or an invalid response is returned. The
5244 * security object associated with the connection is asked to create
5245 * the challenge at this time. N.B. rxi_ChallengeOff is a macro,
5246 * defined earlier. */
5247 void rxi_ChallengeOn(conn)
5248 register struct rx_connection *conn;
5250 if (!conn->challengeEvent) {
5251 RXS_CreateChallenge(conn->securityObject, conn);
5252 rxi_ChallengeEvent((struct rxevent *)0, conn, NULL);
5257 /* Compute round trip time of the packet provided, in *rttp.
5260 /* rxi_ComputeRoundTripTime is called with peer locked. */
5261 void rxi_ComputeRoundTripTime(p, sentp, peer)
5262 register struct clock *sentp; /* may be null */
5263 register struct rx_peer *peer; /* may be null */
5264 register struct rx_packet *p;
5266 struct clock thisRtt, *rttp = &thisRtt;
5268 #if defined(AFS_ALPHA_LINUX22_ENV) && defined(AFS_PTHREAD_ENV) && !defined(KERNEL)
5269 /* making year 2038 bugs to get this running now - stroucki */
5270 struct timeval temptime;
5272 register int rtt_timeout;
5273 static char id[]="@(#)adaptive RTO";
5275 #if defined(AFS_ALPHA_LINUX20_ENV) && defined(AFS_PTHREAD_ENV) && !defined(KERNEL)
5276 /* yet again. This was the worst Heisenbug of the port - stroucki */
5277 clock_GetTime(&temptime);
5278 rttp->sec=(afs_int32)temptime.tv_sec;
5279 rttp->usec=(afs_int32)temptime.tv_usec;
5281 clock_GetTime(rttp);
5283 if (clock_Lt(rttp, sentp)) {
5285 return; /* somebody set the clock back, don't count this time. */
5287 clock_Sub(rttp, sentp);
5288 MUTEX_ENTER(&rx_stats_mutex);
5289 if (clock_Lt(rttp, &rx_stats.minRtt)) rx_stats.minRtt = *rttp;
5290 if (clock_Gt(rttp, &rx_stats.maxRtt)) {
5291 if (rttp->sec > 60) {
5292 MUTEX_EXIT(&rx_stats_mutex);
5293 return; /* somebody set the clock ahead */
5295 rx_stats.maxRtt = *rttp;
5297 clock_Add(&rx_stats.totalRtt, rttp);
5298 rx_stats.nRttSamples++;
5299 MUTEX_EXIT(&rx_stats_mutex);
5301 /* better rtt calculation courtesy of UMich crew (dave,larry,peter,?) */
5303 /* Apply VanJacobson round-trip estimations */
5308 * srtt (peer->rtt) is in units of one-eighth-milliseconds.
5309 * srtt is stored as fixed point with 3 bits after the binary
5310 * point (i.e., scaled by 8). The following magic is
5311 * equivalent to the smoothing algorithm in rfc793 with an
5312 * alpha of .875 (srtt = rtt/8 + srtt*7/8 in fixed point).
5313 * srtt*8 = srtt*8 + rtt - srtt
5314 * srtt = srtt + rtt/8 - srtt/8
5317 delta = MSEC(rttp) - (peer->rtt >> 3);
5321 * We accumulate a smoothed rtt variance (actually, a smoothed
5322 * mean difference), then set the retransmit timer to smoothed
5323 * rtt + 4 times the smoothed variance (was 2x in van's original
5324 * paper, but 4x works better for me, and apparently for him as
5326 * rttvar is stored as
5327 * fixed point with 2 bits after the binary point (scaled by
5328 * 4). The following is equivalent to rfc793 smoothing with
5329 * an alpha of .75 (rttvar = rttvar*3/4 + |delta| / 4). This
5330 * replaces rfc793's wired-in beta.
5331 * dev*4 = dev*4 + (|actual - expected| - dev)
5337 delta -= (peer->rtt_dev >> 2);
5338 peer->rtt_dev += delta;
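/* Worked example (illustrative): suppose srtt is 100ms, i.e.
 * peer->rtt == 800 in 1/8-ms fixed point, and a new sample of
 * MSEC(rttp) == 120 arrives. Then delta = 120 - (800 >> 3) = 20 and
 * the smoothing above yields peer->rtt == 820, i.e. 102.5ms
 * = 100*7/8 + 120/8. For the deviation, with peer->rtt_dev == 40
 * (10ms scaled by 4): |delta| - (40 >> 2) = 20 - 10 = 10, so
 * peer->rtt_dev becomes 50, i.e. 12.5ms = 10*3/4 + 20/4. */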
5341 /* I don't have a stored RTT so I start with this value. Since I'm
5342 * probably just starting a call, and will be pushing more data down
5343 * this, I expect congestion to increase rapidly. So I fudge a
5344 * little, and I set deviance to half the rtt. In practice,
5345 * deviance tends to approach something a little less than
5346 * half the smoothed rtt. */
5347 peer->rtt = (MSEC(rttp) << 3) + 8;
5348 peer->rtt_dev = peer->rtt >> 2; /* rtt/2: they're scaled differently */
5350 /* the timeout is RTT + 4*MDEV + 0.35 sec. This is because one end or
5351 * the other of these connections is usually in a user process, and can
5352 * be switched and/or swapped out. So on fast, reliable networks, the
5353 * timeout would otherwise be too short.
5355 rtt_timeout = (peer->rtt >> 3) + peer->rtt_dev + 350;
5356 clock_Zero(&(peer->timeout));
5357 clock_Addmsec(&(peer->timeout), rtt_timeout);
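/* Continuing the example above (illustrative): with peer->rtt == 820
 * and peer->rtt_dev == 50, rtt_timeout = (820 >> 3) + 50 + 350
 * = 102 + 50 + 350 = 502ms. Because rtt_dev is the deviation scaled
 * by 4, adding it unscaled here is exactly the 4*MDEV term of the
 * comment above. */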
5359 dpf(("rxi_ComputeRoundTripTime(rtt=%d ms, srtt=%d ms, rtt_dev=%d ms, timeout=%d.%0.3d sec)\n",
5360 MSEC(rttp), peer->rtt >> 3, peer->rtt_dev >> 2,
5361 (peer->timeout.sec),(peer->timeout.usec)) );
5365 /* Find all server connections that have not been active for a long time, and
5367 void rxi_ReapConnections()
5370 clock_GetTime(&now);
5372 /* Find server connection structures that haven't been used for
5373 * greater than rx_idleConnectionTime */
5374 { struct rx_connection **conn_ptr, **conn_end;
5375 int i, havecalls = 0;
5376 MUTEX_ENTER(&rx_connHashTable_lock);
5377 for (conn_ptr = &rx_connHashTable[0],
5378 conn_end = &rx_connHashTable[rx_hashTableSize];
5379 conn_ptr < conn_end; conn_ptr++) {
5380 struct rx_connection *conn, *next;
5381 struct rx_call *call;
5385 for (conn = *conn_ptr; conn; conn = next) {
5386 /* XXX -- Shouldn't the connection be locked? */
5389 for(i=0;i<RX_MAXCALLS;i++) {
5390 call = conn->call[i];
5393 MUTEX_ENTER(&call->lock);
5394 #ifdef RX_ENABLE_LOCKS
5395 result = rxi_CheckCall(call, 1);
5396 #else /* RX_ENABLE_LOCKS */
5397 result = rxi_CheckCall(call);
5398 #endif /* RX_ENABLE_LOCKS */
5399 MUTEX_EXIT(&call->lock);
5401 /* If CheckCall freed the call, it might
5402 * have destroyed the connection as well,
5403 * which screws up the linked lists.
5409 if (conn->type == RX_SERVER_CONNECTION) {
5410 /* This only actually destroys the connection if
5411 * there are no outstanding calls */
5412 MUTEX_ENTER(&conn->conn_data_lock);
5413 if (!havecalls && !conn->refCount &&
5414 ((conn->lastSendTime + rx_idleConnectionTime) < now.sec)) {
5415 conn->refCount++; /* it will be decr in rx_DestroyConn */
5416 MUTEX_EXIT(&conn->conn_data_lock);
5417 #ifdef RX_ENABLE_LOCKS
5418 rxi_DestroyConnectionNoLock(conn);
5419 #else /* RX_ENABLE_LOCKS */
5420 rxi_DestroyConnection(conn);
5421 #endif /* RX_ENABLE_LOCKS */
5423 #ifdef RX_ENABLE_LOCKS
5425 MUTEX_EXIT(&conn->conn_data_lock);
5427 #endif /* RX_ENABLE_LOCKS */
5431 #ifdef RX_ENABLE_LOCKS
5432 while (rx_connCleanup_list) {
5433 struct rx_connection *conn;
5434 conn = rx_connCleanup_list;
5435 rx_connCleanup_list = rx_connCleanup_list->next;
5436 MUTEX_EXIT(&rx_connHashTable_lock);
5437 rxi_CleanupConnection(conn);
5438 MUTEX_ENTER(&rx_connHashTable_lock);
5440 MUTEX_EXIT(&rx_connHashTable_lock);
5441 #endif /* RX_ENABLE_LOCKS */
5444 /* Find any peer structures that haven't been used (haven't had an
5445 * associated connection) for greater than rx_idlePeerTime */
5446 { struct rx_peer **peer_ptr, **peer_end;
5448 MUTEX_ENTER(&rx_rpc_stats);
5449 MUTEX_ENTER(&rx_peerHashTable_lock);
5450 for (peer_ptr = &rx_peerHashTable[0],
5451 peer_end = &rx_peerHashTable[rx_hashTableSize];
5452 peer_ptr < peer_end; peer_ptr++) {
5453 struct rx_peer *peer, *next, *prev;
5454 for (prev = peer = *peer_ptr; peer; peer = next) {
5456 code = MUTEX_TRYENTER(&peer->peer_lock);
5457 if ((code) && (peer->refCount == 0)
5458 && ((peer->idleWhen + rx_idlePeerTime) < now.sec)) {
5459 rx_interface_stat_p rpc_stat, nrpc_stat;
5461 MUTEX_EXIT(&peer->peer_lock);
5462 MUTEX_DESTROY(&peer->peer_lock);
5463 for(queue_Scan(&peer->rpcStats, rpc_stat, nrpc_stat,
5464 rx_interface_stat)) {
5465 unsigned int num_funcs;
5466 if (!rpc_stat) break;
5467 queue_Remove(&rpc_stat->queue_header);
5468 queue_Remove(&rpc_stat->all_peers);
5469 num_funcs = rpc_stat->stats[0].func_total;
5470 space = sizeof(rx_interface_stat_t) +
5471 rpc_stat->stats[0].func_total *
5472 sizeof(rx_function_entry_v1_t);
5474 rxi_Free(rpc_stat, space);
5475 rxi_rpc_peer_stat_cnt -= num_funcs;
5478 MUTEX_ENTER(&rx_stats_mutex);
5479 rx_stats.nPeerStructs--;
5480 MUTEX_EXIT(&rx_stats_mutex);
5481 if (prev == *peer_ptr) {
5490 MUTEX_EXIT(&peer->peer_lock);
5496 MUTEX_EXIT(&rx_peerHashTable_lock);
5497 MUTEX_EXIT(&rx_rpc_stats);
5500 /* THIS HACK IS A TEMPORARY HACK. The idea is that the race condition in
5501 rxi_AllocSendPacket, if it hits, will be handled at the next conn
5502 GC, just below. Really, we shouldn't have to keep moving packets from
5503 one place to another, but instead ought to always know if we can
5504 afford to hold onto a packet in its particular use. */
5505 MUTEX_ENTER(&rx_freePktQ_lock);
5506 if (rx_waitingForPackets) {
5507 rx_waitingForPackets = 0;
5508 #ifdef RX_ENABLE_LOCKS
5509 CV_BROADCAST(&rx_waitingForPackets_cv);
5511 osi_rxWakeup(&rx_waitingForPackets);
5514 MUTEX_EXIT(&rx_freePktQ_lock);
5516 now.sec += RX_REAP_TIME; /* Check every RX_REAP_TIME seconds */
5517 rxevent_Post(&now, rxi_ReapConnections, 0, 0);
5521 /* rxs_Release - This isn't strictly necessary but, since the macro name from
5522 * rx.h is sort of strange this is better. This is called with a security
5523 * object before it is discarded. Each connection using a security object has
5524 * its own refcount to the object so it won't actually be freed until the last
5525 * connection is destroyed.
5527 * This is the only rxs module call. A hold could also be written but no one
5530 int rxs_Release (aobj)
5531 struct rx_securityClass *aobj;
5533 return RXS_Close (aobj);
5537 #define RXRATE_PKT_OH (RX_HEADER_SIZE + RX_IPUDP_SIZE)
5538 #define RXRATE_SMALL_PKT (RXRATE_PKT_OH + sizeof(struct rx_ackPacket))
5539 #define RXRATE_AVG_SMALL_PKT (RXRATE_PKT_OH + (sizeof(struct rx_ackPacket)/2))
5540 #define RXRATE_LARGE_PKT (RXRATE_SMALL_PKT + 256)
5542 /* Adjust our estimate of the transmission rate to this peer, given
5543 * that the packet p was just acked. We can adjust peer->timeout and
5544 * call->twind. Pragmatically, this is called
5545 * only with packets of maximal length.
5546 * Called with peer and call locked.
5549 static void rxi_ComputeRate(peer, call, p, ackp, ackReason)
5550 register struct rx_peer *peer;
5551 register struct rx_call *call;
5552 struct rx_packet *p, *ackp;
5555 afs_int32 xferSize, xferMs;
5556 register afs_int32 minTime;
5559 /* Count down packets */
5560 if (peer->rateFlag > 0) peer->rateFlag--;
5561 /* Do nothing until we're enabled */
5562 if (peer->rateFlag != 0) return;
5563 if (!call->conn) return;
5565 /* Count only when the ack seems legitimate */
5566 switch (ackReason) {
5567 case RX_ACK_REQUESTED:
5568 xferSize = p->length + RX_HEADER_SIZE +
5569 call->conn->securityMaxTrailerSize;
5573 case RX_ACK_PING_RESPONSE:
5574 if (p) /* want the response to ping-request, not data send */
5576 clock_GetTime(&newTO);
5577 if (clock_Gt(&newTO, &call->pingRequestTime)) {
5578 clock_Sub(&newTO, &call->pingRequestTime);
5579 xferMs = (newTO.sec * 1000) + (newTO.usec / 1000);
5583 xferSize = rx_AckDataSize(rx_Window) + RX_HEADER_SIZE;
5590 dpf(("CONG peer %lx/%u: sample (%s) size %ld, %ld ms (to %lu.%06lu, rtt %u, ps %u)",
5591 ntohl(peer->host), ntohs(peer->port),
5592 (ackReason == RX_ACK_REQUESTED ? "dataack" : "pingack"),
5593 xferSize, xferMs, peer->timeout.sec, peer->timeout.usec, peer->smRtt,
5596 /* Track only packets that are big enough. */
5597 if ((p->length + RX_HEADER_SIZE + call->conn->securityMaxTrailerSize) <
5601 /* absorb RTT data (in milliseconds) for these big packets */
5602 if (peer->smRtt == 0) {
5603 peer->smRtt = xferMs;
5605 peer->smRtt = ((peer->smRtt * 15) + xferMs + 4) >> 4;
5606 if (!peer->smRtt) peer->smRtt = 1;
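/* Worked example (illustrative): with peer->smRtt == 200 and a new
 * sample xferMs == 120, the 15/16 decaying average gives
 * (200*15 + 120 + 4) >> 4 = 3124 >> 4 = 195ms; the +4 is crude
 * rounding, and the check above keeps the estimate from reaching 0,
 * since 0 means "no estimate yet". */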
5609 if (peer->countDown) {
5613 peer->countDown = 10; /* recalculate only every so often */
5615 /* In practice, we can measure only the RTT for full packets,
5616 * because of the way Rx acks the data that it receives. (If it's
5617 * smaller than a full packet, it often gets implicitly acked
5618 * either by the call response (from a server) or by the next call
5619 * (from a client), and either case confuses transmission times
5620 * with processing times.) Therefore, replace the above
5621 * more-sophisticated processing with a simpler version, where the
5622 * smoothed RTT is kept for full-size packets, and the time to
5623 * transmit a windowful of full-size packets is simply RTT *
5624 * windowSize. Again, we take two steps:
5625 - ensure the timeout is large enough for a single packet's RTT;
5626 - ensure that the window is small enough to fit in the desired timeout.*/
5628 /* First, the timeout check. */
5629 minTime = peer->smRtt;
5630 /* Get a reasonable estimate for a timeout period */
5632 newTO.sec = minTime / 1000;
5633 newTO.usec = (minTime - (newTO.sec * 1000)) * 1000;
5635 /* Increase the timeout period so that we can always do at least
5636 * one packet exchange */
5637 if (clock_Gt(&newTO, &peer->timeout)) {
5639 dpf(("CONG peer %lx/%u: timeout %lu.%06lu ==> %lu.%06lu (rtt %u, ps %u)",
5640 ntohl(peer->host), ntohs(peer->port), peer->timeout.sec,
5641 peer->timeout.usec, newTO.sec, newTO.usec, peer->smRtt,
5644 peer->timeout = newTO;
5647 /* Now, get an estimate for the transmit window size. */
5648 minTime = peer->timeout.sec * 1000 + (peer->timeout.usec / 1000);
5649 /* Now, convert to the number of full packets that could fit in a
5650 * reasonable fraction of that interval */
5651 minTime /= (peer->smRtt << 1);
5652 xferSize = minTime; /* (make a copy) */
5654 /* Now clamp the size to reasonable bounds. */
5655 if (minTime <= 1) minTime = 1;
5656 else if (minTime > rx_Window) minTime = rx_Window;
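/* Worked example (illustrative): a 1.2 second timeout with
 * smRtt == 100ms gives minTime = 1200 / (100 << 1) = 6, i.e. about
 * six full packets fit in the timeout at twice-smRtt pacing; the
 * bounds just above then clamp the result to [1, rx_Window]. */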
5657 /* if (minTime != peer->maxWindow) {
5658 dpf(("CONG peer %lx/%u: windowsize %lu ==> %lu (to %lu.%06lu, rtt %u, ps %u)",
5659 ntohl(peer->host), ntohs(peer->port), peer->maxWindow, minTime,
5660 peer->timeout.sec, peer->timeout.usec, peer->smRtt,
5662 peer->maxWindow = minTime;
5663 elide... call->twind = minTime;
5667 /* Cut back on the peer timeout if it had earlier grown unreasonably.
5668 * Discern this by calculating the timeout necessary for rx_Window
5670 if ((xferSize > rx_Window) && (peer->timeout.sec >= 3)) {
5671 /* calculate estimate for transmission interval in milliseconds */
5672 minTime = rx_Window * peer->smRtt;
5673 if (minTime < 1000) {
5674 dpf(("CONG peer %lx/%u: cut TO %lu.%06lu by 0.5 (rtt %u, ps %u)",
5675 ntohl(peer->host), ntohs(peer->port), peer->timeout.sec,
5676 peer->timeout.usec, peer->smRtt,
5679 newTO.sec = 0; /* cut back on timeout by half a second */
5680 newTO.usec = 500000;
5681 clock_Sub(&peer->timeout, &newTO);
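/* Worked example (illustrative, invented values): with rx_Window == 15
 * and smRtt == 50ms, a full window costs about 750ms; since 750 < 1000,
 * a timeout that has grown to 3 seconds or more is walked back half a
 * second per pass until it is proportionate again. */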
5686 } /* end of rxi_ComputeRate */
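/*
 * Editor's sketch (illustrative, not part of the original source): the
 * timeout/window arithmetic performed above, distilled into a
 * standalone helper.  The est_* names are hypothetical; smRttMs is
 * assumed >= 1, as the smoothing code above guarantees.
 */
#if 0
struct est_timeout { long sec; long usec; };

static void
est_rate(unsigned long smRttMs, unsigned long window,
         struct est_timeout *to, unsigned long *twind)
{
    unsigned long ms, w;

    /* the timeout must cover at least one round trip */
    to->sec = smRttMs / 1000;
    to->usec = (smRttMs % 1000) * 1000;

    /* window = number of (doubled) RTTs that fit in the timeout,
     * clamped to [1, window] exactly as the code above clamps it */
    ms = to->sec * 1000 + to->usec / 1000;
    w = ms / (smRttMs << 1);
    if (w < 1)
        w = 1;
    else if (w > window)
        w = window;
    *twind = w;
}
#endif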
5687 #endif /* ADAPT_WINDOW */
5695 /* Don't call this debugging routine directly; use dpf */
5697 rxi_DebugPrint(format, a1, a2, a3, a4, a5, a6, a7, a8, a9, a10,
5698 a11, a12, a13, a14, a15)
5699 char *format;
5700 {
5701 struct clock now;
5702 clock_GetTime(&now);
5703 fprintf(rx_Log, " %u.%.3u:", now.sec, now.usec/1000);
5704 fprintf(rx_Log, format, a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14, a15);
5705 }
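/*
 * Editor's note: dpf is the macro wrapper that routes calls here and
 * compiles them away when RXDEBUG is off.  A typical call site from
 * this file (note the double parentheses, which pass a printf-style
 * argument list through a single macro parameter):
 *
 *     dpf(("CONG peer %lx/%u: timeout %lu.%06lu", host, port, sec, usec));
 */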
5710 /*
5711 * This function is used to process the rx_stats structure that is local
5712 * to a process as well as an rx_stats structure received from a remote
5713 * process (via rxdebug). Therefore, it needs to do minimal version
5714 * checking.
5715 */
5716 void rx_PrintTheseStats (file, s, size, freePackets, version)
5717 FILE *file;
5718 struct rx_stats *s;
5719 int size; /* some idea of version control */
5720 afs_int32 freePackets;
5721 afs_int32 version;
5722 {
5723 int i;
5725 if (size != sizeof(struct rx_stats)) {
5727 "Unexpected size of stats structure: was %d, expected %d\n",
5728 size, sizeof(struct rx_stats));
5732 "rx stats: free packets %d, "
5737 if (version >= RX_DEBUGI_VERSION_W_NEWPACKETTYPES) {
5739 "alloc-failures(rcv %d/%d,send %d/%d,ack %d)\n",
5740 s->receivePktAllocFailures,
5741 s->receiveCbufPktAllocFailures,
5742 s->sendPktAllocFailures,
5743 s->sendCbufPktAllocFailures,
5744 s->specialPktAllocFailures);
5747 "alloc-failures(rcv %d,send %d,ack %d)\n",
5748 s->receivePktAllocFailures,
5749 s->sendPktAllocFailures,
5750 s->specialPktAllocFailures);
5755 "bogusReads %d (last from host %x), "
5761 s->bogusPacketOnRead,
5764 s->noPacketBuffersOnRead,
5768 fprintf(file, " packets read: ");
5769 for (i = 0; i<RX_N_PACKET_TYPES; i++) {
5775 fprintf(file, "\n");
5778 " other read counters: data %d, "
5786 s->spuriousPacketsRead,
5787 s->ignorePacketDally);
5789 fprintf(file, " packets sent: ");
5790 for (i = 0; i<RX_N_PACKET_TYPES; i++) {
5796 fprintf(file, "\n");
5799 " other send counters: ack %d, "
5800 "data %d (not resends), "
5803 "acked&ignored %d\n",
5806 s->dataPacketsReSent,
5807 s->dataPacketsPushed,
5808 s->ignoreAckedPacket);
5811 " \t(these should be small) sendFailed %d, "
5816 if (s->nRttSamples) {
5818 " Average rtt is %0.3f, with %d samples\n",
5819 clock_Float(&s->totalRtt)/s->nRttSamples,
5823 " Minimum rtt is %0.3f, maximum is %0.3f\n",
5824 clock_Float(&s->minRtt),
5825 clock_Float(&s->maxRtt));
5829 " %d server connections, "
5830 "%d client connections, "
5833 "%d free call structs\n",
5838 s->nFreeCallStructs);
5840 #if !defined(AFS_PTHREAD_ENV) && !defined(AFS_USE_GETTIMEOFDAY)
5842 " %d clock updates\n",
5848 /* for backward compatibility */
5849 void rx_PrintStats(file)
5850 FILE *file;
5851 {
5852 MUTEX_ENTER(&rx_stats_mutex);
5853 rx_PrintTheseStats (file, &rx_stats, sizeof(rx_stats), rx_nFreePackets, RX_DEBUGI_VERSION);
5854 MUTEX_EXIT(&rx_stats_mutex);
5855 }
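/*
 * Editor's sketch: typical application use of rx_PrintStats (assumes rx
 * has been initialized and RXDEBUG output is compiled in; the output
 * path is hypothetical).
 */
#if 0
#include <stdio.h>

void dump_rx_stats(void)
{
    FILE *f = fopen("/tmp/rx_stats.txt", "w");  /* hypothetical path */
    if (f) {
        rx_PrintStats(f);
        fclose(f);
    }
}
#endif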
5857 void rx_PrintPeerStats(file, peer)
5858 FILE *file;
5859 struct rx_peer *peer;
5860 {
5864 "burst wait %u.%d.\n",
5868 peer->burstWait.sec,
5869 peer->burstWait.usec);
5873 "retry time %u.%06d, "
5884 "max in packet skew %d, "
5885 "max out packet skew %d\n",
5888 peer->outPacketSkew);
5891 #ifdef AFS_PTHREAD_ENV
5892 /*
5893 * This mutex protects the static state used by the debug
5894 * routines that follow.
5895 */
5897 #define LOCK_RX_DEBUG assert(pthread_mutex_lock(&rx_debug_mutex)==0);
5898 #define UNLOCK_RX_DEBUG assert(pthread_mutex_unlock(&rx_debug_mutex)==0);
5900 #define LOCK_RX_DEBUG
5901 #define UNLOCK_RX_DEBUG
5902 #endif /* AFS_PTHREAD_ENV */
5904 static int MakeDebugCall(
5906 afs_uint32 remoteAddr,
5907 afs_uint16 remotePort,
5915 static afs_int32 counter = 100;
5917 struct rx_header theader;
5919 register afs_int32 code;
5921 struct sockaddr_in taddr, faddr;
5926 endTime = time(0) + 20; /* try for 20 seconds */
5930 tp = &tbuffer[sizeof(struct rx_header)];
5931 taddr.sin_family = AF_INET;
5932 taddr.sin_port = remotePort;
5933 taddr.sin_addr.s_addr = remoteAddr;
5935 memset(&theader, 0, sizeof(theader));
5936 theader.epoch = htonl(999);
5938 theader.callNumber = htonl(counter);
5941 theader.type = type;
5942 theader.flags = RX_CLIENT_INITIATED | RX_LAST_PACKET;
5943 theader.serviceId = 0;
5945 bcopy(&theader, tbuffer, sizeof(theader));
5946 bcopy(inputData, tp, inputLength);
5947 code = sendto(socket, tbuffer, inputLength+sizeof(struct rx_header), 0,
5948 (struct sockaddr *) &taddr, sizeof(struct sockaddr_in));
5950 /* see if there's a packet available */
5951 FD_ZERO(&imask);
5952 FD_SET(socket, &imask);
5955 code = select(socket+1, &imask, 0, 0, &tv);
5957 /* now receive a packet */
5958 faddrLen = sizeof(struct sockaddr_in);
5959 code = recvfrom(socket, tbuffer, sizeof(tbuffer), 0,
5960 (struct sockaddr *) &faddr, &faddrLen);
5962 bcopy(tbuffer, &theader, sizeof(struct rx_header));
5963 if (counter == ntohl(theader.callNumber)) break;
5966 /* see if we've timed out */
5967 if (endTime < time(0)) return -1;
5969 code -= sizeof(struct rx_header);
5970 if (code > outputLength) code = outputLength;
5971 bcopy(tp, outputData, code);
5975 afs_int32 rx_GetServerDebug(
5977 afs_uint32 remoteAddr,
5978 afs_uint16 remotePort,
5979 struct rx_debugStats *stat,
5980 afs_uint32 *supportedValues
5983 struct rx_debugIn in;
5986 *supportedValues = 0;
5987 in.type = htonl(RX_DEBUGI_GETSTATS);
5990 rc = MakeDebugCall(socket,
5993 RX_PACKET_TYPE_DEBUG,
5999 /*
6000 * If the call was successful, fix up the version and indicate
6001 * which contents of the stat structure are valid.
6002 * Also do net to host conversion of fields here.
6003 */
6006 if (stat->version >= RX_DEBUGI_VERSION_W_SECSTATS) {
6007 *supportedValues |= RX_SERVER_DEBUG_SEC_STATS;
6009 if (stat->version >= RX_DEBUGI_VERSION_W_GETALLCONN) {
6010 *supportedValues |= RX_SERVER_DEBUG_ALL_CONN;
6012 if (stat->version >= RX_DEBUGI_VERSION_W_RXSTATS) {
6013 *supportedValues |= RX_SERVER_DEBUG_RX_STATS;
6015 if (stat->version >= RX_DEBUGI_VERSION_W_WAITERS) {
6016 *supportedValues |= RX_SERVER_DEBUG_WAITER_CNT;
6018 if (stat->version >= RX_DEBUGI_VERSION_W_IDLETHREADS) {
6019 *supportedValues |= RX_SERVER_DEBUG_IDLE_THREADS;
6021 if (stat->version >= RX_DEBUGI_VERSION_W_NEWPACKETTYPES) {
6022 *supportedValues |= RX_SERVER_DEBUG_NEW_PACKETS;
6024 if (stat->version >= RX_DEBUGI_VERSION_W_GETPEER) {
6025 *supportedValues |= RX_SERVER_DEBUG_ALL_PEER;
6028 stat->nFreePackets = ntohl(stat->nFreePackets);
6029 stat->packetReclaims = ntohl(stat->packetReclaims);
6030 stat->callsExecuted = ntohl(stat->callsExecuted);
6031 stat->nWaiting = ntohl(stat->nWaiting);
6032 stat->idleThreads = ntohl(stat->idleThreads);
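/*
 * Editor's sketch: how a debugging client (in the style of rxdebug)
 * might use rx_GetServerDebug.  The UDP socket setup is an assumption;
 * only the rx_GetServerDebug call itself comes from this file.  Note
 * that remoteAddr and remotePort are expected in network byte order.
 */
#if 0
#include <stdio.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

int query_server(const char *host_dotted, unsigned short port)
{
    struct rx_debugStats tstats;
    afs_uint32 supported;
    int sock = socket(AF_INET, SOCK_DGRAM, 0);

    if (sock < 0)
        return -1;
    if (rx_GetServerDebug(sock, inet_addr(host_dotted), htons(port),
                          &tstats, &supported) >= 0) {
        printf("%d calls executed, %d threads idle\n",
               tstats.callsExecuted, tstats.idleThreads);
    }
    close(sock);
    return 0;
}
#endif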
6038 afs_int32 rx_GetServerStats(
6040 afs_uint32 remoteAddr,
6041 afs_uint16 remotePort,
6042 struct rx_stats *stat,
6043 afs_uint32 *supportedValues
6046 struct rx_debugIn in;
6047 afs_int32 *lp = (afs_int32 *) stat;
6051 /*
6052 * supportedValues is currently unused, but added to allow future
6053 * versioning of this function.
6054 */
6056 *supportedValues = 0;
6057 in.type = htonl(RX_DEBUGI_RXSTATS);
6059 memset(stat, 0, sizeof(*stat));
6061 rc = MakeDebugCall(socket,
6064 RX_PACKET_TYPE_DEBUG,
6072 /*
6073 * Do net to host conversion here
6074 */
6076 for (i=0;i<sizeof(*stat)/sizeof(afs_int32);i++,lp++) {
6077 *lp = ntohl(*lp);
6078 }
6084 afs_int32 rx_GetServerVersion(
6086 afs_uint32 remoteAddr,
6087 afs_uint16 remotePort,
6088 size_t version_length,
6093 return MakeDebugCall(socket,
6096 RX_PACKET_TYPE_VERSION,
6103 afs_int32 rx_GetServerConnections(
6105 afs_uint32 remoteAddr,
6106 afs_uint16 remotePort,
6107 afs_int32 *nextConnection,
6109 afs_uint32 debugSupportedValues,
6110 struct rx_debugConn *conn,
6111 afs_uint32 *supportedValues
6114 struct rx_debugIn in;
6118 /*
6119 * supportedValues is currently unused, but added to allow future
6120 * versioning of this function.
6121 */
6123 *supportedValues = 0;
6124 if (allConnections) {
6125 in.type = htonl(RX_DEBUGI_GETALLCONN);
6126 } else {
6127 in.type = htonl(RX_DEBUGI_GETCONN);
6128 }
6129 in.index = htonl(*nextConnection);
6130 memset(conn, 0, sizeof(*conn));
6132 rc = MakeDebugCall(socket,
6135 RX_PACKET_TYPE_DEBUG,
6142 *nextConnection += 1;
6144 /*
6145 * Convert old connection format to new structure.
6146 */
6148 if (debugSupportedValues & RX_SERVER_DEBUG_OLD_CONN) {
6149 struct rx_debugConn_vL *vL = (struct rx_debugConn_vL *)conn;
6150 #define MOVEvL(a) (conn->a = vL->a)
6152 /* any old or unrecognized version... */
6153 for (i=0;i<RX_MAXCALLS;i++) {
6154 MOVEvL(callState[i]);
6155 MOVEvL(callMode[i]);
6156 MOVEvL(callFlags[i]);
6157 MOVEvL(callOther[i]);
6158 }
6159 if (debugSupportedValues & RX_SERVER_DEBUG_SEC_STATS) {
6160 MOVEvL(secStats.type);
6161 MOVEvL(secStats.level);
6162 MOVEvL(secStats.flags);
6163 MOVEvL(secStats.expires);
6164 MOVEvL(secStats.packetsReceived);
6165 MOVEvL(secStats.packetsSent);
6166 MOVEvL(secStats.bytesReceived);
6167 MOVEvL(secStats.bytesSent);
6168 }
6169 }
6170 /*
6172 * Do net to host conversion here.
6173 *
6174 * I don't convert host or port since we are most likely
6175 * going to want these in NBO (network byte order).
6176 */
6177 conn->cid = ntohl(conn->cid);
6178 conn->serial = ntohl(conn->serial);
6179 for(i=0;i<RX_MAXCALLS;i++) {
6180 conn->callNumber[i] = ntohl(conn->callNumber[i]);
6182 conn->error = ntohl(conn->error);
6183 conn->secStats.flags = ntohl(conn->secStats.flags);
6184 conn->secStats.expires = ntohl(conn->secStats.expires);
6185 conn->secStats.packetsReceived = ntohl(conn->secStats.packetsReceived);
6186 conn->secStats.packetsSent = ntohl(conn->secStats.packetsSent);
6187 conn->secStats.bytesReceived = ntohl(conn->secStats.bytesReceived);
6188 conn->secStats.bytesSent = ntohl(conn->secStats.bytesSent);
6189 conn->epoch = ntohl(conn->epoch);
6190 conn->natMTU = ntohl(conn->natMTU);
6196 afs_int32 rx_GetServerPeers(
6198 afs_uint32 remoteAddr,
6199 afs_uint16 remotePort,
6200 afs_int32 *nextPeer,
6201 afs_uint32 debugSupportedValues,
6202 struct rx_debugPeer *peer,
6203 afs_uint32 *supportedValues
6206 struct rx_debugIn in;
6210 /*
6211 * supportedValues is currently unused, but added to allow future
6212 * versioning of this function.
6213 */
6215 *supportedValues = 0;
6216 in.type = htonl(RX_DEBUGI_GETPEER);
6217 in.index = htonl(*nextPeer);
6218 memset(peer, 0, sizeof(*peer));
6220 rc = MakeDebugCall(socket,
6223 RX_PACKET_TYPE_DEBUG,
6232 /*
6233 * Do net to host conversion here.
6234 *
6235 * I don't convert host or port since we are most likely
6236 * going to want these in NBO (network byte order).
6237 */
6238 peer->ifMTU = ntohs(peer->ifMTU);
6239 peer->idleWhen = ntohl(peer->idleWhen);
6240 peer->refCount = ntohs(peer->refCount);
6241 peer->burstWait.sec = ntohl(peer->burstWait.sec);
6242 peer->burstWait.usec = ntohl(peer->burstWait.usec);
6243 peer->rtt = ntohl(peer->rtt);
6244 peer->rtt_dev = ntohl(peer->rtt_dev);
6245 peer->timeout.sec = ntohl(peer->timeout.sec);
6246 peer->timeout.usec = ntohl(peer->timeout.usec);
6247 peer->nSent = ntohl(peer->nSent);
6248 peer->reSends = ntohl(peer->reSends);
6249 peer->inPacketSkew = ntohl(peer->inPacketSkew);
6250 peer->outPacketSkew = ntohl(peer->outPacketSkew);
6251 peer->rateFlag = ntohl(peer->rateFlag);
6252 peer->natMTU = ntohs(peer->natMTU);
6253 peer->maxMTU = ntohs(peer->maxMTU);
6254 peer->maxDgramPackets = ntohs(peer->maxDgramPackets);
6255 peer->ifDgramPackets = ntohs(peer->ifDgramPackets);
6256 peer->MTU = ntohs(peer->MTU);
6257 peer->cwind = ntohs(peer->cwind);
6258 peer->nDgramPackets = ntohs(peer->nDgramPackets);
6259 peer->congestSeq = ntohs(peer->congestSeq);
6260 peer->bytesSent.high = ntohl(peer->bytesSent.high);
6261 peer->bytesSent.low = ntohl(peer->bytesSent.low);
6262 peer->bytesReceived.high = ntohl(peer->bytesReceived.high);
6263 peer->bytesReceived.low = ntohl(peer->bytesReceived.low);
6268 #endif /* RXDEBUG */
6270 void shutdown_rx(void)
6272 struct rx_serverQueueEntry *np;
6274 register struct rx_call *call;
6275 register struct rx_serverQueueEntry *sq;
6278 if (rxinit_status == 1) {
6280 return; /* Already shutdown. */
6285 #ifndef AFS_PTHREAD_ENV
6286 FD_ZERO(&rx_selectMask);
6287 #endif /* AFS_PTHREAD_ENV */
6288 rxi_dataQuota = RX_MAX_QUOTA;
6289 #ifndef AFS_PTHREAD_ENV
6291 #endif /* AFS_PTHREAD_ENV */
6294 #ifndef AFS_PTHREAD_ENV
6295 #ifndef AFS_USE_GETTIMEOFDAY
6297 #endif /* AFS_USE_GETTIMEOFDAY */
6298 #endif /* AFS_PTHREAD_ENV */
6300 while (!queue_IsEmpty(&rx_freeCallQueue)) {
6301 call = queue_First(&rx_freeCallQueue, rx_call);
6302 queue_Remove(call);
6303 rxi_Free(call, sizeof(struct rx_call));
6306 while (!queue_IsEmpty(&rx_idleServerQueue)) {
6307 sq = queue_First(&rx_idleServerQueue, rx_serverQueueEntry);
6308 queue_Remove(sq);
6313 struct rx_peer **peer_ptr, **peer_end;
6314 for (peer_ptr = &rx_peerHashTable[0],
6315 peer_end = &rx_peerHashTable[rx_hashTableSize];
6316 peer_ptr < peer_end; peer_ptr++) {
6317 struct rx_peer *peer, *next;
6318 for (peer = *peer_ptr; peer; peer = next) {
6319 rx_interface_stat_p rpc_stat, nrpc_stat;
6320 next = peer->next;
6321 for(queue_Scan(&peer->rpcStats, rpc_stat, nrpc_stat,
6322 rx_interface_stat)) {
6323 unsigned int num_funcs;
6324 if (!rpc_stat) break;
6325 queue_Remove(&rpc_stat->queue_header);
6326 queue_Remove(&rpc_stat->all_peers);
6327 num_funcs = rpc_stat->stats[0].func_total;
6328 space = sizeof(rx_interface_stat_t) +
6329 rpc_stat->stats[0].func_total *
6330 sizeof(rx_function_entry_v1_t);
6332 rxi_Free(rpc_stat, space);
6333 MUTEX_ENTER(&rx_rpc_stats);
6334 rxi_rpc_peer_stat_cnt -= num_funcs;
6335 MUTEX_EXIT(&rx_rpc_stats);
6339 MUTEX_ENTER(&rx_stats_mutex);
6340 rx_stats.nPeerStructs--;
6341 MUTEX_EXIT(&rx_stats_mutex);
6345 for (i = 0; i<RX_MAX_SERVICES; i++) {
6346 if (rx_services[i])
6347 rxi_Free(rx_services[i], sizeof(*rx_services[i]));
6349 for (i = 0; i < rx_hashTableSize; i++) {
6350 register struct rx_connection *tc, *ntc;
6351 MUTEX_ENTER(&rx_connHashTable_lock);
6352 for (tc = rx_connHashTable[i]; tc; tc = ntc) {
6353 ntc = tc->next;
6354 for (j = 0; j < RX_MAXCALLS; j++) {
6355 if (tc->call[j])
6356 rxi_Free(tc->call[j], sizeof(*tc->call[j]));
6359 rxi_Free(tc, sizeof(*tc));
6361 MUTEX_EXIT(&rx_connHashTable_lock);
6364 MUTEX_ENTER(&freeSQEList_lock);
6366 while ((np = rx_FreeSQEList) != NULL) {
6367 rx_FreeSQEList = *(struct rx_serverQueueEntry **)np;
6368 MUTEX_DESTROY(&np->lock);
6369 rxi_Free(np, sizeof(*np));
6372 MUTEX_EXIT(&freeSQEList_lock);
6373 MUTEX_DESTROY(&freeSQEList_lock);
6374 MUTEX_DESTROY(&rx_freeCallQueue_lock);
6375 MUTEX_DESTROY(&rx_connHashTable_lock);
6376 MUTEX_DESTROY(&rx_peerHashTable_lock);
6377 MUTEX_DESTROY(&rx_serverPool_lock);
6379 osi_Free(rx_connHashTable, rx_hashTableSize*sizeof(struct rx_connection *));
6380 osi_Free(rx_peerHashTable, rx_hashTableSize*sizeof(struct rx_peer *));
6382 UNPIN(rx_connHashTable, rx_hashTableSize*sizeof(struct rx_connection *));
6383 UNPIN(rx_peerHashTable, rx_hashTableSize*sizeof(struct rx_peer *));
6385 rxi_FreeAllPackets();
6387 MUTEX_ENTER(&rx_stats_mutex);
6388 rxi_dataQuota = RX_MAX_QUOTA;
6389 rxi_availProcs = rxi_totalMin = rxi_minDeficit = 0;
6390 MUTEX_EXIT(&rx_stats_mutex);
6396 #ifdef RX_ENABLE_LOCKS
6397 void osirx_AssertMine(afs_kmutex_t *lockaddr, char *msg)
6399 if (!MUTEX_ISMINE(lockaddr))
6400 osi_Panic("Lock not held: %s", msg);
6402 #endif /* RX_ENABLE_LOCKS */
6406 /*
6407 * Routines to implement connection-specific data.
6408 */
6410 int rx_KeyCreate(rx_destructor_t rtn)
6411 {
6412 int key;
6413 MUTEX_ENTER(&rxi_keyCreate_lock);
6414 key = rxi_keyCreate_counter++;
6415 rxi_keyCreate_destructor = (rx_destructor_t *)
6416 realloc((void *)rxi_keyCreate_destructor,
6417 (key+1) * sizeof(rx_destructor_t));
6418 rxi_keyCreate_destructor[key] = rtn;
6419 MUTEX_EXIT(&rxi_keyCreate_lock);
6420 return key;
6421 }
6423 void rx_SetSpecific(struct rx_connection *conn, int key, void *ptr)
6424 {
6425 int i;
6426 MUTEX_ENTER(&conn->conn_data_lock);
6427 if (!conn->specific) {
6428 conn->specific = (void **)malloc((key+1)*sizeof(void *));
6429 for (i = 0 ; i < key ; i++)
6430 conn->specific[i] = NULL;
6431 conn->nSpecific = key+1;
6432 conn->specific[key] = ptr;
6433 } else if (key >= conn->nSpecific) {
6434 conn->specific = (void **)
6435 realloc(conn->specific,(key+1)*sizeof(void *));
6436 for (i = conn->nSpecific ; i < key ; i++)
6437 conn->specific[i] = NULL;
6438 conn->nSpecific = key+1;
6439 conn->specific[key] = ptr;
6440 } else {
6441 if (conn->specific[key] && rxi_keyCreate_destructor[key])
6442 (*rxi_keyCreate_destructor[key])(conn->specific[key]);
6443 conn->specific[key] = ptr;
6444 }
6445 MUTEX_EXIT(&conn->conn_data_lock);
6448 void *rx_GetSpecific(struct rx_connection *conn, int key)
6449 {
6450 void *ptr;
6451 MUTEX_ENTER(&conn->conn_data_lock);
6452 if (key >= conn->nSpecific)
6453 ptr = NULL;
6454 else
6455 ptr = conn->specific[key];
6456 MUTEX_EXIT(&conn->conn_data_lock);
6457 return ptr;
6458 }
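/*
 * Editor's sketch: typical use of the connection-specific data API
 * above.  A module allocates a key once, attaches per-connection state
 * with rx_SetSpecific, and lets the registered destructor reclaim it
 * when the slot is overwritten.  my_state, my_key and the helper names
 * are hypothetical.
 */
#if 0
#include <stdlib.h>

struct my_state { int calls; };
static int my_key;

static void my_destructor(void *p)
{
    free(p);
}

void my_module_init(void)
{
    my_key = rx_KeyCreate(my_destructor);
}

void my_attach(struct rx_connection *conn)
{
    struct my_state *s = (struct my_state *) malloc(sizeof(*s));
    s->calls = 0;
    rx_SetSpecific(conn, my_key, s);
}

int my_count(struct rx_connection *conn)
{
    struct my_state *s = (struct my_state *) rx_GetSpecific(conn, my_key);
    return s ? s->calls : 0;
}
#endif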
6460 #endif /* !KERNEL */
6462 /*
6463 * processStats is a queue used to store the statistics for the local
6464 * process. Its contents are similar to the contents of the rpcStats
6465 * queue on a rx_peer structure, but the actual data stored within
6466 * this queue contains totals across the lifetime of the process (assuming
6467 * the stats have not been reset), unlike the per-peer structures,
6468 * which can come and go based upon the peer lifetime.
6469 */
6471 static struct rx_queue processStats = {&processStats,&processStats};
6473 /*
6474 * peerStats is a queue used to store the statistics for all peer structs.
6475 * Its contents are the union of all the peer rpcStats queues.
6476 */
6478 static struct rx_queue peerStats = {&peerStats,&peerStats};
6480 /*
6481 * rxi_monitor_processStats is used to turn process-wide stat collection
6482 * on and off
6483 */
6485 static int rxi_monitor_processStats = 0;
6487 /*
6488 * rxi_monitor_peerStats is used to turn per-peer stat collection on and off
6489 */
6491 static int rxi_monitor_peerStats = 0;
6493 /*
6494 * rxi_AddRpcStat - given all of the information for a particular rpc
6495 * call, create (if needed) and update the stat totals for the rpc.
6499 * IN stats - the queue of stats that will be updated with the new value
6501 * IN rxInterface - a unique number that identifies the rpc interface
6503 * IN currentFunc - the index of the function being invoked
6505 * IN totalFunc - the total number of functions in this interface
6507 * IN queueTime - the amount of time this function waited for a thread
6509 * IN execTime - the amount of time this function invocation took to execute
6511 * IN bytesSent - the number of bytes sent by this invocation
6513 * IN bytesRcvd - the number of bytes received by this invocation
6515 * IN isServer - if true, this invocation was made to a server
6517 * IN remoteHost - the ip address of the remote host
6519 * IN remotePort - the port of the remote host
6521 * IN addToPeerList - if != 0, add newly created stat to the global peer list
6523 * INOUT counter - if a new stats structure is allocated, the counter will
6524 * be updated with the new number of allocated stat structures
6525 */
6531 static int rxi_AddRpcStat(
6532 struct rx_queue *stats,
6533 afs_uint32 rxInterface,
6534 afs_uint32 currentFunc,
6535 afs_uint32 totalFunc,
6536 struct clock *queueTime,
6537 struct clock *execTime,
6538 afs_hyper_t *bytesSent,
6539 afs_hyper_t *bytesRcvd,
6541 afs_uint32 remoteHost,
6542 afs_uint32 remotePort,
6544 unsigned int *counter)
6547 rx_interface_stat_p rpc_stat, nrpc_stat;
6549 /*
6550 * See if there's already a structure for this interface
6551 */
6553 for(queue_Scan(stats, rpc_stat, nrpc_stat, rx_interface_stat)) {
6554 if ((rpc_stat->stats[0].interfaceId == rxInterface) &&
6555 (rpc_stat->stats[0].remote_is_server == isServer)) break;
6558 /*
6559 * Didn't find a match so allocate a new structure and add it to the
6560 * queue
6561 */
6563 if ((rpc_stat == NULL) ||
6564 (rpc_stat->stats[0].interfaceId != rxInterface) ||
6565 (rpc_stat->stats[0].remote_is_server != isServer)) {
6569 space = sizeof(rx_interface_stat_t) + totalFunc *
6570 sizeof(rx_function_entry_v1_t);
6572 rpc_stat = (rx_interface_stat_p) rxi_Alloc(space);
6573 if (rpc_stat == NULL) {
6577 *counter += totalFunc;
6578 for(i=0;i<totalFunc;i++) {
6579 rpc_stat->stats[i].remote_peer = remoteHost;
6580 rpc_stat->stats[i].remote_port = remotePort;
6581 rpc_stat->stats[i].remote_is_server = isServer;
6582 rpc_stat->stats[i].interfaceId = rxInterface;
6583 rpc_stat->stats[i].func_total = totalFunc;
6584 rpc_stat->stats[i].func_index = i;
6585 hzero(rpc_stat->stats[i].invocations);
6586 hzero(rpc_stat->stats[i].bytes_sent);
6587 hzero(rpc_stat->stats[i].bytes_rcvd);
6588 rpc_stat->stats[i].queue_time_sum.sec = 0;
6589 rpc_stat->stats[i].queue_time_sum.usec = 0;
6590 rpc_stat->stats[i].queue_time_sum_sqr.sec = 0;
6591 rpc_stat->stats[i].queue_time_sum_sqr.usec = 0;
6592 rpc_stat->stats[i].queue_time_min.sec = 9999999;
6593 rpc_stat->stats[i].queue_time_min.usec = 9999999;
6594 rpc_stat->stats[i].queue_time_max.sec = 0;
6595 rpc_stat->stats[i].queue_time_max.usec = 0;
6596 rpc_stat->stats[i].execution_time_sum.sec = 0;
6597 rpc_stat->stats[i].execution_time_sum.usec = 0;
6598 rpc_stat->stats[i].execution_time_sum_sqr.sec = 0;
6599 rpc_stat->stats[i].execution_time_sum_sqr.usec = 0;
6600 rpc_stat->stats[i].execution_time_min.sec = 9999999;
6601 rpc_stat->stats[i].execution_time_min.usec = 9999999;
6602 rpc_stat->stats[i].execution_time_max.sec = 0;
6603 rpc_stat->stats[i].execution_time_max.usec = 0;
6605 queue_Prepend(stats, rpc_stat);
6606 if (addToPeerList) {
6607 queue_Prepend(&peerStats, &rpc_stat->all_peers);
6611 /*
6612 * Increment the stats for this function
6613 */
6615 hadd32(rpc_stat->stats[currentFunc].invocations, 1);
6616 hadd(rpc_stat->stats[currentFunc].bytes_sent, *bytesSent);
6617 hadd(rpc_stat->stats[currentFunc].bytes_rcvd, *bytesRcvd);
6618 clock_Add(&rpc_stat->stats[currentFunc].queue_time_sum,queueTime);
6619 clock_AddSq(&rpc_stat->stats[currentFunc].queue_time_sum_sqr,queueTime);
6620 if (clock_Lt(queueTime, &rpc_stat->stats[currentFunc].queue_time_min)) {
6621 rpc_stat->stats[currentFunc].queue_time_min = *queueTime;
6623 if (clock_Gt(queueTime, &rpc_stat->stats[currentFunc].queue_time_max)) {
6624 rpc_stat->stats[currentFunc].queue_time_max = *queueTime;
6626 clock_Add(&rpc_stat->stats[currentFunc].execution_time_sum,execTime);
6627 clock_AddSq(&rpc_stat->stats[currentFunc].execution_time_sum_sqr,execTime);
6628 if (clock_Lt(execTime, &rpc_stat->stats[currentFunc].execution_time_min)) {
6629 rpc_stat->stats[currentFunc].execution_time_min = *execTime;
6631 if (clock_Gt(execTime, &rpc_stat->stats[currentFunc].execution_time_max)) {
6632 rpc_stat->stats[currentFunc].execution_time_max = *execTime;
6639 /*
6640 * rx_IncrementTimeAndCount - increment the times and count for a particular
6641 * rpc function
6645 * IN peer - the peer who invoked the rpc
6647 * IN rxInterface - a unique number that identifies the rpc interface
6649 * IN currentFunc - the index of the function being invoked
6651 * IN totalFunc - the total number of functions in this interface
6653 * IN queueTime - the amount of time this function waited for a thread
6655 * IN execTime - the amount of time this function invocation took to execute
6657 * IN bytesSent - the number of bytes sent by this invocation
6659 * IN bytesRcvd - the number of bytes received by this invocation
6661 * IN isServer - if true, this invocation was made to a server
6662 */
6668 void rx_IncrementTimeAndCount(
6669 struct rx_peer *peer,
6670 afs_uint32 rxInterface,
6671 afs_uint32 currentFunc,
6672 afs_uint32 totalFunc,
6673 struct clock *queueTime,
6674 struct clock *execTime,
6675 afs_hyper_t *bytesSent,
6676 afs_hyper_t *bytesRcvd,
6680 MUTEX_ENTER(&rx_rpc_stats);
6681 MUTEX_ENTER(&peer->peer_lock);
6683 if (rxi_monitor_peerStats) {
6684 rxi_AddRpcStat(&peer->rpcStats,
6696 &rxi_rpc_peer_stat_cnt);
6699 if (rxi_monitor_processStats) {
6700 rxi_AddRpcStat(&processStats,
6712 &rxi_rpc_process_stat_cnt);
6715 MUTEX_EXIT(&peer->peer_lock);
6716 MUTEX_EXIT(&rx_rpc_stats);
6717 }
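/*
 * Editor's sketch: how server code typically feeds this bookkeeping.
 * Clock handling follows the struct clock usage elsewhere in this file;
 * the interface id and function indices are hypothetical.
 */
#if 0
void record_one_call(struct rx_call *call,
                     struct clock *queueStart, struct clock *execStart,
                     afs_hyper_t *bytesSent, afs_hyper_t *bytesRcvd)
{
    struct clock queueTime, execTime;

    clock_GetTime(&execTime);
    queueTime = *execStart;
    clock_Sub(&queueTime, queueStart);  /* time spent waiting for a thread */
    clock_Sub(&execTime, execStart);    /* time spent executing */

    rx_IncrementTimeAndCount(rx_PeerOf(rx_ConnectionOf(call)),
                             12345 /* hypothetical interface id */,
                             0 /* currentFunc */, 1 /* totalFunc */,
                             &queueTime, &execTime,
                             bytesSent, bytesRcvd, 1 /* isServer */);
}
#endif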
6720 /*
6721 * rx_MarshallProcessRPCStats - marshall an array of rpc statistics
6725 * IN callerVersion - the rpc stat version of the caller.
6727 * IN count - the number of entries to marshall.
6729 * IN stats - pointer to stats to be marshalled.
6731 * OUT ptr - Where to store the marshalled data.
6732 */
6737 void rx_MarshallProcessRPCStats(
6738 afs_uint32 callerVersion,
6740 rx_function_entry_v1_t *stats,
6746 /*
6747 * We only support the first version
6748 */
6749 for (ptr = *ptrP, i = 0 ; i < count ; i++, stats++) {
6750 *(ptr++) = stats->remote_peer;
6751 *(ptr++) = stats->remote_port;
6752 *(ptr++) = stats->remote_is_server;
6753 *(ptr++) = stats->interfaceId;
6754 *(ptr++) = stats->func_total;
6755 *(ptr++) = stats->func_index;
6756 *(ptr++) = hgethi(stats->invocations);
6757 *(ptr++) = hgetlo(stats->invocations);
6758 *(ptr++) = hgethi(stats->bytes_sent);
6759 *(ptr++) = hgetlo(stats->bytes_sent);
6760 *(ptr++) = hgethi(stats->bytes_rcvd);
6761 *(ptr++) = hgetlo(stats->bytes_rcvd);
6762 *(ptr++) = stats->queue_time_sum.sec;
6763 *(ptr++) = stats->queue_time_sum.usec;
6764 *(ptr++) = stats->queue_time_sum_sqr.sec;
6765 *(ptr++) = stats->queue_time_sum_sqr.usec;
6766 *(ptr++) = stats->queue_time_min.sec;
6767 *(ptr++) = stats->queue_time_min.usec;
6768 *(ptr++) = stats->queue_time_max.sec;
6769 *(ptr++) = stats->queue_time_max.usec;
6770 *(ptr++) = stats->execution_time_sum.sec;
6771 *(ptr++) = stats->execution_time_sum.usec;
6772 *(ptr++) = stats->execution_time_sum_sqr.sec;
6773 *(ptr++) = stats->execution_time_sum_sqr.usec;
6774 *(ptr++) = stats->execution_time_min.sec;
6775 *(ptr++) = stats->execution_time_min.usec;
6776 *(ptr++) = stats->execution_time_max.sec;
6777 *(ptr++) = stats->execution_time_max.usec;
6778 }
6779 *ptrP = ptr;
6780 }
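/*
 * Editor's sketch: the receiving side of the layout written above.
 * This inverse is an assumption (it does not appear in this file); it
 * simply reads the 32-bit words back in the order the marshaller wrote
 * them.
 */
#if 0
void unmarshall_one_entry(afs_uint32 **ptrP, rx_function_entry_v1_t *e)
{
    afs_uint32 *ptr = *ptrP;

    e->remote_peer = *(ptr++);
    e->remote_port = *(ptr++);
    e->remote_is_server = *(ptr++);
    e->interfaceId = *(ptr++);
    e->func_total = *(ptr++);
    e->func_index = *(ptr++);
    /* invocations, bytes_sent and bytes_rcvd were written as
     * (high, low) 32-bit pairs; skipped here for brevity */
    ptr += 6;
    e->queue_time_sum.sec = *(ptr++);
    e->queue_time_sum.usec = *(ptr++);
    /* ...the remaining clock fields follow in the same order they
     * were marshalled above... */
    *ptrP = ptr;
}
#endif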
6782 /*
6783 * rx_RetrieveProcessRPCStats - retrieve all of the rpc statistics for
6784 * the local process
6788 * IN callerVersion - the rpc stat version of the caller
6790 * OUT myVersion - the rpc stat version of this function
6792 * OUT clock_sec - local time seconds
6794 * OUT clock_usec - local time microseconds
6796 * OUT allocSize - the number of bytes allocated to contain stats
6798 * OUT statCount - the number of stats retrieved from this process.
6800 * OUT stats - the actual stats retrieved from this process.
6804 * Returns an int status; if successful, stats will != NULL.
6805 */
6807 int rx_RetrieveProcessRPCStats(
6808 afs_uint32 callerVersion,
6809 afs_uint32 *myVersion,
6810 afs_uint32 *clock_sec,
6811 afs_uint32 *clock_usec,
6813 afs_uint32 *statCount,
6824 *myVersion = RX_STATS_RETRIEVAL_VERSION;
6826 /*
6827 * Check to see if stats are enabled
6828 */
6830 MUTEX_ENTER(&rx_rpc_stats);
6831 if (!rxi_monitor_processStats) {
6832 MUTEX_EXIT(&rx_rpc_stats);
6836 clock_GetTime(&now);
6837 *clock_sec = now.sec;
6838 *clock_usec = now.usec;
6840 /*
6841 * Allocate the space based upon the caller version.
6842 *
6843 * If the client is at an older version than we are,
6844 * we return the statistic data in the older data format, but
6845 * we still return our version number so the client knows we
6846 * are maintaining more data than it can retrieve.
6847 */
6849 if (callerVersion >= RX_STATS_RETRIEVAL_FIRST_EDITION) {
6850 space = rxi_rpc_process_stat_cnt * sizeof(rx_function_entry_v1_t);
6851 *statCount = rxi_rpc_process_stat_cnt;
6853 /*
6854 * This can't happen yet, but in the future version changes
6855 * can be handled by adding additional code here
6856 */
6859 if (space > (size_t) 0) {
6861 ptr = *stats = (afs_uint32 *) rxi_Alloc(space);
6864 register struct rx_peer *pp;
6867 rx_interface_stat_p rpc_stat, nrpc_stat;
6870 for(queue_Scan(&processStats, rpc_stat, nrpc_stat,
6871 rx_interface_stat)) {
6872 /*
6873 * Copy the data based upon the caller version
6874 */
6875 rx_MarshallProcessRPCStats(callerVersion,
6876 rpc_stat->stats[0].func_total,
6877 rpc_stat->stats, &ptr);
6883 MUTEX_EXIT(&rx_rpc_stats);
6884 }
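/*
 * Editor's sketch: retrieving and releasing process-wide stats.  Error
 * handling is abbreviated; the calling convention follows the comments
 * above.
 */
#if 0
#include <stdio.h>

void show_process_stats(void)
{
    afs_uint32 myVersion, sec, usec, statCount;
    afs_uint32 *stats;
    size_t allocSize;

    rx_RetrieveProcessRPCStats(RX_STATS_RETRIEVAL_VERSION, &myVersion,
                               &sec, &usec, &allocSize, &statCount, &stats);
    if (stats != NULL) {
        printf("%u marshalled words covering %u function entries\n",
               (unsigned) (allocSize / sizeof(afs_uint32)),
               (unsigned) statCount);
        rx_FreeRPCStats(stats, allocSize);
    }
}
#endif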
6887 /*
6888 * rx_RetrievePeerRPCStats - retrieve all of the rpc statistics for the peers
6892 * IN callerVersion - the rpc stat version of the caller
6894 * OUT myVersion - the rpc stat version of this function
6896 * OUT clock_sec - local time seconds
6898 * OUT clock_usec - local time microseconds
6900 * OUT allocSize - the number of bytes allocated to contain stats
6902 * OUT statCount - the number of stats retrieved from the individual
6905 * OUT stats - the actual stats retrieved from the individual peer structures.
6909 * Returns an int status; if successful, stats will != NULL.
6910 */
6912 int rx_RetrievePeerRPCStats(
6913 afs_uint32 callerVersion,
6914 afs_uint32 *myVersion,
6915 afs_uint32 *clock_sec,
6916 afs_uint32 *clock_usec,
6918 afs_uint32 *statCount,
6929 *myVersion = RX_STATS_RETRIEVAL_VERSION;
6931 /*
6932 * Check to see if stats are enabled
6933 */
6935 MUTEX_ENTER(&rx_rpc_stats);
6936 if (!rxi_monitor_peerStats) {
6937 MUTEX_EXIT(&rx_rpc_stats);
6941 clock_GetTime(&now);
6942 *clock_sec = now.sec;
6943 *clock_usec = now.usec;
6945 /*
6946 * Allocate the space based upon the caller version.
6947 *
6948 * If the client is at an older version than we are,
6949 * we return the statistic data in the older data format, but
6950 * we still return our version number so the client knows we
6951 * are maintaining more data than it can retrieve.
6952 */
6954 if (callerVersion >= RX_STATS_RETRIEVAL_FIRST_EDITION) {
6955 space = rxi_rpc_peer_stat_cnt * sizeof(rx_function_entry_v1_t);
6956 *statCount = rxi_rpc_peer_stat_cnt;
6958 /*
6959 * This can't happen yet, but in the future version changes
6960 * can be handled by adding additional code here
6961 */
6964 if (space > (size_t) 0) {
6966 ptr = *stats = (afs_uint32 *) rxi_Alloc(space);
6971 rx_interface_stat_p rpc_stat, nrpc_stat;
6972 char *fix_offset;
6974 for(queue_Scan(&peerStats, rpc_stat, nrpc_stat,
6975 rx_interface_stat)) {
6976 /*
6977 * We have to fix the offset of rpc_stat since we are
6978 * keeping this structure on two rx_queues. The rx_queue
6979 * package assumes that the rx_queue member is the first
6980 * member of the structure. That is, rx_queue assumes that
6981 * any one item is only on one queue at a time. We are
6982 * breaking that assumption and so we have to do a little
6983 * math to fix our pointers.
6984 */
6986 fix_offset = (char *) rpc_stat;
6987 fix_offset -= offsetof(rx_interface_stat_t, all_peers);
6988 rpc_stat = (rx_interface_stat_p) fix_offset;
6990 /*
6991 * Copy the data based upon the caller version
6992 */
6993 rx_MarshallProcessRPCStats(callerVersion,
6994 rpc_stat->stats[0].func_total,
6995 rpc_stat->stats, &ptr);
7001 MUTEX_EXIT(&rx_rpc_stats);
7002 }
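/*
 * Editor's sketch: the pointer fix-up used above is the classic
 * "container_of" idiom.  Given a pointer to the all_peers member
 * embedded in rx_interface_stat_t, subtracting the member's offset
 * recovers the enclosing structure.  The helper name is hypothetical.
 */
#if 0
#include <stddef.h>

static rx_interface_stat_p
stat_from_all_peers(struct rx_queue *q)
{
    return (rx_interface_stat_p)
        ((char *) q - offsetof(rx_interface_stat_t, all_peers));
}
#endif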
7005 /*
7006 * rx_FreeRPCStats - free memory allocated by
7007 * rx_RetrieveProcessRPCStats and rx_RetrievePeerRPCStats
7011 * IN stats - stats previously returned by rx_RetrieveProcessRPCStats or
7012 * rx_RetrievePeerRPCStats
7014 * IN allocSize - the number of bytes in stats.
7015 */
7021 void rx_FreeRPCStats(
7022 afs_uint32 *stats,
7023 size_t allocSize)
7024 {
7025 rxi_Free(stats, allocSize);
7026 }
7028 /*
7029 * rx_queryProcessRPCStats - see if process rpc stat collection is
7030 * currently enabled.
7031 *
7036 * Returns 0 if stats are not enabled, != 0 otherwise
7037 */
7039 int rx_queryProcessRPCStats()
7040 {
7041 int rc;
7042 MUTEX_ENTER(&rx_rpc_stats);
7043 rc = rxi_monitor_processStats;
7044 MUTEX_EXIT(&rx_rpc_stats);
7045 return rc;
7046 }
7048 /*
7049 * rx_queryPeerRPCStats - see if peer stat collection is currently enabled.
7050 *
7055 * Returns 0 if stats are not enabled, != 0 otherwise
7056 */
7058 int rx_queryPeerRPCStats()
7059 {
7060 int rc;
7061 MUTEX_ENTER(&rx_rpc_stats);
7062 rc = rxi_monitor_peerStats;
7063 MUTEX_EXIT(&rx_rpc_stats);
7064 return rc;
7065 }
7067 /*
7068 * rx_enableProcessRPCStats - begin rpc stat collection for entire process
7069 */
7077 void rx_enableProcessRPCStats()
7079 MUTEX_ENTER(&rx_rpc_stats);
7080 rx_enable_stats = 1;
7081 rxi_monitor_processStats = 1;
7082 MUTEX_EXIT(&rx_rpc_stats);
7085 /*
7086 * rx_enablePeerRPCStats - begin rpc stat collection per peer structure
7087 */
7095 void rx_enablePeerRPCStats()
7097 MUTEX_ENTER(&rx_rpc_stats);
7098 rx_enable_stats = 1;
7099 rxi_monitor_peerStats = 1;
7100 MUTEX_EXIT(&rx_rpc_stats);
7103 /*
7104 * rx_disableProcessRPCStats - stop rpc stat collection for entire process
7105 */
7113 void rx_disableProcessRPCStats()
7115 rx_interface_stat_p rpc_stat, nrpc_stat;
7116 size_t space;
7118 MUTEX_ENTER(&rx_rpc_stats);
7120 /*
7121 * Turn off process statistics and, if peer stats is also off, turn
7122 * off the rx_enable_stats flag as well.
7123 */
7125 rxi_monitor_processStats = 0;
7126 if (rxi_monitor_peerStats == 0) {
7127 rx_enable_stats = 0;
7130 for(queue_Scan(&processStats, rpc_stat, nrpc_stat, rx_interface_stat)) {
7131 unsigned int num_funcs = 0;
7132 if (!rpc_stat) break;
7133 queue_Remove(rpc_stat);
7134 num_funcs = rpc_stat->stats[0].func_total;
7135 space = sizeof(rx_interface_stat_t) +
7136 rpc_stat->stats[0].func_total *
7137 sizeof(rx_function_entry_v1_t);
7139 rxi_Free(rpc_stat, space);
7140 rxi_rpc_process_stat_cnt -= num_funcs;
7142 MUTEX_EXIT(&rx_rpc_stats);
7145 /*
7146 * rx_disablePeerRPCStats - stop rpc stat collection for peers
7147 */
7155 void rx_disablePeerRPCStats()
7157 struct rx_peer **peer_ptr, **peer_end;
7158 int code;
7159 size_t space;
7160 MUTEX_ENTER(&rx_rpc_stats);
7162 /*
7163 * Turn off peer statistics and, if process stats is also off, turn
7164 * off the rx_enable_stats flag as well.
7165 */
7167 rxi_monitor_peerStats = 0;
7168 if (rxi_monitor_processStats == 0) {
7169 rx_enable_stats = 0;
7172 MUTEX_ENTER(&rx_peerHashTable_lock);
7173 for (peer_ptr = &rx_peerHashTable[0],
7174 peer_end = &rx_peerHashTable[rx_hashTableSize];
7175 peer_ptr < peer_end; peer_ptr++) {
7176 struct rx_peer *peer, *next, *prev;
7177 for (prev = peer = *peer_ptr; peer; peer = next) {
7178 next = peer->next;
7179 code = MUTEX_TRYENTER(&peer->peer_lock);
7180 if (code) {
7181 rx_interface_stat_p rpc_stat, nrpc_stat;
7183 for(queue_Scan(&peer->rpcStats, rpc_stat, nrpc_stat,
7184 rx_interface_stat)) {
7185 unsigned int num_funcs = 0;
7186 if (!rpc_stat) break;
7187 queue_Remove(&rpc_stat->queue_header);
7188 queue_Remove(&rpc_stat->all_peers);
7189 num_funcs = rpc_stat->stats[0].func_total;
7190 space = sizeof(rx_interface_stat_t) +
7191 rpc_stat->stats[0].func_total *
7192 sizeof(rx_function_entry_v1_t);
7194 rxi_Free(rpc_stat, space);
7195 rxi_rpc_peer_stat_cnt -= num_funcs;
7197 MUTEX_EXIT(&peer->peer_lock);
7198 if (prev == *peer_ptr) {
7210 MUTEX_EXIT(&rx_peerHashTable_lock);
7211 MUTEX_EXIT(&rx_rpc_stats);
7214 /*
7215 * rx_clearProcessRPCStats - clear the contents of the rpc stats according
7216 * to the flag settings
7220 * IN clearFlag - flag indicating which stats to clear
7221 */
7227 void rx_clearProcessRPCStats(
7228 afs_uint32 clearFlag)
7230 rx_interface_stat_p rpc_stat, nrpc_stat;
7232 MUTEX_ENTER(&rx_rpc_stats);
7234 for(queue_Scan(&processStats, rpc_stat, nrpc_stat, rx_interface_stat)) {
7235 unsigned int num_funcs = 0, i;
7236 num_funcs = rpc_stat->stats[0].func_total;
7237 for(i=0;i<num_funcs;i++) {
7238 if (clearFlag & AFS_RX_STATS_CLEAR_INVOCATIONS) {
7239 hzero(rpc_stat->stats[i].invocations);
7241 if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_SENT) {
7242 hzero(rpc_stat->stats[i].bytes_sent);
7244 if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_RCVD) {
7245 hzero(rpc_stat->stats[i].bytes_rcvd);
7247 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_SUM) {
7248 rpc_stat->stats[i].queue_time_sum.sec = 0;
7249 rpc_stat->stats[i].queue_time_sum.usec = 0;
7251 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_SQUARE) {
7252 rpc_stat->stats[i].queue_time_sum_sqr.sec = 0;
7253 rpc_stat->stats[i].queue_time_sum_sqr.usec = 0;
7255 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_MIN) {
7256 rpc_stat->stats[i].queue_time_min.sec = 9999999;
7257 rpc_stat->stats[i].queue_time_min.usec= 9999999;
7259 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_MAX) {
7260 rpc_stat->stats[i].queue_time_max.sec = 0;
7261 rpc_stat->stats[i].queue_time_max.usec = 0;
7263 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_SUM) {
7264 rpc_stat->stats[i].execution_time_sum.sec = 0;
7265 rpc_stat->stats[i].execution_time_sum.usec = 0;
7267 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_SQUARE) {
7268 rpc_stat->stats[i].execution_time_sum_sqr.sec = 0;
7269 rpc_stat->stats[i].execution_time_sum_sqr.usec = 0;
7271 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_MIN) {
7272 rpc_stat->stats[i].execution_time_min.sec = 9999999;
7273 rpc_stat->stats[i].execution_time_min.usec= 9999999;
7275 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_MAX) {
7276 rpc_stat->stats[i].execution_time_max.sec = 0;
7277 rpc_stat->stats[i].execution_time_max.usec = 0;
7282 MUTEX_EXIT(&rx_rpc_stats);
7283 }
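/*
 * Editor's sketch: the clearFlag argument is a bit mask, so individual
 * counters can be reset together.  The flag names are the ones tested
 * above.
 */
#if 0
/* reset only the call counters and byte totals, keep the timings */
rx_clearProcessRPCStats(AFS_RX_STATS_CLEAR_INVOCATIONS |
                        AFS_RX_STATS_CLEAR_BYTES_SENT |
                        AFS_RX_STATS_CLEAR_BYTES_RCVD);
#endif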
7285 /*
7286 * rx_clearPeerRPCStats - clear the contents of the rpc stats according
7287 * to the flag settings
7291 * IN clearFlag - flag indicating which stats to clear
7292 */
7298 void rx_clearPeerRPCStats(
7299 afs_uint32 clearFlag)
7301 rx_interface_stat_p rpc_stat, nrpc_stat;
7302 char *fix_offset;
7303 MUTEX_ENTER(&rx_rpc_stats);
7305 for(queue_Scan(&peerStats, rpc_stat, nrpc_stat, rx_interface_stat)) {
7306 unsigned int num_funcs = 0, i;
7308 /*
7309 * We have to fix the offset of rpc_stat since we are
7310 * keeping this structure on two rx_queues. The rx_queue
7311 * package assumes that the rx_queue member is the first
7312 * member of the structure. That is, rx_queue assumes that
7313 * any one item is only on one queue at a time. We are
7314 * breaking that assumption and so we have to do a little
7315 * math to fix our pointers.
7316 */
7318 fix_offset = (char *) rpc_stat;
7319 fix_offset -= offsetof(rx_interface_stat_t, all_peers);
7320 rpc_stat = (rx_interface_stat_p) fix_offset;
7322 num_funcs = rpc_stat->stats[0].func_total;
7323 for(i=0;i<num_funcs;i++) {
7324 if (clearFlag & AFS_RX_STATS_CLEAR_INVOCATIONS) {
7325 hzero(rpc_stat->stats[i].invocations);
7327 if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_SENT) {
7328 hzero(rpc_stat->stats[i].bytes_sent);
7330 if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_RCVD) {
7331 hzero(rpc_stat->stats[i].bytes_rcvd);
7333 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_SUM) {
7334 rpc_stat->stats[i].queue_time_sum.sec = 0;
7335 rpc_stat->stats[i].queue_time_sum.usec = 0;
7337 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_SQUARE) {
7338 rpc_stat->stats[i].queue_time_sum_sqr.sec = 0;
7339 rpc_stat->stats[i].queue_time_sum_sqr.usec = 0;
7341 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_MIN) {
7342 rpc_stat->stats[i].queue_time_min.sec = 9999999;
7343 rpc_stat->stats[i].queue_time_min.usec = 9999999;
7345 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_MAX) {
7346 rpc_stat->stats[i].queue_time_max.sec = 0;
7347 rpc_stat->stats[i].queue_time_max.usec = 0;
7349 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_SUM) {
7350 rpc_stat->stats[i].execution_time_sum.sec = 0;
7351 rpc_stat->stats[i].execution_time_sum.usec = 0;
7353 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_SQUARE) {
7354 rpc_stat->stats[i].execution_time_sum_sqr.sec = 0;
7355 rpc_stat->stats[i].execution_time_sum_sqr.usec = 0;
7357 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_MIN) {
7358 rpc_stat->stats[i].execution_time_min.sec = 9999999;
7359 rpc_stat->stats[i].execution_time_min.usec = 9999999;
7361 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_MAX) {
7362 rpc_stat->stats[i].execution_time_max.sec = 0;
7363 rpc_stat->stats[i].execution_time_max.usec = 0;
7368 MUTEX_EXIT(&rx_rpc_stats);
7371 /*
7372 * rxi_rxstat_userok points to a routine that returns 1 if the caller
7373 * is authorized to enable/disable/clear RX statistics.
7374 */
7375 static int (*rxi_rxstat_userok)(struct rx_call *call) = NULL;
7377 void rx_SetRxStatUserOk(
7378 int (*proc)(struct rx_call *call))
7380 rxi_rxstat_userok = proc;
7383 int rx_RxStatUserOk(
7384 struct rx_call *call)
7385 {
7386 if (!rxi_rxstat_userok)
7387 return 0;
7388 return rxi_rxstat_userok(call);
7389 }
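/*
 * Editor's sketch: a server registers its own authorization policy for
 * the remote stats-control RPCs.  The checker below is hypothetical; a
 * real one would inspect the call's connection and security class.
 * Note that, with no routine registered, rx_RxStatUserOk denies access.
 */
#if 0
static int my_stat_userok(struct rx_call *call)
{
    /* hypothetical policy: deny everyone (stats can still be read
     * locally); replace with a real security-object check */
    return 0;
}

void my_server_init(void)
{
    rx_SetRxStatUserOk(my_stat_userok);
}
#endif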