 * Copyright 2000, International Business Machines Corporation and others.
 * This software has been released under the terms of the IBM Public
 * License.  For details, see the LICENSE file in the top-level source
 * directory or online at http://www.openafs.org/dl/license10.html
/* RX: Extended Remote Procedure Call */
#include "../afs/param.h"
#include "../afs/sysincludes.h"
#include "../afs/afsincludes.h"
#include "../h/types.h"
#include "../h/time.h"
#include "../h/stat.h"
#include <net/net_globals.h>
#endif /* AFS_OSF_ENV */
#ifdef AFS_LINUX20_ENV
#include "../h/socket.h"
#include "../netinet/in.h"
#include "../afs/afs_args.h"
#include "../afs/afs_osi.h"
#if (defined(AFS_AUX_ENV) || defined(AFS_AIX_ENV))
#include "../h/systm.h"
#undef RXDEBUG /* turn off debugging */
#if defined(AFS_SGI_ENV)
#include "../sys/debug.h"
#include "../afsint/afsint.h"
#endif /* AFS_ALPHA_ENV */
#include "../afs/sysincludes.h"
#include "../afs/afsincludes.h"
#include "../afs/lock.h"
#include "../rx/rx_kmutex.h"
#include "../rx/rx_kernel.h"
#include "../rx/rx_clock.h"
#include "../rx/rx_queue.h"
#include "../rx/rx_globals.h"
#include "../rx/rx_trace.h"
#define AFSOP_STOP_RXCALLBACK 210 /* Stop CALLBACK process */
#define AFSOP_STOP_AFS        211 /* Stop AFS process */
#define AFSOP_STOP_BKG        212 /* Stop BKG process */
#include "../afsint/afsint.h"
extern afs_int32 afs_termState;
#include "sys/lockl.h"
#include "sys/lock_def.h"
#endif /* AFS_AIX41_ENV */
# include "../afsint/rxgen_consts.h"
# include <afs/param.h>
# include <sys/types.h>
# include <sys/socket.h>
# include <sys/file.h>
# include <sys/stat.h>
# include <netinet/in.h>
# include <sys/time.h>
# include "rx_clock.h"
# include "rx_queue.h"
# include "rx_globals.h"
# include "rx_trace.h"
# include "rx_internal.h"
# include <afs/rxgen_consts.h>
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
afs_int32 rxi_start_aborted; /* rxi_start awoke after rxi_Send in error. */
afs_int32 rxi_start_in_error;
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
 * rxi_rpc_peer_stat_cnt counts the total number of peer stat structures
 * currently allocated within rx.  This number is used to allocate the
 * memory required to return the statistics when queried.
static unsigned int rxi_rpc_peer_stat_cnt;
 * rxi_rpc_process_stat_cnt counts the total number of local process stat
 * structures currently allocated within rx.  The number is used to allocate
 * the memory required to return the statistics when queried.
static unsigned int rxi_rpc_process_stat_cnt;
#if !defined(offsetof)
#include <stddef.h> /* for definition of offsetof() */
#ifdef AFS_PTHREAD_ENV
 * Use procedural initialization of mutexes/condition variables
extern pthread_mutex_t rxkad_stats_mutex;
extern pthread_mutex_t des_init_mutex;
extern pthread_mutex_t des_random_mutex;
extern pthread_mutex_t rx_clock_mutex;
extern pthread_mutex_t rxi_connCacheMutex;
extern pthread_mutex_t rx_event_mutex;
extern pthread_mutex_t osi_malloc_mutex;
extern pthread_mutex_t event_handler_mutex;
extern pthread_mutex_t listener_mutex;
extern pthread_mutex_t rx_if_init_mutex;
extern pthread_mutex_t rx_if_mutex;
extern pthread_mutex_t rxkad_client_uid_mutex;
extern pthread_mutex_t rxkad_random_mutex;
extern pthread_cond_t rx_event_handler_cond;
extern pthread_cond_t rx_listener_cond;
static pthread_mutex_t epoch_mutex;
static pthread_mutex_t rx_init_mutex;
static pthread_mutex_t rx_debug_mutex;
static void rxi_InitPthread(void) {
    assert(pthread_mutex_init(&rx_clock_mutex, (const pthread_mutexattr_t*)0) == 0);
    assert(pthread_mutex_init(&rxi_connCacheMutex, (const pthread_mutexattr_t*)0) == 0);
    assert(pthread_mutex_init(&rx_init_mutex, (const pthread_mutexattr_t*)0) == 0);
    assert(pthread_mutex_init(&epoch_mutex, (const pthread_mutexattr_t*)0) == 0);
    assert(pthread_mutex_init(&rx_event_mutex, (const pthread_mutexattr_t*)0) == 0);
    assert(pthread_mutex_init(&des_init_mutex, (const pthread_mutexattr_t*)0) == 0);
    assert(pthread_mutex_init(&des_random_mutex, (const pthread_mutexattr_t*)0) == 0);
    assert(pthread_mutex_init(&osi_malloc_mutex, (const pthread_mutexattr_t*)0) == 0);
    assert(pthread_mutex_init(&event_handler_mutex, (const pthread_mutexattr_t*)0) == 0);
    assert(pthread_mutex_init(&listener_mutex, (const pthread_mutexattr_t*)0) == 0);
    assert(pthread_mutex_init(&rx_if_init_mutex, (const pthread_mutexattr_t*)0) == 0);
    assert(pthread_mutex_init(&rx_if_mutex, (const pthread_mutexattr_t*)0) == 0);
    assert(pthread_mutex_init(&rxkad_client_uid_mutex, (const pthread_mutexattr_t*)0) == 0);
    assert(pthread_mutex_init(&rxkad_random_mutex, (const pthread_mutexattr_t*)0) == 0);
    assert(pthread_mutex_init(&rxkad_stats_mutex, (const pthread_mutexattr_t*)0) == 0);
    assert(pthread_mutex_init(&rx_debug_mutex, (const pthread_mutexattr_t*)0) == 0);
    assert(pthread_cond_init(&rx_event_handler_cond, (const pthread_condattr_t*)0) == 0);
    assert(pthread_cond_init(&rx_listener_cond, (const pthread_condattr_t*)0) == 0);
    assert(pthread_key_create(&rx_thread_id_key, NULL) == 0);
pthread_once_t rx_once_init = PTHREAD_ONCE_INIT;
#define INIT_PTHREAD_LOCKS \
    assert(pthread_once(&rx_once_init, rxi_InitPthread)==0);
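/* Illustrative sketch (not code from this file): any externally visible
 * entry point can run the one-time initialization before touching the
 * locks, e.g.
 *
 *	int rx_SomeEntryPoint()		// hypothetical caller
 *	{
 *	    INIT_PTHREAD_LOCKS
 *	    ...
 *	}
 *
 * pthread_once guarantees that rxi_InitPthread runs exactly once, even if
 * several threads race through the macro concurrently. */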
 * The rx_stats_mutex mutex protects the following global variables:
 * rxi_lowConnRefCount
 * rxi_lowPeerRefCount
#define INIT_PTHREAD_LOCKS
extern void rxi_DeleteCachedConnections(void);
/* Variables for handling the minProcs implementation.  availProcs gives the
 * number of threads available in the pool at this moment (not counting dudes
 * executing right now).  totalMin gives the total number of procs required
 * for handling all minProcs requests.  minDeficit is a dynamic variable
 * tracking the # of procs required to satisfy all of the remaining minProcs
 * For fine grain locking to work, the quota check and the reservation of
 * a server thread have to come while rxi_availProcs and rxi_minDeficit
 * are locked.  To this end, the code has been modified under #ifdef
 * RX_ENABLE_LOCKS so that quota checks and reservation occur at the
 * same time.  A new function, ReturnToServerPool(), returns the allocation.
 * A call can be on several queues (but only one at a time).  When
 * rxi_ResetCall wants to remove the call from a queue, it has to ensure
 * that no one else is touching the queue.  To this end, we store the address
 * of the queue lock in the call structure (under the call lock) when we
 * put the call on a queue, and we clear the call_queue_lock when the
 * call is removed from a queue (once the call lock has been obtained).
 * This allows rxi_ResetCall to safely synchronize with others wishing
 * to manipulate the queue.
extern void rxi_Delay(int);
static int rxi_ServerThreadSelectingCall;
#ifdef RX_ENABLE_LOCKS
static afs_kmutex_t rx_rpc_stats;
void rxi_StartUnlocked();
/* We keep a "last conn pointer" in rxi_FindConnection.  The odds are
** pretty good that the next packet coming in is from the same connection
** as the last packet, since we send multiple packets in a transmit window.
struct rx_connection *rxLastConn;
#ifdef RX_ENABLE_LOCKS
/* The locking hierarchy for rx fine grain locking is composed of five
 * conn_call_lock - used to synchronize rx_EndCall and rx_NewCall
 * call->lock - locks call data fields.
 * Most any other lock - these are all independent of each other.....
 * rx_freeCallQueue_lock
 * rx_connHashTable_lock
 * rx_peerHashTable_lock - locked under rx_connHashTable_lock
 * peer_lock - locks peer data fields.
 * conn_data_lock - ensures that no more than one thread updates a conn data
 * field at the same time.
 * Do we need a lock to protect the peer field in the conn structure?
 * conn->peer was previously a constant for all intents and purposes, and so
 * had no lock protecting this field.  The multihomed client delta introduced
 * an RX code change: change the peer field in the connection structure
 * to the remote interface from which the last packet for this
 * connection was sent out.  This may become an issue if further changes
#define SET_CALL_QUEUE_LOCK(C, L) (C)->call_queue_lock = (L)
#define CLEAR_CALL_QUEUE_LOCK(C) (C)->call_queue_lock = NULL
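/* Illustrative sketch of the call_queue_lock protocol described above (the
 * queue and lock names here are placeholders, not rx globals, and the exact
 * ordering in rxi_ResetCall has more subtleties than shown):
 *
 *	MUTEX_ENTER(&some_queue_lock);
 *	queue_Append(&some_queue, call);
 *	SET_CALL_QUEUE_LOCK(call, &some_queue_lock);  // recorded under call->lock
 *	MUTEX_EXIT(&some_queue_lock);
 *
 *	// later, with call->lock held:
 *	if (call->call_queue_lock) {
 *	    afs_kmutex_t *qlock = call->call_queue_lock;
 *	    MUTEX_ENTER(qlock);
 *	    queue_Remove(call);
 *	    CLEAR_CALL_QUEUE_LOCK(call);
 *	    MUTEX_EXIT(qlock);
 *	}
 */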
/* rxdb_fileID is used to identify the lock location, along with line#. */
static int rxdb_fileID = RXDB_FILE_RX;
#endif /* RX_LOCKS_DB */
static void rxi_SetAcksInTransmitQueue();
void osirx_AssertMine(afs_kmutex_t *lockaddr, char *msg);
#else /* RX_ENABLE_LOCKS */
#define SET_CALL_QUEUE_LOCK(C, L)
#define CLEAR_CALL_QUEUE_LOCK(C)
#endif /* RX_ENABLE_LOCKS */
static void rxi_DestroyConnectionNoLock();
void rxi_DestroyConnection();
void rxi_CleanupConnection();
struct rx_serverQueueEntry *rx_waitForPacket = 0;
/* ------------Exported Interfaces------------- */
/* This function allows rxkad to set the epoch to a suitably random number
 * which rx_NewConnection will use in the future.  The principal purpose is to
 * get rxnull connections to use the same epoch as the rxkad connections do, at
 * least once the first rxkad connection is established.  This is important now
 * that the host/port addresses aren't used in FindConnection: the uniqueness
 * of epoch/cid matters and the start time won't do. */
#ifdef AFS_PTHREAD_ENV
 * This mutex protects the following global variables:
#define LOCK_EPOCH assert(pthread_mutex_lock(&epoch_mutex)==0);
#define UNLOCK_EPOCH assert(pthread_mutex_unlock(&epoch_mutex)==0);
#endif /* AFS_PTHREAD_ENV */
void rx_SetEpoch (epoch)
/* Initialize rx.  A port number may be mentioned, in which case this
 * becomes the default port number for any service installed later.
 * If 0 is provided for the port number, a random port will be chosen
 * by the kernel.  Whether this will ever overlap anything in
 * /etc/services is anybody's guess...  Returns 0 on success, -1 on
static int rxinit_status = 1;
#ifdef AFS_PTHREAD_ENV
 * This mutex protects the following global variables:
#define LOCK_RX_INIT assert(pthread_mutex_lock(&rx_init_mutex)==0);
#define UNLOCK_RX_INIT assert(pthread_mutex_unlock(&rx_init_mutex)==0);
#define UNLOCK_RX_INIT
int rx_Init(u_int port)
    char *htable, *ptable;
    if (rxinit_status == 0) {
        tmp_status = rxinit_status;
        return tmp_status; /* Already started; return previous error code. */
    if (afs_winsockInit() < 0)
     * Initialize anything necessary to provide a non-preemptive threading
    rxi_InitializeThreadSupport();
    /* Allocate and initialize a socket for client and perhaps server
    rx_socket = rxi_GetUDPSocket((u_short)port);
    if (rx_socket == OSI_NULLSOCKET) {
#ifdef RX_ENABLE_LOCKS
#endif /* RX_LOCKS_DB */
    MUTEX_INIT(&rx_stats_mutex, "rx_stats_mutex", MUTEX_DEFAULT, 0);
    MUTEX_INIT(&rx_rpc_stats, "rx_rpc_stats", MUTEX_DEFAULT, 0);
    MUTEX_INIT(&rx_freePktQ_lock, "rx_freePktQ_lock", MUTEX_DEFAULT, 0);
    MUTEX_INIT(&freeSQEList_lock, "freeSQEList lock", MUTEX_DEFAULT, 0);
    MUTEX_INIT(&rx_freeCallQueue_lock, "rx_freeCallQueue_lock",
    CV_INIT(&rx_waitingForPackets_cv, "rx_waitingForPackets_cv", CV_DEFAULT, 0);
    MUTEX_INIT(&rx_peerHashTable_lock, "rx_peerHashTable_lock", MUTEX_DEFAULT, 0);
    MUTEX_INIT(&rx_connHashTable_lock, "rx_connHashTable_lock", MUTEX_DEFAULT, 0);
    MUTEX_INIT(&rx_serverPool_lock, "rx_serverPool_lock", MUTEX_DEFAULT, 0);
    MUTEX_INIT(&rxi_keyCreate_lock, "rxi_keyCreate_lock", MUTEX_DEFAULT, 0);
    CV_INIT(&rx_serverPool_cv, "rx_serverPool_cv", CV_DEFAULT, 0);
#if defined(KERNEL) && defined(AFS_HPUX110_ENV)
    rx_sleepLock = alloc_spinlock(LAST_HELD_ORDER-10, "rx_sleepLock");
#endif /* KERNEL && AFS_HPUX110_ENV */
#else /* RX_ENABLE_LOCKS */
#if defined(KERNEL) && defined(AFS_GLOBAL_SUNLOCK) && !defined(AFS_HPUX_ENV)
    mutex_init(&afs_rxglobal_lock, "afs_rxglobal_lock", MUTEX_DEFAULT, NULL);
#endif /* AFS_GLOBAL_SUNLOCK */
#endif /* RX_ENABLE_LOCKS */
    rx_connDeadTime = 12;
    rx_tranquil = 0; /* reset flag */
    bzero((char *)&rx_stats, sizeof(struct rx_stats));
        osi_Alloc(rx_hashTableSize*sizeof(struct rx_connection *));
    PIN(htable, rx_hashTableSize*sizeof(struct rx_connection *)); /* XXXXX */
    bzero(htable, rx_hashTableSize*sizeof(struct rx_connection *));
    ptable = (char *) osi_Alloc(rx_hashTableSize*sizeof(struct rx_peer *));
    PIN(ptable, rx_hashTableSize*sizeof(struct rx_peer *)); /* XXXXX */
    bzero(ptable, rx_hashTableSize*sizeof(struct rx_peer *));
    /* Malloc up a bunch of packets & buffers */
    rx_nPackets = rx_extraPackets + RX_MAX_QUOTA + 2; /* fudge */
    queue_Init(&rx_freePacketQueue);
    rxi_NeedMorePackets = FALSE;
    rxi_MorePackets(rx_nPackets);
#if defined(AFS_NT40_ENV) && !defined(AFS_PTHREAD_ENV)
    tv.tv_sec = clock_now.sec;
    tv.tv_usec = clock_now.usec;
    srand((unsigned int) tv.tv_usec);
    /* *Slightly* random start time for the cid.  This is just to help
     * out with the hashing function at the peer */
    rx_stats.minRtt.sec = 9999999;
    rx_SetEpoch (tv.tv_sec | 0x80000000);
    rx_SetEpoch (tv.tv_sec); /* Start time of this package, rxkad
                              * will provide a more random value. */
    MUTEX_ENTER(&rx_stats_mutex);
    rxi_dataQuota += rx_extraQuota; /* + extra pkts caller asked to rsrv */
    MUTEX_EXIT(&rx_stats_mutex);
    rx_nextCid = ((tv.tv_sec ^ tv.tv_usec) << RX_CIDSHIFT);
    rx_connHashTable = (struct rx_connection **) htable;
    rx_peerHashTable = (struct rx_peer **) ptable;
    rx_lastAckDelay.sec = 0;
    rx_lastAckDelay.usec = 400000; /* 400 milliseconds */
    rx_hardAckDelay.sec = 0;
    rx_hardAckDelay.usec = 100000; /* 100 milliseconds */
    rx_softAckDelay.sec = 0;
    rx_softAckDelay.usec = 100000; /* 100 milliseconds */
    rxevent_Init(20, rxi_ReScheduleEvents);
    /* Initialize various global queues */
    queue_Init(&rx_idleServerQueue);
    queue_Init(&rx_incomingCallQueue);
    queue_Init(&rx_freeCallQueue);
#if defined(AFS_NT40_ENV) && !defined(KERNEL)
    /* Initialize our list of usable IP addresses. */
    /* Start listener process (exact function is dependent on the
     * implementation environment--kernel or user space) */
    tmp_status = rxinit_status = 0;
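/* Illustrative usage (a sketch, not code from this file):
 *
 *	if (rx_Init(0) != 0)	// 0: let the kernel choose the port
 *	    exit(1);
 *
 * rx_Init is safe to call more than once: after the first attempt, later
 * calls simply return the status of the original initialization. */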
/* Called with unincremented nRequestsRunning to see if it is OK to start
 * a new thread in this service.  Could be "no" for two reasons: over the
 * max quota, or it would prevent others from reaching their min quota.
#ifdef RX_ENABLE_LOCKS
/* This version of QuotaOK reserves quota if it's ok while the
 * rx_serverPool_lock is held.  Return quota using ReturnToServerPool().
static int QuotaOK(aservice)
    register struct rx_service *aservice;
    /* check if over max quota */
    if (aservice->nRequestsRunning >= aservice->maxProcs) {
    /* under min quota, we're OK */
    /* otherwise, can use only if there are enough to allow everyone
     * to go to their min quota after this guy starts.
    MUTEX_ENTER(&rx_stats_mutex);
    if ((aservice->nRequestsRunning < aservice->minProcs) ||
        (rxi_availProcs > rxi_minDeficit)) {
        aservice->nRequestsRunning++;
        /* just started call in minProcs pool, need fewer to maintain
        if (aservice->nRequestsRunning <= aservice->minProcs)
        MUTEX_EXIT(&rx_stats_mutex);
    MUTEX_EXIT(&rx_stats_mutex);
static void ReturnToServerPool(aservice)
    register struct rx_service *aservice;
    aservice->nRequestsRunning--;
    MUTEX_ENTER(&rx_stats_mutex);
    if (aservice->nRequestsRunning < aservice->minProcs) rxi_minDeficit++;
    MUTEX_EXIT(&rx_stats_mutex);
#else /* RX_ENABLE_LOCKS */
static QuotaOK(aservice)
    register struct rx_service *aservice; {
    /* under min quota, we're OK */
    if (aservice->nRequestsRunning < aservice->minProcs) return 1;
    /* check if over max quota */
    if (aservice->nRequestsRunning >= aservice->maxProcs) return 0;
    /* otherwise, can use only if there are enough to allow everyone
     * to go to their min quota after this guy starts.
    if (rxi_availProcs > rxi_minDeficit) rc = 1;
#endif /* RX_ENABLE_LOCKS */
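/* Worked example of the quota check (illustrative, with made-up numbers):
 * suppose service A has minProcs = 2, maxProcs = 4 and is already running
 * 2 requests, while rxi_availProcs = 3 and rxi_minDeficit = 2.  A new call
 * for A is neither under min quota (2 >= 2) nor over max quota (2 < 4), so
 * it may start only because rxi_availProcs (3) > rxi_minDeficit (2): after
 * it starts, the remaining 2 idle threads still cover the 2 threads owed
 * to other services' min quotas. */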
/* Called by rx_StartServer to start up lwp's to service calls.
   NExistingProcs gives the number of procs already existing, and which
   therefore needn't be created. */
void rxi_StartServerProcs(nExistingProcs)
    register struct rx_service *service;
    /* For each service, reserve N processes, where N is the "minimum"
       number of processes that MUST be able to execute a request in parallel,
       at any time, for that process.  Also compute the maximum difference
       between any service's maximum number of processes that can run
       (i.e. the maximum number that ever will be run, and a guarantee
       that this number will run if other services aren't running), and its
       minimum number.  The result is the extra number of processes that
       we need in order to provide the latter guarantee */
    for (i = 0; i < RX_MAX_SERVICES; i++) {
        service = rx_services[i];
        if (service == (struct rx_service *) 0) break;
        nProcs += service->minProcs;
        diff = service->maxProcs - service->minProcs;
        if (diff > maxdiff) maxdiff = diff;
    nProcs += maxdiff; /* Extra processes needed to allow max number requested to run in any given service, under good conditions */
    nProcs -= nExistingProcs; /* Subtract the number of procs that were previously created for use as server procs */
    for (i = 0; i < nProcs; i++) {
        rxi_StartServerProc(rx_ServerProc, rx_stackSize);
/* This routine must be called if any services are exported.  If the
 * donateMe flag is set, the calling process is donated to the server
void rx_StartServer(donateMe)
    register struct rx_service *service;
    /* Start server processes, if necessary (exact function is dependent
     * on the implementation environment--kernel or user space).  DonateMe
     * will be 1 if there is 1 pre-existing proc, i.e. this one.  In this
     * case, one less new proc will be created by rx_StartServerProcs.
    rxi_StartServerProcs(donateMe);
    /* count up the # of threads in minProcs, and set the min deficit to
     * be that value, too.
    for (i = 0; i < RX_MAX_SERVICES; i++) {
        service = rx_services[i];
        if (service == (struct rx_service *) 0) break;
        MUTEX_ENTER(&rx_stats_mutex);
        rxi_totalMin += service->minProcs;
        /* below works even if a thread is running, since minDeficit would
         * still have been decremented and later re-incremented.
        rxi_minDeficit += service->minProcs;
        MUTEX_EXIT(&rx_stats_mutex);
    /* Turn on reaping of idle server connections */
    rxi_ReapConnections();
    if (donateMe) rx_ServerProc(); /* Never returns */
/* Create a new client connection to the specified service, using the
 * specified security object to implement the security model for this
struct rx_connection *
rx_NewConnection(shost, sport, sservice, securityObject, serviceSecurityIndex)
    register afs_uint32 shost;      /* Server host */
    u_short sport;                  /* Server port */
    u_short sservice;               /* Server service id */
    register struct rx_securityClass *securityObject;
    int serviceSecurityIndex;
    register struct rx_connection *conn;
    dpf(("rx_NewConnection(host %x, port %u, service %u, securityObject %x, serviceSecurityIndex %d)\n",
         shost, sport, sservice, securityObject, serviceSecurityIndex));
    /* Vasilsi said: "NETPRI protects Cid and Alloc", but can this be true in
     * the case of kmem_alloc? */
    conn = rxi_AllocConnection();
#ifdef RX_ENABLE_LOCKS
    MUTEX_INIT(&conn->conn_call_lock, "conn call lock", MUTEX_DEFAULT, 0);
    MUTEX_INIT(&conn->conn_data_lock, "conn data lock", MUTEX_DEFAULT, 0);
    CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
    MUTEX_ENTER(&rx_connHashTable_lock);
    cid = (rx_nextCid += RX_MAXCALLS);
    conn->type = RX_CLIENT_CONNECTION;
    conn->epoch = rx_epoch;
    conn->peer = rxi_FindPeer(shost, sport, 0, 1);
    conn->serviceId = sservice;
    conn->securityObject = securityObject;
    /* This doesn't work in all compilers with void (they're buggy), so fake it
    conn->securityData = (VOID *) 0;
    conn->securityIndex = serviceSecurityIndex;
    rx_SetConnDeadTime(conn, rx_connDeadTime);
    conn->ackRate = RX_FAST_ACK_RATE;
    conn->specific = NULL;
    conn->challengeEvent = (struct rxevent *)0;
    conn->delayedAbortEvent = (struct rxevent *)0;
    conn->abortCount = 0;
    RXS_NewConnection(securityObject, conn);
    hashindex = CONN_HASH(shost, sport, conn->cid, conn->epoch, RX_CLIENT_CONNECTION);
    conn->refCount++; /* no lock required since only this thread knows... */
    conn->next = rx_connHashTable[hashindex];
    rx_connHashTable[hashindex] = conn;
    MUTEX_ENTER(&rx_stats_mutex);
    rx_stats.nClientConns++;
    MUTEX_EXIT(&rx_stats_mutex);
    MUTEX_EXIT(&rx_connHashTable_lock);
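/* Illustrative usage (a sketch; the address, port, and service id are
 * hypothetical, and rxnull provides the trivial "no security" class):
 *
 *	struct rx_connection *conn;
 *	conn = rx_NewConnection(htonl(0x7f000001), htons(7000), 4,
 *				rxnull_NewClientSecurityObject(), 0);
 *	...
 *	rx_DestroyConnection(conn);	// drop our reference when done
 */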
void rx_SetConnDeadTime(conn, seconds)
    register struct rx_connection *conn;
    register int seconds;
    /* The idea is to set the dead time to a value that allows several
     * keepalives to be dropped without timing out the connection. */
    conn->secondsUntilDead = MAX(seconds, 6);
    conn->secondsUntilPing = conn->secondsUntilDead/6;
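/* For example: the default rx_connDeadTime of 12 yields secondsUntilDead = 12
 * and secondsUntilPing = 2, so several consecutive keepalives can be lost
 * before the connection is declared dead.  Requests below 6 seconds are
 * rounded up to the minimum of 6 (ping every second). */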
int rxi_lowPeerRefCount = 0;
int rxi_lowConnRefCount = 0;
 * Cleanup a connection that was destroyed in rxi_DestroyConnectionNoLock.
 * NOTE: must not be called with rx_connHashTable_lock held.
void rxi_CleanupConnection(conn)
    struct rx_connection *conn;
    /* Notify the service exporter, if requested, that this connection
     * is being destroyed */
    if (conn->type == RX_SERVER_CONNECTION && conn->service->destroyConnProc)
        (*conn->service->destroyConnProc)(conn);
    /* Notify the security module that this connection is being destroyed */
    RXS_DestroyConnection(conn->securityObject, conn);
    /* If this is the last connection using the rx_peer struct, set its
     * idle time to now.  rxi_ReapConnections will reap it if it's still
     * idle (refCount == 0) after rx_idlePeerTime (60 seconds) have passed.
    MUTEX_ENTER(&rx_peerHashTable_lock);
    if (--conn->peer->refCount <= 0) {
        conn->peer->idleWhen = clock_Sec();
        if (conn->peer->refCount < 0) {
            conn->peer->refCount = 0;
            MUTEX_ENTER(&rx_stats_mutex);
            rxi_lowPeerRefCount++;
            MUTEX_EXIT(&rx_stats_mutex);
    MUTEX_EXIT(&rx_peerHashTable_lock);
    MUTEX_ENTER(&rx_stats_mutex);
    if (conn->type == RX_SERVER_CONNECTION)
        rx_stats.nServerConns--;
        rx_stats.nClientConns--;
    MUTEX_EXIT(&rx_stats_mutex);
    if (conn->specific) {
        for (i = 0; i < conn->nSpecific; i++) {
            if (conn->specific[i] && rxi_keyCreate_destructor[i])
                (*rxi_keyCreate_destructor[i])(conn->specific[i]);
            conn->specific[i] = NULL;
        free(conn->specific);
        conn->specific = NULL;
    MUTEX_DESTROY(&conn->conn_call_lock);
    MUTEX_DESTROY(&conn->conn_data_lock);
    CV_DESTROY(&conn->conn_call_cv);
    rxi_FreeConnection(conn);
/* Destroy the specified connection */
void rxi_DestroyConnection(conn)
    register struct rx_connection *conn;
    MUTEX_ENTER(&rx_connHashTable_lock);
    rxi_DestroyConnectionNoLock(conn);
    /* conn should be at the head of the cleanup list */
    if (conn == rx_connCleanup_list) {
        rx_connCleanup_list = rx_connCleanup_list->next;
        MUTEX_EXIT(&rx_connHashTable_lock);
        rxi_CleanupConnection(conn);
#ifdef RX_ENABLE_LOCKS
        MUTEX_EXIT(&rx_connHashTable_lock);
#endif /* RX_ENABLE_LOCKS */
static void rxi_DestroyConnectionNoLock(conn)
    register struct rx_connection *conn;
    register struct rx_connection **conn_ptr;
    register int havecalls = 0;
    struct rx_packet *packet;
    MUTEX_ENTER(&conn->conn_data_lock);
    if (conn->refCount > 0)
        MUTEX_ENTER(&rx_stats_mutex);
        rxi_lowConnRefCount++;
        MUTEX_EXIT(&rx_stats_mutex);
    if (conn->refCount > 0) {
        /* Busy; wait till the last guy before proceeding */
        MUTEX_EXIT(&conn->conn_data_lock);
    /* If the client previously called rx_NewCall, but it is still
     * waiting, treat this as a running call, and wait to destroy the
     * connection later when the call completes. */
    if ((conn->type == RX_CLIENT_CONNECTION) &&
        (conn->flags & RX_CONN_MAKECALL_WAITING)) {
        conn->flags |= RX_CONN_DESTROY_ME;
        MUTEX_EXIT(&conn->conn_data_lock);
    MUTEX_EXIT(&conn->conn_data_lock);
    /* Check for extant references to this connection */
    for (i = 0; i < RX_MAXCALLS; i++) {
        register struct rx_call *call = conn->call[i];
        if (conn->type == RX_CLIENT_CONNECTION) {
            MUTEX_ENTER(&call->lock);
            if (call->delayedAckEvent) {
                /* Push the final acknowledgment out now--there
                 * won't be a subsequent call to acknowledge the
                 * last reply packets */
                rxevent_Cancel(call->delayedAckEvent, call,
                               RX_CALL_REFCOUNT_DELAY);
                rxi_AckAll((struct rxevent *)0, call, 0);
            MUTEX_EXIT(&call->lock);
#ifdef RX_ENABLE_LOCKS
        if (MUTEX_TRYENTER(&conn->conn_data_lock)) {
            MUTEX_EXIT(&conn->conn_data_lock);
            /* Someone is accessing a packet right now. */
#endif /* RX_ENABLE_LOCKS */
        /* Don't destroy the connection if there are any call
         * structures still in use */
        MUTEX_ENTER(&conn->conn_data_lock);
        conn->flags |= RX_CONN_DESTROY_ME;
        MUTEX_EXIT(&conn->conn_data_lock);
    if (conn->delayedAbortEvent) {
        rxevent_Cancel(conn->delayedAbortEvent, (struct rx_call *)0, 0);
        packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
        MUTEX_ENTER(&conn->conn_data_lock);
        rxi_SendConnectionAbort(conn, packet, 0, 1);
        MUTEX_EXIT(&conn->conn_data_lock);
        rxi_FreePacket(packet);
    /* Remove from connection hash table before proceeding */
    conn_ptr = &rx_connHashTable[CONN_HASH(peer->host, peer->port, conn->cid,
                                           conn->epoch, conn->type)];
    for (; *conn_ptr; conn_ptr = &(*conn_ptr)->next) {
        if (*conn_ptr == conn) {
            *conn_ptr = conn->next;
    /* if the conn that we are destroying was the last connection, then we
     * clear rxLastConn as well */
    if (rxLastConn == conn)
    /* Make sure the connection is completely reset before deleting it. */
    /* get rid of pending events that could zap us later */
    if (conn->challengeEvent) {
        rxevent_Cancel(conn->challengeEvent, (struct rx_call *)0, 0);
    /* Add the connection to the list of destroyed connections that
     * need to be cleaned up.  This is necessary to avoid deadlocks
     * in the routines we call to inform others that this connection is
     * being destroyed. */
    conn->next = rx_connCleanup_list;
    rx_connCleanup_list = conn;
/* Externally available version */
void rx_DestroyConnection(conn)
    register struct rx_connection *conn;
    rxi_DestroyConnection(conn);
/* Start a new rx remote procedure call, on the specified connection.
 * If wait is set to 1, wait for a free call channel; otherwise return
 * 0.  Maxtime gives the maximum number of seconds this call may take,
 * after rx_NewCall returns.  After this time interval, a call to any
 * of rx_SendData, rx_ReadData, etc. will fail with RX_CALL_TIMEOUT.
 * For fine grain locking, we hold the conn_call_lock in order to
 * ensure that we don't get signalled after we find a call in an active
 * state and before we go to sleep.
struct rx_call *rx_NewCall(conn)
    register struct rx_connection *conn;
    register struct rx_call *call;
    struct clock queueTime;
    dpf(("rx_NewCall(conn %x)\n", conn));
    clock_GetTime(&queueTime);
    MUTEX_ENTER(&conn->conn_call_lock);
    for (i = 0; i < RX_MAXCALLS; i++) {
        call = conn->call[i];
        MUTEX_ENTER(&call->lock);
        if (call->state == RX_STATE_DALLY) {
            rxi_ResetCall(call, 0);
            (*call->callNumber)++;
        MUTEX_EXIT(&call->lock);
    call = rxi_NewCall(conn, i);
    MUTEX_ENTER(&call->lock);
    if (i < RX_MAXCALLS) {
        MUTEX_ENTER(&conn->conn_data_lock);
        conn->flags |= RX_CONN_MAKECALL_WAITING;
        MUTEX_EXIT(&conn->conn_data_lock);
#ifdef RX_ENABLE_LOCKS
        CV_WAIT(&conn->conn_call_cv, &conn->conn_call_lock);
    CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
    /* Client is initially in send mode */
    call->state = RX_STATE_ACTIVE;
    call->mode = RX_MODE_SENDING;
    /* remember start time for call in case we have hard dead time limit */
    call->queueTime = queueTime;
    clock_GetTime(&call->startTime);
    hzero(call->bytesSent);
    hzero(call->bytesRcvd);
    /* Turn on busy protocol. */
    rxi_KeepAliveOn(call);
    MUTEX_EXIT(&call->lock);
    MUTEX_EXIT(&conn->conn_call_lock);
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
    /* Now, if TQ wasn't cleared earlier, do it now. */
    MUTEX_ENTER(&call->lock);
    while (call->flags & RX_CALL_TQ_BUSY) {
        call->flags |= RX_CALL_TQ_WAIT;
#ifdef RX_ENABLE_LOCKS
        CV_WAIT(&call->cv_tq, &call->lock);
#else /* RX_ENABLE_LOCKS */
        osi_rxSleep(&call->tq);
#endif /* RX_ENABLE_LOCKS */
    if (call->flags & RX_CALL_TQ_CLEARME) {
        rxi_ClearTransmitQueue(call, 0);
        queue_Init(&call->tq);
    MUTEX_EXIT(&call->lock);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
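/* Illustrative round trip on an existing connection (a sketch; rx_Write and
 * rx_Read are the standard rx I/O entry points, and the request/reply
 * buffers and lengths are hypothetical):
 *
 *	struct rx_call *call = rx_NewCall(conn);
 *	afs_int32 code = 0;
 *	if (rx_Write(call, request, reqlen) != reqlen)
 *	    code = rx_Error(call);		// write fell short; fetch the error
 *	else if (rx_Read(call, reply, replen) != replen)
 *	    code = rx_Error(call);
 *	code = rx_EndCall(call, code);		// always pair NewCall with EndCall
 */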
rxi_HasActiveCalls(aconn)
    register struct rx_connection *aconn; {
    register struct rx_call *tcall;
    for (i = 0; i < RX_MAXCALLS; i++) {
        if (tcall = aconn->call[i]) {
            if ((tcall->state == RX_STATE_ACTIVE)
                || (tcall->state == RX_STATE_PRECALL)) {
rxi_GetCallNumberVector(aconn, aint32s)
    register struct rx_connection *aconn;
    register afs_int32 *aint32s; {
    register struct rx_call *tcall;
    for (i = 0; i < RX_MAXCALLS; i++) {
        if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
            aint32s[i] = aconn->callNumber[i] + 1;
            aint32s[i] = aconn->callNumber[i];
rxi_SetCallNumberVector(aconn, aint32s)
    register struct rx_connection *aconn;
    register afs_int32 *aint32s; {
    register struct rx_call *tcall;
    for (i = 0; i < RX_MAXCALLS; i++) {
        if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
            aconn->callNumber[i] = aint32s[i] - 1;
            aconn->callNumber[i] = aint32s[i];
/* Advertise a new service.  A service is named locally by a UDP port
 * number plus a 16-bit service id.  Returns (struct rx_service *) 0
rx_NewService(port, serviceId, serviceName, securityObjects,
              nSecurityObjects, serviceProc)
    char *serviceName;  /* Name for identification purposes (e.g. the
                         * service name might be used for probing for
    struct rx_securityClass **securityObjects;
    int nSecurityObjects;
    afs_int32 (*serviceProc)();
    osi_socket socket = OSI_NULLSOCKET;
    register struct rx_service *tservice;
    if (serviceId == 0) {
        (osi_Msg "rx_NewService: service id for service %s must be non-zero.\n",
        (osi_Msg "rx_NewService: A non-zero port must be specified on this call if a non-zero port was not provided at Rx initialization (service %s).\n", serviceName);
    tservice = rxi_AllocService();
    for (i = 0; i < RX_MAX_SERVICES; i++) {
        register struct rx_service *service = rx_services[i];
        if (port == service->servicePort) {
            if (service->serviceId == serviceId) {
                /* The identical service has already been
                 * installed; if the caller was intending to
                 * change the security classes used by this
                 * service, he/she loses. */
                (osi_Msg "rx_NewService: tried to install service %s with service id %d, which is already in use for service %s\n", serviceName, serviceId, service->serviceName);
                rxi_FreeService(tservice);
            /* Different service, same port: re-use the socket
             * which is bound to the same port */
            socket = service->socket;
        if (socket == OSI_NULLSOCKET) {
            /* If we don't already have a socket (from another
             * service on same port) get a new one */
            socket = rxi_GetUDPSocket(port);
            if (socket == OSI_NULLSOCKET) {
                rxi_FreeService(tservice);
        service->socket = socket;
        service->servicePort = port;
        service->serviceId = serviceId;
        service->serviceName = serviceName;
        service->nSecurityObjects = nSecurityObjects;
        service->securityObjects = securityObjects;
        service->minProcs = 0;
        service->maxProcs = 1;
        service->idleDeadTime = 60;
        service->connDeadTime = rx_connDeadTime;
        service->executeRequestProc = serviceProc;
        rx_services[i] = service; /* not visible until now */
    rxi_FreeService(tservice);
    (osi_Msg "rx_NewService: cannot support > %d services\n", RX_MAX_SERVICES);
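/* Illustrative server-side usage (a sketch; the service id, name, security
 * object array, and SAMPLE_ExecuteRequest routine are hypothetical, the
 * latter being the sort of dispatcher rxgen generates.  Port 0 is allowed
 * here only if a non-zero port was given to rx_Init):
 *
 *	struct rx_service *svc;
 *	svc = rx_NewService(0, 4, "sample", secobjs, nsecobjs,
 *			    SAMPLE_ExecuteRequest);
 *	if (svc) {
 *	    svc->minProcs = 2;		// tune thread quotas before starting
 *	    svc->maxProcs = 8;
 *	    rx_StartServer(1);		// donate this thread; never returns
 *	}
 */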
/* Generic request processing loop.  This routine should be called
 * by the implementation dependent rx_ServerProc.  If socketp is
 * non-null, it will be set to the file descriptor that this thread
 * is now listening on.  If socketp is null, this routine will never
void rxi_ServerProc(threadID, newcall, socketp)
    struct rx_call *newcall;
    osi_socket *socketp;
    register struct rx_call *call;
    register afs_int32 code;
    register struct rx_service *tservice = NULL;
        call = rx_GetCall(threadID, tservice, socketp);
        if (socketp && *socketp != OSI_NULLSOCKET) {
            /* We are now a listener thread */
        /* if the server is restarting (typically a smooth shutdown), do not
         * allow any new calls.
        if (rx_tranquil && (call != NULL)) {
            MUTEX_ENTER(&call->lock);
            rxi_CallError(call, RX_RESTARTING);
            rxi_SendCallAbort(call, (struct rx_packet *) 0, 0, 0);
            MUTEX_EXIT(&call->lock);
        if (afs_termState == AFSOP_STOP_RXCALLBACK) {
#ifdef RX_ENABLE_LOCKS
#endif /* RX_ENABLE_LOCKS */
            afs_termState = AFSOP_STOP_AFS;
            afs_osi_Wakeup(&afs_termState);
#ifdef RX_ENABLE_LOCKS
#endif /* RX_ENABLE_LOCKS */
        tservice = call->conn->service;
        if (tservice->beforeProc) (*tservice->beforeProc)(call);
        code = call->conn->service->executeRequestProc(call);
        if (tservice->afterProc) (*tservice->afterProc)(call, code);
        rx_EndCall(call, code);
        MUTEX_ENTER(&rx_stats_mutex);
        MUTEX_EXIT(&rx_stats_mutex);
void rx_WakeupServerProcs()
    struct rx_serverQueueEntry *np, *tqp;
    MUTEX_ENTER(&rx_serverPool_lock);
#ifdef RX_ENABLE_LOCKS
    if (rx_waitForPacket)
        CV_BROADCAST(&rx_waitForPacket->cv);
#else /* RX_ENABLE_LOCKS */
    if (rx_waitForPacket)
        osi_rxWakeup(rx_waitForPacket);
#endif /* RX_ENABLE_LOCKS */
    MUTEX_ENTER(&freeSQEList_lock);
    for (np = rx_FreeSQEList; np; np = tqp) {
        tqp = *(struct rx_serverQueueEntry **)np;
#ifdef RX_ENABLE_LOCKS
        CV_BROADCAST(&np->cv);
#else /* RX_ENABLE_LOCKS */
#endif /* RX_ENABLE_LOCKS */
    MUTEX_EXIT(&freeSQEList_lock);
    for (queue_Scan(&rx_idleServerQueue, np, tqp, rx_serverQueueEntry)) {
#ifdef RX_ENABLE_LOCKS
        CV_BROADCAST(&np->cv);
#else /* RX_ENABLE_LOCKS */
#endif /* RX_ENABLE_LOCKS */
    MUTEX_EXIT(&rx_serverPool_lock);
 * One thing that seems to happen is that all the server threads get
 * tied up on some empty or slow call, and then a whole bunch of calls
 * arrive at once, using up the packet pool, so now there are more
 * empty calls.  The most critical resources here are server threads
 * and the free packet pool.  The "doreclaim" code seems to help in
 * general.  I think that eventually we arrive in this state: there
 * are lots of pending calls which do have all their packets present,
 * so they won't be reclaimed, are multi-packet calls, so they won't
 * be scheduled until later, and thus are tying up most of the free
 * packet pool for a very long time.
 * 1.  schedule multi-packet calls if all the packets are present.
 * Probably CPU-bound operation, useful to return packets to pool.
 * What should we do if there is a full window, but the last packet isn't here?
 * 3.  preserve one thread which *only* runs "best" calls, otherwise
 * it sleeps and waits for that type of call.
 * 4.  Don't necessarily reserve a whole window for each thread.  In fact,
 * the current dataquota business is badly broken.  The quota isn't adjusted
 * to reflect how many packets are presently queued for a running call.
 * So, when we schedule a queued call with a full window of packets queued
 * up for it, that *should* free up a window full of packets for other 2d-class
 * calls to be able to use from the packet pool.  But it doesn't.
 *
 * NB.  Most of the time, this code doesn't run -- since idle server threads
 * sit on the idle server queue and are assigned by "...ReceivePacket" as soon
 * as a new call arrives.
/* Sleep until a call arrives.  Returns a pointer to the call, ready
 * for an rx_Read. */
#ifdef RX_ENABLE_LOCKS
rx_GetCall(tno, cur_service, socketp)
    struct rx_service *cur_service;
    osi_socket *socketp;
    struct rx_serverQueueEntry *sq;
    register struct rx_call *call = (struct rx_call *) 0, *choice2;
    struct rx_service *service;
    MUTEX_ENTER(&freeSQEList_lock);
    if (sq = rx_FreeSQEList) {
        rx_FreeSQEList = *(struct rx_serverQueueEntry **)sq;
        MUTEX_EXIT(&freeSQEList_lock);
    } else { /* otherwise allocate a new one and return that */
        MUTEX_EXIT(&freeSQEList_lock);
        sq = (struct rx_serverQueueEntry *) rxi_Alloc(sizeof(struct rx_serverQueueEntry));
        MUTEX_INIT(&sq->lock, "server Queue lock", MUTEX_DEFAULT, 0);
        CV_INIT(&sq->cv, "server Queue lock", CV_DEFAULT, 0);
    MUTEX_ENTER(&rx_serverPool_lock);
    if (cur_service != NULL) {
        ReturnToServerPool(cur_service);
        if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
            register struct rx_call *tcall, *ncall;
            choice2 = (struct rx_call *) 0;
            /* Scan for eligible incoming calls.  A call is not eligible
             * if the maximum number of calls for its service type are
             * already executing */
            /* One thread will process calls FCFS (to prevent starvation),
             * while the other threads may run ahead looking for calls which
             * have all their input data available immediately.  This helps
             * keep threads from blocking, waiting for data from the client. */
            for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
                service = tcall->conn->service;
                if (!QuotaOK(service)) {
                if (!tno || !tcall->queue_item_header.next) {
                    /* If we're thread 0, then we'll just use
                     * this call.  If we haven't been able to find an optimal
                     * choice, and we're at the end of the list, then use a
                     * 2d choice if one has been identified.  Otherwise... */
                    call = (choice2 ? choice2 : tcall);
                    service = call->conn->service;
                } else if (!queue_IsEmpty(&tcall->rq)) {
                    struct rx_packet *rp;
                    rp = queue_First(&tcall->rq, rx_packet);
                    if (rp->header.seq == 1) {
                        if (!meltdown_1pkt ||
                            (rp->header.flags & RX_LAST_PACKET)) {
                        } else if (rxi_2dchoice && !choice2 &&
                                   !(tcall->flags & RX_CALL_CLEARED) &&
                                   (tcall->rprev > rxi_HardAckRate)) {
                        } else rxi_md2cnt++;
                ReturnToServerPool(service);
            rxi_ServerThreadSelectingCall = 1;
            MUTEX_EXIT(&rx_serverPool_lock);
            MUTEX_ENTER(&call->lock);
            MUTEX_ENTER(&rx_serverPool_lock);
            if (queue_IsEmpty(&call->rq) ||
                queue_First(&call->rq, rx_packet)->header.seq != 1)
                rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
            CLEAR_CALL_QUEUE_LOCK(call);
                MUTEX_EXIT(&call->lock);
                ReturnToServerPool(service);
                rxi_ServerThreadSelectingCall = 0;
                CV_SIGNAL(&rx_serverPool_cv);
                call = (struct rx_call *)0;
            call->flags &= (~RX_CALL_WAIT_PROC);
            MUTEX_ENTER(&rx_stats_mutex);
            MUTEX_EXIT(&rx_stats_mutex);
            rxi_ServerThreadSelectingCall = 0;
            CV_SIGNAL(&rx_serverPool_cv);
            MUTEX_EXIT(&rx_serverPool_lock);
        /* If there are no eligible incoming calls, add this process
         * to the idle server queue, to wait for one */
                *socketp = OSI_NULLSOCKET;
            sq->socketp = socketp;
            queue_Append(&rx_idleServerQueue, sq);
#ifndef AFS_AIX41_ENV
            rx_waitForPacket = sq;
#endif /* AFS_AIX41_ENV */
                CV_WAIT(&sq->cv, &rx_serverPool_lock);
                if (afs_termState == AFSOP_STOP_RXCALLBACK) {
                    MUTEX_EXIT(&rx_serverPool_lock);
                    return (struct rx_call *)0;
            } while (!(call = sq->newcall) &&
                     !(socketp && *socketp != OSI_NULLSOCKET));
    MUTEX_EXIT(&rx_serverPool_lock);
    MUTEX_ENTER(&call->lock);
    MUTEX_ENTER(&freeSQEList_lock);
    *(struct rx_serverQueueEntry **)sq = rx_FreeSQEList;
    rx_FreeSQEList = sq;
    MUTEX_EXIT(&freeSQEList_lock);
    clock_GetTime(&call->startTime);
    call->state = RX_STATE_ACTIVE;
    call->mode = RX_MODE_RECEIVING;
    rxi_calltrace(RX_CALL_START, call);
    dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n",
         call->conn->service->servicePort,
         call->conn->service->serviceId, call));
    CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
    MUTEX_EXIT(&call->lock);
    dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
#else /* RX_ENABLE_LOCKS */
rx_GetCall(tno, cur_service, socketp)
    struct rx_service *cur_service;
    osi_socket *socketp;
    struct rx_serverQueueEntry *sq;
    register struct rx_call *call = (struct rx_call *) 0, *choice2;
    struct rx_service *service;
    MUTEX_ENTER(&freeSQEList_lock);
    if (sq = rx_FreeSQEList) {
        rx_FreeSQEList = *(struct rx_serverQueueEntry **)sq;
        MUTEX_EXIT(&freeSQEList_lock);
    } else { /* otherwise allocate a new one and return that */
        MUTEX_EXIT(&freeSQEList_lock);
        sq = (struct rx_serverQueueEntry *) rxi_Alloc(sizeof(struct rx_serverQueueEntry));
        MUTEX_INIT(&sq->lock, "server Queue lock", MUTEX_DEFAULT, 0);
        CV_INIT(&sq->cv, "server Queue lock", CV_DEFAULT, 0);
    MUTEX_ENTER(&sq->lock);
    if (cur_service != NULL) {
        cur_service->nRequestsRunning--;
        if (cur_service->nRequestsRunning < cur_service->minProcs)
    if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
        register struct rx_call *tcall, *ncall;
        /* Scan for eligible incoming calls.  A call is not eligible
         * if the maximum number of calls for its service type are
         * already executing */
        /* One thread will process calls FCFS (to prevent starvation),
         * while the other threads may run ahead looking for calls which
         * have all their input data available immediately.  This helps
         * keep threads from blocking, waiting for data from the client. */
        choice2 = (struct rx_call *) 0;
        for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
            service = tcall->conn->service;
            if (QuotaOK(service)) {
                if (!tno || !tcall->queue_item_header.next) {
                    /* If we're thread 0, then we'll just use
                     * this call.  If we haven't been able to find an optimal
                     * choice, and we're at the end of the list, then use a
                     * 2d choice if one has been identified.  Otherwise... */
                    call = (choice2 ? choice2 : tcall);
                    service = call->conn->service;
                } else if (!queue_IsEmpty(&tcall->rq)) {
                    struct rx_packet *rp;
                    rp = queue_First(&tcall->rq, rx_packet);
                    if (rp->header.seq == 1
                        && (!meltdown_1pkt ||
                            (rp->header.flags & RX_LAST_PACKET))) {
                    } else if (rxi_2dchoice && !choice2 &&
                               !(tcall->flags & RX_CALL_CLEARED) &&
                               (tcall->rprev > rxi_HardAckRate)) {
                    } else rxi_md2cnt++;
        /* we can't schedule a call if there's no data!!! */
        /* send an ack if there's no data, if we're missing the
         * first packet, or we're missing something between first
         * and last -- there's a "hole" in the incoming data. */
        if (queue_IsEmpty(&call->rq) ||
            queue_First(&call->rq, rx_packet)->header.seq != 1 ||
            call->rprev != queue_Last(&call->rq, rx_packet)->header.seq)
            rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
        call->flags &= (~RX_CALL_WAIT_PROC);
        service->nRequestsRunning++;
        /* just started call in minProcs pool, need fewer to maintain
        if (service->nRequestsRunning <= service->minProcs)
        /* MUTEX_EXIT(&call->lock); */
        /* If there are no eligible incoming calls, add this process
         * to the idle server queue, to wait for one */
            *socketp = OSI_NULLSOCKET;
        sq->socketp = socketp;
        queue_Append(&rx_idleServerQueue, sq);
            if (afs_termState == AFSOP_STOP_RXCALLBACK) {
                return (struct rx_call *)0;
        } while (!(call = sq->newcall) &&
                 !(socketp && *socketp != OSI_NULLSOCKET));
    MUTEX_EXIT(&sq->lock);
    MUTEX_ENTER(&freeSQEList_lock);
    *(struct rx_serverQueueEntry **)sq = rx_FreeSQEList;
    rx_FreeSQEList = sq;
    MUTEX_EXIT(&freeSQEList_lock);
    clock_GetTime(&call->startTime);
    call->state = RX_STATE_ACTIVE;
    call->mode = RX_MODE_RECEIVING;
    rxi_calltrace(RX_CALL_START, call);
    dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n",
         call->conn->service->servicePort,
         call->conn->service->serviceId, call));
    dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
#endif /* RX_ENABLE_LOCKS */
/* Establish a procedure to be called when a packet arrives for a
 * call.  This routine will be called at most once after each call,
 * and will also be called if there is an error condition on the call or
 * the call is complete.  Used by multi rx to build a selection
 * function which determines which of several calls is likely to be a
 * good one to read from.
 * NOTE: the way this is currently implemented it is probably only a
 * good idea to (1) use it immediately after a newcall (clients only)
 * and (2) only use it once.  Other uses currently void your warranty
void rx_SetArrivalProc(call, proc, handle, arg)
    register struct rx_call *call;
    register VOID (*proc)();
    register VOID *handle;
    call->arrivalProc = proc;
    call->arrivalProcHandle = handle;
    call->arrivalProcArg = arg;
/* Call is finished (possibly prematurely).  Return rc to the peer, if
 * appropriate, and return the final error code from the conversation
afs_int32 rx_EndCall(call, rc)
    register struct rx_call *call;
    register struct rx_connection *conn = call->conn;
    register struct rx_service *service;
    register struct rx_packet *tp;  /* Temporary packet pointer */
    register struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
    dpf(("rx_EndCall(call %x)\n", call));
    MUTEX_ENTER(&call->lock);
    if (rc == 0 && call->error == 0) {
        call->abortCode = 0;
        call->abortCount = 0;
    call->arrivalProc = (VOID (*)()) 0;
    if (rc && call->error == 0) {
        rxi_CallError(call, rc);
        /* Send an abort message to the peer if this error code has
         * only just been set.  If it was set previously, assume the
         * peer has already been sent the error code or will request it
        rxi_SendCallAbort(call, (struct rx_packet *) 0, 0, 0);
    if (conn->type == RX_SERVER_CONNECTION) {
        /* Make sure reply or at least dummy reply is sent */
        if (call->mode == RX_MODE_RECEIVING) {
            rxi_WriteProc(call, 0, 0);
        if (call->mode == RX_MODE_SENDING) {
            rxi_FlushWrite(call);
        service = conn->service;
        rxi_calltrace(RX_CALL_END, call);
        /* Call goes to hold state until reply packets are acknowledged */
        if (call->tfirst + call->nSoftAcked < call->tnext) {
            call->state = RX_STATE_HOLD;
            call->state = RX_STATE_DALLY;
            rxi_ClearTransmitQueue(call, 0);
            rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
            rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
    else { /* Client connection */
        /* Make sure server receives input packets, in the case where
         * no reply arguments are expected */
        if ((call->mode == RX_MODE_SENDING)
            || (call->mode == RX_MODE_RECEIVING && call->rnext == 1)) {
            (void) rxi_ReadProc(call, &dummy, 1);
        /* We need to release the call lock since it's lower than the
         * conn_call_lock and we don't want to hold the conn_call_lock
         * over the rx_ReadProc call.  The conn_call_lock needs to be held
         * here for the case where rx_NewCall is perusing the calls on
         * the connection structure.  We don't want to signal until
         * rx_NewCall is in a stable state.  Otherwise, rx_NewCall may
         * have checked this call, found it active and, by the time it
         * goes to sleep, will have missed the signal.
        MUTEX_EXIT(&call->lock);
        MUTEX_ENTER(&conn->conn_call_lock);
        MUTEX_ENTER(&call->lock);
        MUTEX_ENTER(&conn->conn_data_lock);
        if (conn->flags & RX_CONN_MAKECALL_WAITING) {
            conn->flags &= (~RX_CONN_MAKECALL_WAITING);
            MUTEX_EXIT(&conn->conn_data_lock);
#ifdef RX_ENABLE_LOCKS
            CV_BROADCAST(&conn->conn_call_cv);
#ifdef RX_ENABLE_LOCKS
            MUTEX_EXIT(&conn->conn_data_lock);
#endif /* RX_ENABLE_LOCKS */
        call->state = RX_STATE_DALLY;
    error = call->error;
    /* currentPacket, nLeft, and NFree must be zeroed here, because
     * ResetCall cannot: ResetCall may be called at splnet(), in the
     * kernel version, and may interrupt the macros rx_Read or
     * rx_Write, which run at normal priority for efficiency. */
    if (call->currentPacket) {
        rxi_FreePacket(call->currentPacket);
        call->currentPacket = (struct rx_packet *) 0;
        call->nLeft = call->nFree = call->curlen = 0;
    call->nLeft = call->nFree = call->curlen = 0;
    /* Free any packets from the last call to ReadvProc/WritevProc */
    for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
    CALL_RELE(call, RX_CALL_REFCOUNT_BEGIN);
    MUTEX_EXIT(&call->lock);
    if (conn->type == RX_CLIENT_CONNECTION)
        MUTEX_EXIT(&conn->conn_call_lock);
 * Map errors to the local host's errno.h format.
    error = ntoh_syserr_conv(error);
#if !defined(KERNEL)
/* Call this routine when shutting down a server or client (especially
 * clients).  This will allow Rx to gracefully garbage collect server
 * connections, and reduce the number of retries that a server might
 * make to a dead client.
 * This is not quite right, since some calls may still be ongoing and
 * we can't lock them to destroy them. */
void rx_Finalize() {
    register struct rx_connection **conn_ptr, **conn_end;
    if (rxinit_status == 1) {
        return; /* Already shutdown. */
    rxi_DeleteCachedConnections();
    if (rx_connHashTable) {
        MUTEX_ENTER(&rx_connHashTable_lock);
        for (conn_ptr = &rx_connHashTable[0],
             conn_end = &rx_connHashTable[rx_hashTableSize];
             conn_ptr < conn_end; conn_ptr++) {
            struct rx_connection *conn, *next;
            for (conn = *conn_ptr; conn; conn = next) {
                if (conn->type == RX_CLIENT_CONNECTION) {
                    /* MUTEX_ENTER(&conn->conn_data_lock); when used in kernel */
                    /* MUTEX_EXIT(&conn->conn_data_lock); when used in kernel */
#ifdef RX_ENABLE_LOCKS
                    rxi_DestroyConnectionNoLock(conn);
#else /* RX_ENABLE_LOCKS */
                    rxi_DestroyConnection(conn);
#endif /* RX_ENABLE_LOCKS */
#ifdef RX_ENABLE_LOCKS
        while (rx_connCleanup_list) {
            struct rx_connection *conn;
            conn = rx_connCleanup_list;
            rx_connCleanup_list = rx_connCleanup_list->next;
            MUTEX_EXIT(&rx_connHashTable_lock);
            rxi_CleanupConnection(conn);
            MUTEX_ENTER(&rx_connHashTable_lock);
        MUTEX_EXIT(&rx_connHashTable_lock);
#endif /* RX_ENABLE_LOCKS */
/* If we wake up the packet waiter too often, we can get into a loop with two
   AllocSendPackets each waking the other up (from ReclaimPacket calls) */
rxi_PacketsUnWait() {
    if (!rx_waitingForPackets) {
    if (rxi_OverQuota(RX_PACKET_CLASS_SEND)) {
        return; /* still over quota */
    rx_waitingForPackets = 0;
#ifdef RX_ENABLE_LOCKS
    CV_BROADCAST(&rx_waitingForPackets_cv);
    osi_rxWakeup(&rx_waitingForPackets);
/* ------------------Internal interfaces------------------------- */
1852 /* Return this process's service structure for the
1853 * specified socket and service */
1854 struct rx_service *rxi_FindService(socket, serviceId)
1855 register osi_socket socket;
1856 register u_short serviceId;
1858 register struct rx_service **sp;
1859 for (sp = &rx_services[0]; *sp; sp++) {
1860 if ((*sp)->serviceId == serviceId && (*sp)->socket == socket)
1866 /* Allocate a call structure, for the indicated channel of the
1867 * supplied connection. The mode and state of the call must be set by
1869 struct rx_call *rxi_NewCall(conn, channel)
1870 register struct rx_connection *conn;
1871 register int channel;
1873 register struct rx_call *call;
1874 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1875 register struct rx_call *cp; /* Call pointer temp */
1876 register struct rx_call *nxp; /* Next call pointer, for queue_Scan */
1877 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1879 /* Grab an existing call structure, or allocate a new one.
1880 * Existing call structures are assumed to have been left reset by
1882 MUTEX_ENTER(&rx_freeCallQueue_lock);
1884 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1886 * EXCEPT that the TQ might not yet be cleared out.
1887 * Skip over those with in-use TQs.
1890 for (queue_Scan(&rx_freeCallQueue, cp, nxp, rx_call)) {
1891 if (!(cp->flags & RX_CALL_TQ_BUSY)) {
1897 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
1898 if (queue_IsNotEmpty(&rx_freeCallQueue)) {
1899 call = queue_First(&rx_freeCallQueue, rx_call);
1900 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1902 MUTEX_ENTER(&rx_stats_mutex);
1903 rx_stats.nFreeCallStructs--;
1904 MUTEX_EXIT(&rx_stats_mutex);
1905 MUTEX_EXIT(&rx_freeCallQueue_lock);
1906 MUTEX_ENTER(&call->lock);
1907 CLEAR_CALL_QUEUE_LOCK(call);
1908 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1909 /* Now, if TQ wasn't cleared earlier, do it now. */
1910 if (call->flags & RX_CALL_TQ_CLEARME) {
1911 rxi_ClearTransmitQueue(call, 0);
1912 queue_Init(&call->tq);
1914 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1915 /* Bind the call to its connection structure */
1917 rxi_ResetCall(call, 1);
1920 call = (struct rx_call *) rxi_Alloc(sizeof(struct rx_call));
1922 MUTEX_EXIT(&rx_freeCallQueue_lock);
1923 MUTEX_INIT(&call->lock, "call lock", MUTEX_DEFAULT, NULL);
1924 MUTEX_ENTER(&call->lock);
1925 CV_INIT(&call->cv_twind, "call twind", CV_DEFAULT, 0);
1926 CV_INIT(&call->cv_rq, "call rq", CV_DEFAULT, 0);
1927 CV_INIT(&call->cv_tq, "call tq", CV_DEFAULT, 0);
1929 MUTEX_ENTER(&rx_stats_mutex);
1930 rx_stats.nCallStructs++;
1931 MUTEX_EXIT(&rx_stats_mutex);
1932 /* Initialize once-only items */
1933 queue_Init(&call->tq);
1934 queue_Init(&call->rq);
1935 queue_Init(&call->iovq);
1936 /* Bind the call to its connection structure (prereq for reset) */
1938 rxi_ResetCall(call, 1);
1940 call->channel = channel;
1941 call->callNumber = &conn->callNumber[channel];
1942 /* Note that the next expected call number is retained (in
1943 * conn->callNumber[i]), even if we reallocate the call structure
1945 conn->call[channel] = call;
1946 /* if the channel's never been used (== 0), we should start at 1, otherwise
1947 the call number is valid from the last time this channel was used */
1948 if (*call->callNumber == 0) *call->callNumber = 1;
1950 MUTEX_EXIT(&call->lock);
1954 /* A call has been inactive long enough that so we can throw away
1955 * state, including the call structure, which is placed on the call
1957 * Call is locked upon entry.
1959 #ifdef RX_ENABLE_LOCKS
1960 void rxi_FreeCall(call, haveCTLock)
1961 int haveCTLock; /* Set if called from rxi_ReapConnections */
1962 #else /* RX_ENABLE_LOCKS */
1963 void rxi_FreeCall(call)
1964 #endif /* RX_ENABLE_LOCKS */
1965 register struct rx_call *call;
1967 register int channel = call->channel;
1968 register struct rx_connection *conn = call->conn;
1971 if (call->state == RX_STATE_DALLY || call->state == RX_STATE_HOLD)
1972 (*call->callNumber)++;
1973 rxi_ResetCall(call, 0);
1974 call->conn->call[channel] = (struct rx_call *) 0;
1976 MUTEX_ENTER(&rx_freeCallQueue_lock);
1977 SET_CALL_QUEUE_LOCK(call, &rx_freeCallQueue_lock);
1978 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1979 /* A call may be free even though its transmit queue is still in use.
1980 * Since we search the call list from head to tail, put busy calls at
1981 * the head of the list, and idle calls at the tail.
1983 if (call->flags & RX_CALL_TQ_BUSY)
1984 queue_Prepend(&rx_freeCallQueue, call);
1986 queue_Append(&rx_freeCallQueue, call);
1987 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
1988 queue_Append(&rx_freeCallQueue, call);
1989 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1990 MUTEX_ENTER(&rx_stats_mutex);
1991 rx_stats.nFreeCallStructs++;
1992 MUTEX_EXIT(&rx_stats_mutex);
1994 MUTEX_EXIT(&rx_freeCallQueue_lock);
1996 /* Destroy the connection if it was previously slated for
1997 * destruction, i.e. the Rx client code previously called
1998 * rx_DestroyConnection (client connections), or
1999 * rxi_ReapConnections called the same routine (server
2000 * connections). Only do this, however, if there are no
2001 * outstanding calls. Note that for fine grain locking, there appears
2002 * to be a deadlock in that rxi_FreeCall has a call locked and
2003 * DestroyConnectionNoLock locks each call in the conn. But note a
2004 * few lines up where we have removed this call from the conn.
2005 * If someone else destroys a connection, they either have no
2006 * call lock held or are going through this section of code.
2008 if (conn->flags & RX_CONN_DESTROY_ME) {
2009 MUTEX_ENTER(&conn->conn_data_lock);
2011 MUTEX_EXIT(&conn->conn_data_lock);
2012 #ifdef RX_ENABLE_LOCKS
2014 rxi_DestroyConnectionNoLock(conn);
2016 rxi_DestroyConnection(conn);
2017 #else /* RX_ENABLE_LOCKS */
2018 rxi_DestroyConnection(conn);
2019 #endif /* RX_ENABLE_LOCKS */
2023 afs_int32 rxi_Alloccnt = 0, rxi_Allocsize = 0;
2024 char *rxi_Alloc(size)
2025 register size_t size;
2029 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2030 /* Grab the AFS filesystem lock. See afs/osi.h for the lock
2033 int glockOwner = ISAFS_GLOCK();
2037 MUTEX_ENTER(&rx_stats_mutex);
2038 rxi_Alloccnt++; rxi_Allocsize += size;
2039 MUTEX_EXIT(&rx_stats_mutex);
2040 #if (defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && !defined(AFS_HPUX100_ENV) && defined(KERNEL)
2041 if (size > AFS_SMALLOCSIZ) {
2042 p = (char *) osi_AllocMediumSpace(size);
2044 p = (char *) osi_AllocSmall(size, 1);
2045 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2050 p = (char *) osi_Alloc(size);
2052 if (!p) osi_Panic("rxi_Alloc error");
2057 void rxi_Free(addr, size)
2059 register size_t size;
2061 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2062 /* Grab the AFS filesystem lock. See afs/osi.h for the lock
2065 int glockOwner = ISAFS_GLOCK();
2069 MUTEX_ENTER(&rx_stats_mutex);
2070 rxi_Alloccnt--; rxi_Allocsize -= size;
2071 MUTEX_EXIT(&rx_stats_mutex);
2072 #if (defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && !defined(AFS_HPUX100_ENV) && defined(KERNEL)
2073 if (size > AFS_SMALLOCSIZ)
2074 osi_FreeMediumSpace(addr);
2076 osi_FreeSmall(addr);
2077 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2082 osi_Free(addr, size);
2086 /* Find the peer process represented by the supplied (host,port)
2087 * combination. If there is no appropriate active peer structure, a
2088 * new one will be allocated and initialized
2089 * The origPeer, if set, is a pointer to a peer structure on which the
2090 * refcount will be decremented. This is used to replace the peer
2091 * structure hanging off a connection structure */
2092 struct rx_peer *rxi_FindPeer(host, port, origPeer, create)
2093 register afs_uint32 host;
2094 register u_short port;
2095 struct rx_peer *origPeer;
2098 register struct rx_peer *pp;
2100 hashIndex = PEER_HASH(host, port);
2101 MUTEX_ENTER(&rx_peerHashTable_lock);
2102 for (pp = rx_peerHashTable[hashIndex]; pp; pp = pp->next) {
2103 if ((pp->host == host) && (pp->port == port)) break;
2107 pp = rxi_AllocPeer(); /* This bzero's *pp */
2108 pp->host = host; /* set here or in InitPeerParams is zero */
2110 MUTEX_INIT(&pp->peer_lock, "peer_lock", MUTEX_DEFAULT, 0);
2111 queue_Init(&pp->congestionQueue);
2112 queue_Init(&pp->rpcStats);
2113 pp->next = rx_peerHashTable[hashIndex];
2114 rx_peerHashTable[hashIndex] = pp;
2115 rxi_InitPeerParams(pp);
2116 MUTEX_ENTER(&rx_stats_mutex);
2117 rx_stats.nPeerStructs++;
2118 MUTEX_EXIT(&rx_stats_mutex);
2125 origPeer->refCount--;
2126 MUTEX_EXIT(&rx_peerHashTable_lock);
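/* Illustrative sketch (an assumption, not part of the original source;
 * rxi_ExampleGetPeer is a hypothetical name): the common lookup-or-create
 * use of rxi_FindPeer. A null origPeer means no existing reference is
 * being replaced; create = 1 allocates when no matching peer exists. */
#if 0
static struct rx_peer *rxi_ExampleGetPeer(afs_uint32 host, u_short port)
{
    /* the returned peer's reference count is held by the caller */
    return rxi_FindPeer(host, port, (struct rx_peer *) 0, 1);
}
#endif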
2131 /* Find the connection at (host, port) started at epoch, and with the
2132 * given connection id. Creates the server connection if necessary.
2133 * The type specifies whether a client connection or a server
2134 * connection is desired. In both cases, (host, port) specify the
2135 * peer's (host, port) pair. Client connections are not made
2136 * automatically by this routine. The parameter socket gives the
2137 * socket descriptor on which the packet was received. This is used,
2138 * in the case of server connections, to check that *new* connections
2139 * come via a valid (port, serviceId). Finally, the securityIndex
2140 * parameter must match the existing index for the connection. If a
2141 * server connection is created, it will be created using the supplied
2142 * index, if the index is valid for this service */
2143 struct rx_connection *
2144 rxi_FindConnection(socket, host, port, serviceId, cid,
2145 epoch, type, securityIndex)
2147 register afs_int32 host;
2148 register u_short port;
2153 u_int securityIndex;
2155 int hashindex, flag;
2156 register struct rx_connection *conn;
2157 struct rx_peer *peer;
2158 hashindex = CONN_HASH(host, port, cid, epoch, type);
2159 MUTEX_ENTER(&rx_connHashTable_lock);
2160 rxLastConn ? (conn = rxLastConn, flag = 0) :
2161 (conn = rx_connHashTable[hashindex], flag = 1);
2163 if ((conn->type == type) && ((cid&RX_CIDMASK) == conn->cid)
2164 && (epoch == conn->epoch)) {
2165 register struct rx_peer *pp = conn->peer;
2166 if (securityIndex != conn->securityIndex) {
2167 /* this isn't supposed to happen, but someone could forge a packet
2168 like this, and there seems to be some CM bug that makes this
2169 happen from time to time -- in which case, the fileserver
2171 MUTEX_EXIT(&rx_connHashTable_lock);
2172 return (struct rx_connection *) 0;
2174 /* If the high-order bit of the epoch is set, route only on the cid,
2175 * for security reasons; the host and port fields are not checked.
2177 if (conn->epoch & 0x80000000) break;
2178 if (((type == RX_CLIENT_CONNECTION)
2179 || (pp->host == host)) && (pp->port == port))
2184 /* the connection rxLastConn that was used last time is not the
2185 ** one we are looking for now. Hence, start searching in the hash table */
2187 conn = rx_connHashTable[hashindex];
2193 struct rx_service *service;
2194 if (type == RX_CLIENT_CONNECTION) {
2195 MUTEX_EXIT(&rx_connHashTable_lock);
2196 return (struct rx_connection *) 0;
2198 service = rxi_FindService(socket, serviceId);
2199 if (!service || (securityIndex >= service->nSecurityObjects)
2200 || (service->securityObjects[securityIndex] == 0)) {
2201 MUTEX_EXIT(&rx_connHashTable_lock);
2202 return (struct rx_connection *) 0;
2204 conn = rxi_AllocConnection(); /* This bzero's the connection */
2205 MUTEX_INIT(&conn->conn_call_lock, "conn call lock",
2207 MUTEX_INIT(&conn->conn_data_lock, "conn data lock",
2209 CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
2210 conn->next = rx_connHashTable[hashindex];
2211 rx_connHashTable[hashindex] = conn;
2212 peer = conn->peer = rxi_FindPeer(host, port, 0, 1);
2213 conn->type = RX_SERVER_CONNECTION;
2214 conn->lastSendTime = clock_Sec(); /* don't GC immediately */
2215 conn->epoch = epoch;
2216 conn->cid = cid & RX_CIDMASK;
2217 /* conn->serial = conn->lastSerial = 0; */
2218 /* conn->timeout = 0; */
2219 conn->ackRate = RX_FAST_ACK_RATE;
2220 conn->service = service;
2221 conn->serviceId = serviceId;
2222 conn->securityIndex = securityIndex;
2223 conn->securityObject = service->securityObjects[securityIndex];
2224 conn->nSpecific = 0;
2225 conn->specific = NULL;
2226 rx_SetConnDeadTime(conn, service->connDeadTime);
2227 /* Notify security object of the new connection */
2228 RXS_NewConnection(conn->securityObject, conn);
2229 /* XXXX Connection timeout? */
2230 if (service->newConnProc) (*service->newConnProc)(conn);
2231 MUTEX_ENTER(&rx_stats_mutex);
2232 rx_stats.nServerConns++;
2233 MUTEX_EXIT(&rx_stats_mutex);
2237 /* Ensure that the peer structure is set up in such a way that
2238 ** replies in this connection go back to that remote interface
2239 ** from which the last packet was sent out. If this packet's
2240 ** source IP address does not match the peer struct for this conn,
2241 ** then drop the refCount on conn->peer and get a new peer structure.
2242 ** We can check the host,port field in the peer structure without the
2243 ** rx_peerHashTable_lock because the peer structure has its refCount
2244 ** incremented and the only time the host,port in the peer struct gets
2245 ** updated is when the peer structure is created.
2247 if (conn->peer->host == host )
2248 peer = conn->peer; /* no change to the peer structure */
2250 peer = rxi_FindPeer(host, port, conn->peer, 1);
2253 MUTEX_ENTER(&conn->conn_data_lock);
2256 MUTEX_EXIT(&conn->conn_data_lock);
2258 rxLastConn = conn; /* store this connection as the last conn used */
2259 MUTEX_EXIT(&rx_connHashTable_lock);
2263 /* There are two packet tracing routines available for testing and monitoring
2264 * Rx. One is called just after every packet is received and the other is
2265 * called just before every packet is sent. Received packets have had their
2266 * headers decoded, and packets to be sent have not yet had their headers
2267 * encoded. Both take two parameters: a pointer to the packet and a sockaddr
2268 * containing the network address. Both can be modified. The return value, if
2269 * non-zero, indicates that the packet should be dropped. */
2271 int (*rx_justReceived)() = 0;
2272 int (*rx_almostSent)() = 0;
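/* Illustrative sketch (an assumption, not part of the original source;
 * rxi_ExampleTracer is a hypothetical name): a minimal input tracer
 * installed via rx_justReceived. It logs the sender and keeps every
 * packet; returning non-zero would cause the packet to be dropped. */
#if 0
static int rxi_ExampleTracer(struct rx_packet *ap, struct sockaddr_in *addr)
{
    dpf(("traced packet serial %d from host %x port %d",
	 ap->header.serial, addr->sin_addr.s_addr, ntohs(addr->sin_port)));
    return 0;	/* 0 => accept the packet */
}
/* installed at initialization time: rx_justReceived = rxi_ExampleTracer; */
#endif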
2274 /* A packet has been received off the interface. Np is the packet, socket is
2275 * the socket number it was received from (useful in determining which service
2276 * this packet corresponds to), and (host, port) reflect the host,port of the
2277 * sender. This call returns the packet to the caller if it is finished with
2278 * it, rather than de-allocating it, just as a small performance hack */
2280 struct rx_packet *rxi_ReceivePacket(np, socket, host, port, tnop, newcallp)
2281 register struct rx_packet *np;
2286 struct rx_call **newcallp;
2288 register struct rx_call *call;
2289 register struct rx_connection *conn;
2291 afs_uint32 currentCallNumber;
2297 struct rx_packet *tnp;
2300 /* We don't print out the packet until now because (1) the time may not be
2301 * accurate enough until now in the lwp implementation (rx_Listener only gets
2302 * the time after the packet is read) and (2) from a protocol point of view,
2303 * this is the first time the packet has been seen */
2304 packetType = (np->header.type > 0 && np->header.type < RX_N_PACKET_TYPES)
2305 ? rx_packetTypes[np->header.type-1]: "*UNKNOWN*";
2306 dpf(("R %d %s: %x.%d.%d.%d.%d.%d.%d flags %d, packet %x",
2307 np->header.serial, packetType, host, port, np->header.serviceId,
2308 np->header.epoch, np->header.cid, np->header.callNumber,
2309 np->header.seq, np->header.flags, np));
2312 if(np->header.type == RX_PACKET_TYPE_VERSION) {
2313 return rxi_ReceiveVersionPacket(np,socket,host,port, 1);
2316 if (np->header.type == RX_PACKET_TYPE_DEBUG) {
2317 return rxi_ReceiveDebugPacket(np, socket, host, port, 1);
2320 /* If an input tracer function is defined, call it with the packet and
2321 * network address. Note this function may modify its arguments. */
2322 if (rx_justReceived) {
2323 struct sockaddr_in addr;
2325 addr.sin_family = AF_INET;
2326 addr.sin_port = port;
2327 addr.sin_addr.s_addr = host;
2328 #if defined(AFS_OSF_ENV) && defined(_KERNEL)
2329 addr.sin_len = sizeof(addr);
2330 #endif /* AFS_OSF_ENV */
2331 drop = (*rx_justReceived) (np, &addr);
2332 /* drop packet if return value is non-zero */
2333 if (drop) return np;
2334 port = addr.sin_port; /* in case fcn changed addr */
2335 host = addr.sin_addr.s_addr;
2339 /* If packet was not sent by the client, then *we* must be the client */
2340 type = ((np->header.flags&RX_CLIENT_INITIATED) != RX_CLIENT_INITIATED)
2341 ? RX_CLIENT_CONNECTION : RX_SERVER_CONNECTION;
2343 /* Find the connection (or fabricate one, if we're the server & if
2344 * necessary) associated with this packet */
2345 conn = rxi_FindConnection(socket, host, port, np->header.serviceId,
2346 np->header.cid, np->header.epoch, type,
2347 np->header.securityIndex);
2350 /* If no connection found or fabricated, just ignore the packet.
2351 * (An argument could be made for sending an abort packet for
2356 MUTEX_ENTER(&conn->conn_data_lock);
2357 if (conn->maxSerial < np->header.serial)
2358 conn->maxSerial = np->header.serial;
2359 MUTEX_EXIT(&conn->conn_data_lock);
2361 /* If the connection is in an error state, send an abort packet and ignore
2362 * the incoming packet */
2364 /* Don't respond to an abort packet--we don't want loops! */
2365 MUTEX_ENTER(&conn->conn_data_lock);
2366 if (np->header.type != RX_PACKET_TYPE_ABORT)
2367 np = rxi_SendConnectionAbort(conn, np, 1, 0);
2369 MUTEX_EXIT(&conn->conn_data_lock);
2373 /* Check for connection-only requests (i.e. not call specific). */
2374 if (np->header.callNumber == 0) {
2375 switch (np->header.type) {
2376 case RX_PACKET_TYPE_ABORT:
2377 /* What if the supplied error is zero? */
2378 rxi_ConnectionError(conn, ntohl(rx_GetInt32(np,0)));
2379 MUTEX_ENTER(&conn->conn_data_lock);
2381 MUTEX_EXIT(&conn->conn_data_lock);
2383 case RX_PACKET_TYPE_CHALLENGE:
2384 tnp = rxi_ReceiveChallengePacket(conn, np, 1);
2385 MUTEX_ENTER(&conn->conn_data_lock);
2387 MUTEX_EXIT(&conn->conn_data_lock);
2389 case RX_PACKET_TYPE_RESPONSE:
2390 tnp = rxi_ReceiveResponsePacket(conn, np, 1);
2391 MUTEX_ENTER(&conn->conn_data_lock);
2393 MUTEX_EXIT(&conn->conn_data_lock);
2395 case RX_PACKET_TYPE_PARAMS:
2396 case RX_PACKET_TYPE_PARAMS+1:
2397 case RX_PACKET_TYPE_PARAMS+2:
2398 /* ignore these packet types for now */
2399 MUTEX_ENTER(&conn->conn_data_lock);
2401 MUTEX_EXIT(&conn->conn_data_lock);
2406 /* Should not reach here, unless the peer is broken: send an
2408 rxi_ConnectionError(conn, RX_PROTOCOL_ERROR);
2409 MUTEX_ENTER(&conn->conn_data_lock);
2410 tnp = rxi_SendConnectionAbort(conn, np, 1, 0);
2412 MUTEX_EXIT(&conn->conn_data_lock);
2417 channel = np->header.cid & RX_CHANNELMASK;
2418 call = conn->call[channel];
2419 #ifdef RX_ENABLE_LOCKS
2421 MUTEX_ENTER(&call->lock);
2422 /* Test to see if call struct is still attached to conn. */
2423 if (call != conn->call[channel]) {
2425 MUTEX_EXIT(&call->lock);
2426 if (type == RX_SERVER_CONNECTION) {
2427 call = conn->call[channel];
2428 /* If we started with no call attached and there is one now,
2429 * another thread is also running this routine and has gotten
2430 * the connection channel. We should drop this packet in the tests
2431 * below. If there was a call on this connection and it's now
2432 * gone, then we'll be making a new call below.
2433 * If there was previously a call and it's now different then
2434 * the old call was freed and another thread running this routine
2435 * has created a call on this channel. One of these two threads
2436 * has a packet for the old call and the code below handles those
2440 MUTEX_ENTER(&call->lock);
2443 /* This packet can't be for this call. If the new call address is
2444 * 0 then no call is running on this channel. If there is a call
2445 * then, since this is a client connection we're getting data for,
2446 * it must be for the previous call.
2448 MUTEX_ENTER(&rx_stats_mutex);
2449 rx_stats.spuriousPacketsRead++;
2450 MUTEX_EXIT(&rx_stats_mutex);
2451 MUTEX_ENTER(&conn->conn_data_lock);
2453 MUTEX_EXIT(&conn->conn_data_lock);
2458 currentCallNumber = conn->callNumber[channel];
2460 if (type == RX_SERVER_CONNECTION) { /* We're the server */
2461 if (np->header.callNumber < currentCallNumber) {
2462 MUTEX_ENTER(&rx_stats_mutex);
2463 rx_stats.spuriousPacketsRead++;
2464 MUTEX_EXIT(&rx_stats_mutex);
2465 #ifdef RX_ENABLE_LOCKS
2467 MUTEX_EXIT(&call->lock);
2469 MUTEX_ENTER(&conn->conn_data_lock);
2471 MUTEX_EXIT(&conn->conn_data_lock);
2475 call = rxi_NewCall(conn, channel);
2476 MUTEX_ENTER(&call->lock);
2477 *call->callNumber = np->header.callNumber;
2478 call->state = RX_STATE_PRECALL;
2479 clock_GetTime(&call->queueTime);
2480 hzero(call->bytesSent);
2481 hzero(call->bytesRcvd);
2482 rxi_KeepAliveOn(call);
2484 else if (np->header.callNumber != currentCallNumber) {
2485 /* Wait until the transmit queue is idle before deciding
2486 * whether to reset the current call. Chances are that the
2487 * call will be in either DALLY or HOLD state once the TQ_BUSY
2490 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2491 while ((call->state == RX_STATE_ACTIVE) &&
2492 (call->flags & RX_CALL_TQ_BUSY)) {
2493 call->flags |= RX_CALL_TQ_WAIT;
2494 #ifdef RX_ENABLE_LOCKS
2495 CV_WAIT(&call->cv_tq, &call->lock);
2496 #else /* RX_ENABLE_LOCKS */
2497 osi_rxSleep(&call->tq);
2498 #endif /* RX_ENABLE_LOCKS */
2500 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2501 /* If the new call cannot be taken right now send a busy and set
2502 * the error condition in this call, so that it terminates as
2503 * quickly as possible */
2504 if (call->state == RX_STATE_ACTIVE) {
2505 struct rx_packet *tp;
2507 rxi_CallError(call, RX_CALL_DEAD);
2508 tp = rxi_SendSpecial(call, conn, np, RX_PACKET_TYPE_BUSY, (char *) 0, 0, 1);
2509 MUTEX_EXIT(&call->lock);
2510 MUTEX_ENTER(&conn->conn_data_lock);
2512 MUTEX_EXIT(&conn->conn_data_lock);
2515 rxi_ResetCall(call, 0);
2516 *call->callNumber = np->header.callNumber;
2517 call->state = RX_STATE_PRECALL;
2518 clock_GetTime(&call->queueTime);
2519 hzero(call->bytesSent);
2520 hzero(call->bytesRcvd);
2522 * If the number of queued calls exceeds the overload
2523 * threshold then abort this call.
2525 if ((rx_BusyThreshold > 0) && (rx_nWaiting > rx_BusyThreshold)) {
2526 struct rx_packet *tp;
2528 rxi_CallError(call, rx_BusyError);
2529 tp = rxi_SendCallAbort(call, np, 1, 0);
2530 MUTEX_EXIT(&call->lock);
2531 MUTEX_ENTER(&conn->conn_data_lock);
2533 MUTEX_EXIT(&conn->conn_data_lock);
2536 rxi_KeepAliveOn(call);
2539 /* Continuing call; do nothing here. */
2541 } else { /* we're the client */
2542 /* Ignore all incoming acknowledgements for calls in DALLY state */
2543 if ( call && (call->state == RX_STATE_DALLY)
2544 && (np->header.type == RX_PACKET_TYPE_ACK)) {
2545 MUTEX_ENTER(&rx_stats_mutex);
2546 rx_stats.ignorePacketDally++;
2547 MUTEX_EXIT(&rx_stats_mutex);
2548 #ifdef RX_ENABLE_LOCKS
2550 MUTEX_EXIT(&call->lock);
2553 MUTEX_ENTER(&conn->conn_data_lock);
2555 MUTEX_EXIT(&conn->conn_data_lock);
2559 /* Ignore anything that's not relevant to the current call. If there
2560 * isn't a current call, then no packet is relevant. */
2561 if (!call || (np->header.callNumber != currentCallNumber)) {
2562 MUTEX_ENTER(&rx_stats_mutex);
2563 rx_stats.spuriousPacketsRead++;
2564 MUTEX_EXIT(&rx_stats_mutex);
2565 #ifdef RX_ENABLE_LOCKS
2567 MUTEX_EXIT(&call->lock);
2570 MUTEX_ENTER(&conn->conn_data_lock);
2572 MUTEX_EXIT(&conn->conn_data_lock);
2575 /* If the service security object index stamped in the packet does not
2576 * match the connection's security index, ignore the packet */
2577 if (np->header.securityIndex != conn->securityIndex) {
2578 #ifdef RX_ENABLE_LOCKS
2579 MUTEX_EXIT(&call->lock);
2581 MUTEX_ENTER(&conn->conn_data_lock);
2583 MUTEX_EXIT(&conn->conn_data_lock);
2587 /* If we're receiving the response, then all transmit packets are
2588 * implicitly acknowledged. Get rid of them. */
2589 if (np->header.type == RX_PACKET_TYPE_DATA) {
2590 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2591 /* XXX Hack. Because we must release the global rx lock when
2592 * sending packets (osi_NetSend) we drop all acks while we're
2593 * traversing the tq in rxi_Start sending packets out because
2594 * packets may move to the freePacketQueue as a result of being here!
2595 * So we drop these packets until we're safely out of the
2596 * traversal. Really ugly!
2597 * For fine grain RX locking, we set the acked field in the
2598 * packets and let rxi_Start remove them from the transmit queue.
2600 if (call->flags & RX_CALL_TQ_BUSY) {
2601 #ifdef RX_ENABLE_LOCKS
2602 rxi_SetAcksInTransmitQueue(call);
2605 return np; /* xmitting; drop packet */
2609 rxi_ClearTransmitQueue(call, 0);
2611 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
2612 rxi_ClearTransmitQueue(call, 0);
2613 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2615 if (np->header.type == RX_PACKET_TYPE_ACK) {
2616 /* now check to see if this is an ack packet acknowledging that the
2617 * server actually *lost* some hard-acked data. If this happens we
2618 * ignore this packet, as it may indicate that the server restarted in
2619 * the middle of a call. It is also possible that this is an old ack
2620 * packet. We don't abort the connection in this case, because this
2621 * *might* just be an old ack packet. The right way to detect a server
2622 * restart in the midst of a call is to notice that the server epoch
2624 /* XXX I'm not sure this is exactly right, since tfirst **IS**
2625 * XXX unacknowledged. I think that this is off-by-one, but
2626 * XXX I don't dare change it just yet, since it will
2627 * XXX interact badly with the server-restart detection
2628 * XXX code in receiveackpacket. */
2629 if (ntohl(rx_GetInt32(np, FIRSTACKOFFSET)) < call->tfirst) {
2630 MUTEX_ENTER(&rx_stats_mutex);
2631 rx_stats.spuriousPacketsRead++;
2632 MUTEX_EXIT(&rx_stats_mutex);
2633 MUTEX_EXIT(&call->lock);
2634 MUTEX_ENTER(&conn->conn_data_lock);
2636 MUTEX_EXIT(&conn->conn_data_lock);
2640 } /* else not a data packet */
2643 osirx_AssertMine(&call->lock, "rxi_ReceivePacket middle");
2644 /* Set remote user defined status from packet */
2645 call->remoteStatus = np->header.userStatus;
2647 /* Note the gap between the expected next packet and the actual
2648 * packet that arrived, when the new packet has a smaller serial number
2649 * than expected. Rioses frequently reorder packets all by themselves,
2650 * so this will be quite important with very large window sizes.
2651 * Skew is checked against 0 here to avoid any dependence on the type of
2652 * inPacketSkew (which may be unsigned). In C, -1 > (unsigned) 0 is always
2654 * The inPacketSkew should be a smoothed running value, not just a maximum. MTUXXX
2655 * see CalculateRoundTripTime for an example of how to keep smoothed values.
2656 * I think using a beta of 1/8 is probably appropriate. 93.04.21
2658 MUTEX_ENTER(&conn->conn_data_lock);
2659 skew = conn->lastSerial - np->header.serial;
2660 conn->lastSerial = np->header.serial;
2661 MUTEX_EXIT(&conn->conn_data_lock);
2663 register struct rx_peer *peer;
2665 if (skew > peer->inPacketSkew) {
2666 dpf (("*** In skew changed from %d to %d\n", peer->inPacketSkew, skew));
2667 peer->inPacketSkew = skew;
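/* A smoothed update along the lines suggested above might look like this
 * sketch (an assumption; the code above keeps a simple maximum):
 *     peer->inPacketSkew += (skew - peer->inPacketSkew) >> 3;
 * i.e. an exponentially weighted running average with beta = 1/8. */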
2671 /* Now do packet type-specific processing */
2672 switch (np->header.type) {
2673 case RX_PACKET_TYPE_DATA:
2674 np = rxi_ReceiveDataPacket(call, np, 1, socket, host, port,
2677 case RX_PACKET_TYPE_ACK:
2678 /* Respond immediately to ack packets requesting acknowledgement
2680 if (np->header.flags & RX_REQUEST_ACK) {
2681 if (call->error) (void) rxi_SendCallAbort(call, 0, 1, 0);
2682 else (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_PING_RESPONSE, 1);
2684 np = rxi_ReceiveAckPacket(call, np, 1);
2686 case RX_PACKET_TYPE_ABORT:
2687 /* An abort packet: reset the connection, passing the error up to
2689 /* What if error is zero? */
2690 rxi_CallError(call, ntohl(*(afs_int32 *)rx_DataOf(np)));
2692 case RX_PACKET_TYPE_BUSY:
2695 case RX_PACKET_TYPE_ACKALL:
2696 /* All packets acknowledged, so we can drop all packets previously
2697 * readied for sending */
2698 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2699 /* XXX Hack. Because we can't release the global rx lock when
2700 * sending packets (osi_NetSend) we drop all ack pkts while we're
2701 * traversing the tq in rxi_Start sending packets out because
2702 * packets may move to the freePacketQueue as a result of being
2703 * here! So we drop these packets until we're safely out of the
2704 * traversal. Really ugly!
2705 * For fine grain RX locking, we set the acked field in the packets
2706 * and let rxi_Start remove the packets from the transmit queue.
2708 if (call->flags & RX_CALL_TQ_BUSY) {
2709 #ifdef RX_ENABLE_LOCKS
2710 rxi_SetAcksInTransmitQueue(call);
2712 #else /* RX_ENABLE_LOCKS */
2714 return np; /* xmitting; drop packet */
2715 #endif /* RX_ENABLE_LOCKS */
2717 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2718 rxi_ClearTransmitQueue(call, 0);
2721 /* Should not reach here, unless the peer is broken: send an abort
2723 rxi_CallError(call, RX_PROTOCOL_ERROR);
2724 np = rxi_SendCallAbort(call, np, 1, 0);
2727 /* Note when this last legitimate packet was received, for keep-alive
2728 * processing. Note, we delay getting the time until now in the hope that
2729 * the packet will be delivered to the user before any get time is required
2730 * (if not, then the time won't actually be re-evaluated here). */
2731 call->lastReceiveTime = clock_Sec();
2732 MUTEX_EXIT(&call->lock);
2733 MUTEX_ENTER(&conn->conn_data_lock);
2735 MUTEX_EXIT(&conn->conn_data_lock);
2739 /* return true if this is an "interesting" connection from the point of view
2740 of someone trying to debug the system */
2741 int rxi_IsConnInteresting(struct rx_connection *aconn)
2744 register struct rx_call *tcall;
2746 if (aconn->flags & (RX_CONN_MAKECALL_WAITING | RX_CONN_DESTROY_ME))
2748 for(i=0;i<RX_MAXCALLS;i++) {
2749 tcall = aconn->call[i];
2751 if ((tcall->state == RX_STATE_PRECALL) || (tcall->state == RX_STATE_ACTIVE))
2753 if ((tcall->mode == RX_MODE_SENDING) || (tcall->mode == RX_MODE_RECEIVING))
2761 /* if this is one of the last few packets AND it wouldn't be used by the
2762 receiving call to immediately satisfy a read request, then drop it on
2763 the floor, since accepting it might prevent a lock-holding thread from
2764 making progress in its reading. If a call has been cleared while in
2765 the precall state then ignore all subsequent packets until the call
2766 is assigned to a thread. */
2768 static TooLow(ap, acall)
2769 struct rx_call *acall;
2770 struct rx_packet *ap; {
2772 MUTEX_ENTER(&rx_stats_mutex);
2773 if (((ap->header.seq != 1) &&
2774 (acall->flags & RX_CALL_CLEARED) &&
2775 (acall->state == RX_STATE_PRECALL)) ||
2776 ((rx_nFreePackets < rxi_dataQuota+2) &&
2777 !( (ap->header.seq < acall->rnext+rx_initSendWindow)
2778 && (acall->flags & RX_CALL_READER_WAIT)))) {
2781 MUTEX_EXIT(&rx_stats_mutex);
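/* Worked example (illustrative): with rxi_dataQuota = 98 and
 * rx_nFreePackets = 99 (i.e. fewer than rxi_dataQuota + 2 free packets),
 * an incoming data packet is judged "too low" and dropped unless a reader
 * is waiting and the packet's sequence number falls within
 * rnext + rx_initSendWindow, so accepting it would let that reader make
 * immediate progress. */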
2786 /* try to attach call, if authentication is complete */
2787 static void TryAttach(acall, socket, tnop, newcallp)
2788 register struct rx_call *acall;
2789 register osi_socket socket;
2791 register struct rx_call **newcallp; {
2792 register struct rx_connection *conn;
2794 if ((conn->type==RX_SERVER_CONNECTION) && (acall->state == RX_STATE_PRECALL)) {
2795 /* Don't attach until we have any required authentication. */
2796 if (RXS_CheckAuthentication(conn->securityObject, conn) == 0) {
2797 rxi_AttachServerProc(acall, socket, tnop, newcallp);
2798 /* Note: this does not necessarily succeed; there
2799 may not be any proc available */
2802 rxi_ChallengeOn(acall->conn);
2807 /* A data packet has been received off the interface. This packet is
2808 * appropriate to the call (the call is in the right state, etc.). This
2809 * routine can return a packet to the caller, for re-use */
2811 struct rx_packet *rxi_ReceiveDataPacket(call, np, istack, socket, host,
2812 port, tnop, newcallp)
2813 register struct rx_call *call;
2814 register struct rx_packet *np;
2820 struct rx_call **newcallp;
2826 afs_uint32 seq, serial, flags;
2828 struct rx_packet *tnp;
2830 MUTEX_ENTER(&rx_stats_mutex);
2831 rx_stats.dataPacketsRead++;
2832 MUTEX_EXIT(&rx_stats_mutex);
2835 /* If there are no packet buffers, drop this new packet, unless we can find
2836 * packet buffers from inactive calls */
2838 (rxi_OverQuota(RX_PACKET_CLASS_RECEIVE) || TooLow(np, call))) {
2839 MUTEX_ENTER(&rx_freePktQ_lock);
2840 rxi_NeedMorePackets = TRUE;
2841 MUTEX_EXIT(&rx_freePktQ_lock);
2842 MUTEX_ENTER(&rx_stats_mutex);
2843 rx_stats.noPacketBuffersOnRead++;
2844 MUTEX_EXIT(&rx_stats_mutex);
2845 call->rprev = np->header.serial;
2846 rxi_calltrace(RX_TRACE_DROP, call);
2847 dpf (("packet %x dropped on receipt - quota problems", np));
2849 rxi_ClearReceiveQueue(call);
2850 clock_GetTime(&when);
2851 clock_Add(&when, &rx_softAckDelay);
2852 if (!call->delayedAckEvent ||
2853 clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
2854 rxevent_Cancel(call->delayedAckEvent, call,
2855 RX_CALL_REFCOUNT_DELAY);
2856 CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
2857 call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
2860 /* we've damaged this call already, might as well do it in. */
2866 * New in AFS 3.5, if the RX_JUMBO_PACKET flag is set then this
2867 * packet is one of several packets transmitted as a single
2868 * datagram. Do not send any soft or hard acks until all packets
2869 * in a jumbogram have been processed. Send negative acks right away.
2871 for (isFirst = 1 , tnp = NULL ; isFirst || tnp ; isFirst = 0 ) {
2872 /* tnp is non-null when there are more packets in the
2873 * current jumbogram */
2880 seq = np->header.seq;
2881 serial = np->header.serial;
2882 flags = np->header.flags;
2884 /* If the call is in an error state, send an abort message */
2886 return rxi_SendCallAbort(call, np, istack, 0);
2888 /* The RX_JUMBO_PACKET is set in all but the last packet in each
2889 * AFS 3.5 jumbogram. */
2890 if (flags & RX_JUMBO_PACKET) {
2891 tnp = rxi_SplitJumboPacket(np, host, port, isFirst);
2896 if (np->header.spare != 0) {
2897 MUTEX_ENTER(&call->conn->conn_data_lock);
2898 call->conn->flags |= RX_CONN_USING_PACKET_CKSUM;
2899 MUTEX_EXIT(&call->conn->conn_data_lock);
2902 /* The usual case is that this is the expected next packet */
2903 if (seq == call->rnext) {
2905 /* Check to make sure it is not a duplicate of one already queued */
2906 if (queue_IsNotEmpty(&call->rq)
2907 && queue_First(&call->rq, rx_packet)->header.seq == seq) {
2908 MUTEX_ENTER(&rx_stats_mutex);
2909 rx_stats.dupPacketsRead++;
2910 MUTEX_EXIT(&rx_stats_mutex);
2911 dpf (("packet %x dropped on receipt - duplicate", np));
2912 rxevent_Cancel(call->delayedAckEvent, call,
2913 RX_CALL_REFCOUNT_DELAY);
2914 np = rxi_SendAck(call, np, seq, serial,
2915 flags, RX_ACK_DUPLICATE, istack);
2921 /* It's the next packet. Stick it on the receive queue
2922 * for this call. Set newPackets to make sure we wake
2923 * the reader once all packets have been processed */
2924 queue_Prepend(&call->rq, np);
2926 np = NULL; /* We can't use this anymore */
2929 /* If an ack is requested then set a flag to make sure we
2930 * send an acknowledgement for this packet */
2931 if (flags & RX_REQUEST_ACK) {
2935 /* Keep track of whether we have received the last packet */
2936 if (flags & RX_LAST_PACKET) {
2937 call->flags |= RX_CALL_HAVE_LAST;
2941 /* Check whether we have all of the packets for this call */
2942 if (call->flags & RX_CALL_HAVE_LAST) {
2943 afs_uint32 tseq; /* temporary sequence number */
2944 struct rx_packet *tp; /* Temporary packet pointer */
2945 struct rx_packet *nxp; /* Next pointer, for queue_Scan */
2947 for (tseq = seq, queue_Scan(&call->rq, tp, nxp, rx_packet)) {
2948 if (tseq != tp->header.seq)
2950 if (tp->header.flags & RX_LAST_PACKET) {
2951 call->flags |= RX_CALL_RECEIVE_DONE;
2958 /* Provide asynchronous notification for those who want it
2959 * (e.g. multi rx) */
2960 if (call->arrivalProc) {
2961 (*call->arrivalProc)(call, call->arrivalProcHandle,
2962 call->arrivalProcArg);
2963 call->arrivalProc = (VOID (*)()) 0;
2966 /* Update last packet received */
2969 /* If there is no server process serving this call, grab
2970 * one, if available. We only need to do this once. If a
2971 * server thread is available, this thread becomes a server
2972 * thread and the server thread becomes a listener thread. */
2974 TryAttach(call, socket, tnop, newcallp);
2977 /* This is not the expected next packet. */
2979 /* Determine whether this is a new or old packet, and if it's
2980 * a new one, whether it fits into the current receive window.
2981 * Also figure out whether the packet was delivered in sequence.
2982 * We use the prev variable to determine whether the new packet
2983 * is the successor of its immediate predecessor in the
2984 * receive queue, and the missing flag to determine whether
2985 * any of this packet's predecessors are missing. */
2987 afs_uint32 prev; /* "Previous packet" sequence number */
2988 struct rx_packet *tp; /* Temporary packet pointer */
2989 struct rx_packet *nxp; /* Next pointer, for queue_Scan */
2990 int missing; /* Are any predecessors missing? */
2992 /* If the new packet's sequence number has been sent to the
2993 * application already, then this is a duplicate */
2994 if (seq < call->rnext) {
2995 MUTEX_ENTER(&rx_stats_mutex);
2996 rx_stats.dupPacketsRead++;
2997 MUTEX_EXIT(&rx_stats_mutex);
2998 rxevent_Cancel(call->delayedAckEvent, call,
2999 RX_CALL_REFCOUNT_DELAY);
3000 np = rxi_SendAck(call, np, seq, serial,
3001 flags, RX_ACK_DUPLICATE, istack);
3007 /* If the sequence number is greater than what can be
3008 * accommodated by the current window, then send a negative
3009 * acknowledge and drop the packet */
3010 if ((call->rnext + call->rwind) <= seq) {
3011 rxevent_Cancel(call->delayedAckEvent, call,
3012 RX_CALL_REFCOUNT_DELAY);
3013 np = rxi_SendAck(call, np, seq, serial,
3014 flags, RX_ACK_EXCEEDS_WINDOW, istack);
3020 /* Look for the packet in the queue of old received packets */
3021 for (prev = call->rnext - 1, missing = 0,
3022 queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3023 /*Check for duplicate packet */
3024 if (seq == tp->header.seq) {
3025 MUTEX_ENTER(&rx_stats_mutex);
3026 rx_stats.dupPacketsRead++;
3027 MUTEX_EXIT(&rx_stats_mutex);
3028 rxevent_Cancel(call->delayedAckEvent, call,
3029 RX_CALL_REFCOUNT_DELAY);
3030 np = rxi_SendAck(call, np, seq, serial,
3031 flags, RX_ACK_DUPLICATE, istack);
3036 /* If we find a higher sequence packet, break out and
3037 * insert the new packet here. */
3038 if (seq < tp->header.seq) break;
3039 /* Check for missing packet */
3040 if (tp->header.seq != prev+1) {
3044 prev = tp->header.seq;
3047 /* Keep track of whether we have received the last packet. */
3048 if (flags & RX_LAST_PACKET) {
3049 call->flags |= RX_CALL_HAVE_LAST;
3052 /* It's within the window: add it to the receive queue.
3053 * tp is left by the previous loop either pointing at the
3054 * packet before which to insert the new packet, or at the
3055 * queue head if the queue is empty or the packet should be
3057 queue_InsertBefore(tp, np);
3061 /* Check whether we have all of the packets for this call */
3062 if ((call->flags & RX_CALL_HAVE_LAST)
3063 && !(call->flags & RX_CALL_RECEIVE_DONE)) {
3064 afs_uint32 tseq; /* temporary sequence number */
3066 for (tseq = call->rnext,
3067 queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3068 if (tseq != tp->header.seq)
3070 if (tp->header.flags & RX_LAST_PACKET) {
3071 call->flags |= RX_CALL_RECEIVE_DONE;
3078 /* We need to send an ack if the packet is out of sequence,
3079 * or if an ack was requested by the peer. */
3080 if (seq != prev+1 || missing || (flags & RX_REQUEST_ACK)) {
3084 /* Acknowledge the last packet for each call */
3085 if (flags & RX_LAST_PACKET) {
3096 * If the receiver is waiting for an iovec, fill the iovec
3097 * using the data from the receive queue */
3098 if (call->flags & RX_CALL_IOVEC_WAIT) {
3099 didHardAck = rxi_FillReadVec(call, seq, serial, flags);
3100 /* the call may have been aborted */
3109 /* Wakeup the reader if any */
3110 if ((call->flags & RX_CALL_READER_WAIT) &&
3111 (!(call->flags & RX_CALL_IOVEC_WAIT) || !(call->iovNBytes) ||
3112 (call->iovNext >= call->iovMax) ||
3113 (call->flags & RX_CALL_RECEIVE_DONE))) {
3114 call->flags &= ~RX_CALL_READER_WAIT;
3115 #ifdef RX_ENABLE_LOCKS
3116 CV_BROADCAST(&call->cv_rq);
3118 osi_rxWakeup(&call->rq);
3124 * Send an ack when requested by the peer, or once every
3125 * rxi_SoftAckRate packets until the last packet has been
3126 * received. Always send a soft ack for the last packet in
3127 * the server's reply. */
3129 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3130 np = rxi_SendAck(call, np, seq, serial, flags,
3131 RX_ACK_REQUESTED, istack);
3132 } else if (call->nSoftAcks > (u_short)rxi_SoftAckRate) {
3133 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3134 np = rxi_SendAck(call, np, seq, serial, flags,
3135 RX_ACK_DELAY, istack);
3136 } else if (call->nSoftAcks) {
3137 clock_GetTime(&when);
3138 if (haveLast && !(flags & RX_CLIENT_INITIATED)) {
3139 clock_Add(&when, &rx_lastAckDelay);
3141 clock_Add(&when, &rx_softAckDelay);
3143 if (!call->delayedAckEvent ||
3144 clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
3145 rxevent_Cancel(call->delayedAckEvent, call,
3146 RX_CALL_REFCOUNT_DELAY);
3147 CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
3148 call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
3151 } else if (call->flags & RX_CALL_RECEIVE_DONE) {
3152 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
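/* Illustrative summary of the acknowledgement policy above (a sketch,
 * not part of the original source; rxi_ExampleAckChoice is hypothetical):
 * an explicit request is acked at once, a backlog of more than
 * rxi_SoftAckRate soft acks sends one immediately, and anything else is
 * left to the scheduled delayed-ack event. */
#if 0
static int rxi_ExampleAckChoice(int requested, int nSoftAcks)
{
    if (requested)
	return RX_ACK_REQUESTED;	/* peer explicitly asked for an ack */
    if (nSoftAcks > (u_short) rxi_SoftAckRate)
	return RX_ACK_DELAY;		/* enough backlog: send it now */
    return 0;				/* rely on the delayed-ack event */
}
#endif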
3159 static void rxi_ComputeRate();
3162 /* The real smarts of the whole thing. */
3163 struct rx_packet *rxi_ReceiveAckPacket(call, np, istack)
3164 register struct rx_call *call;
3165 struct rx_packet *np;
3168 struct rx_ackPacket *ap;
3170 register struct rx_packet *tp;
3171 register struct rx_packet *nxp; /* Next packet pointer for queue_Scan */
3172 register struct rx_connection *conn = call->conn;
3173 struct rx_peer *peer = conn->peer;
3176 /* because there are CM's that are bogus, sending weird values for this. */
3177 afs_uint32 skew = 0;
3178 int needRxStart = 0;
3183 int newAckCount = 0;
3184 u_short maxMTU = 0; /* Set if peer supports AFS 3.4a jumbo datagrams */
3185 int maxDgramPackets = 0; /* Set if peer supports AFS 3.5 jumbo datagrams */
3187 MUTEX_ENTER(&rx_stats_mutex);
3188 rx_stats.ackPacketsRead++;
3189 MUTEX_EXIT(&rx_stats_mutex);
3190 ap = (struct rx_ackPacket *) rx_DataOf(np);
3191 nbytes = rx_Contiguous(np) - ((ap->acks) - (u_char *)ap);
3193 return np; /* truncated ack packet */
3195 /* depends on ack packet struct */
3196 nAcks = MIN((unsigned)nbytes, (unsigned) ap->nAcks);
3197 first = ntohl(ap->firstPacket);
3198 serial = ntohl(ap->serial);
3199 /* temporarily disabled -- needs to degrade over time
3200 skew = ntohs(ap->maxSkew); */
3202 /* Ignore ack packets received out of order */
3203 if (first < call->tfirst) {
3207 if (np->header.flags & RX_SLOW_START_OK) {
3208 call->flags |= RX_CALL_SLOW_START_OK;
3214 "RACK: reason %x previous %u seq %u serial %u skew %d first %u",
3215 ap->reason, ntohl(ap->previousPacket), np->header.seq, serial,
3216 skew, ntohl(ap->firstPacket));
3219 for (offset = 0; offset < nAcks; offset++)
3220 putc(ap->acks[offset] == RX_ACK_TYPE_NACK? '-' : '*', rx_Log);
3226 /* if a server connection has been re-created, it doesn't remember what
3227 serial # it was up to. An ack will tell us, since the serial field
3228 contains the largest serial received by the other side */
3229 MUTEX_ENTER(&conn->conn_data_lock);
3230 if ((conn->type == RX_SERVER_CONNECTION) && (conn->serial < serial)) {
3231 conn->serial = serial+1;
3233 MUTEX_EXIT(&conn->conn_data_lock);
3235 /* Update the outgoing packet skew value to the latest value of
3236 * the peer's incoming packet skew value. The ack packet, of
3237 * course, could arrive out of order, but that won't affect things
3239 MUTEX_ENTER(&peer->peer_lock);
3240 peer->outPacketSkew = skew;
3242 /* Check for packets that no longer need to be transmitted, and
3243 * discard them. This only applies to packets positively
3244 * acknowledged as having been sent to the peer's upper level.
3245 * All other packets must be retained. So only packets with
3246 * sequence numbers < ap->firstPacket are candidates. */
3247 for (queue_Scan(&call->tq, tp, nxp, rx_packet)) {
3248 if (tp->header.seq >= first) break;
3249 call->tfirst = tp->header.seq + 1;
3250 if (tp->header.serial == serial) {
3251 rxi_ComputeRoundTripTime(tp, &tp->timeSent, peer);
3253 rxi_ComputeRate(peer, call, tp, np, ap->reason);
3256 else if ((tp->firstSerial == serial)) {
3257 rxi_ComputeRoundTripTime(tp, &tp->firstSent, peer);
3259 rxi_ComputeRate(peer, call, tp, np, ap->reason);
3262 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3263 /* XXX Hack. Because we have to release the global rx lock when sending
3264 * packets (osi_NetSend) we drop all acks while we're traversing the tq
3265 * in rxi_Start sending packets out because packets may move to the
3266 * freePacketQueue as a result of being here! So we drop these packets until
3267 * we're safely out of the traversal. Really ugly!
3268 * To make it even uglier, if we're using fine grain locking, we can
3269 * set the ack bits in the packets and have rxi_Start remove the packets
3270 * when it's done transmitting.
3275 if (call->flags & RX_CALL_TQ_BUSY) {
3276 #ifdef RX_ENABLE_LOCKS
3278 call->flags |= RX_CALL_TQ_SOME_ACKED;
3279 #else /* RX_ENABLE_LOCKS */
3281 #endif /* RX_ENABLE_LOCKS */
3283 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3286 rxi_FreePacket(tp); /* rxi_FreePacket mustn't wake up anyone, preemptively. */
3291 /* Give rate detector a chance to respond to ping requests */
3292 if (ap->reason == RX_ACK_PING_RESPONSE) {
3293 rxi_ComputeRate(peer, call, 0, np, ap->reason);
3297 /* N.B. we don't turn off any timers here. They'll go away by themselves, anyway */
3299 /* Now go through explicit acks/nacks and record the results in
3300 * the waiting packets. These are packets that can't be released
3301 * yet, even with a positive acknowledge. This positive
3302 * acknowledge only means the packet has been received by the
3303 * peer, not that it will be retained long enough to be sent to
3304 * the peer's upper level. In addition, reset the transmit timers
3305 * of any missing packets (those packets that must be missing
3306 * because this packet was out of sequence) */
3308 call->nSoftAcked = 0;
3309 for (missing = 0, queue_Scan(&call->tq, tp, nxp, rx_packet)) {
3310 /* Update round trip time if the ack was stimulated on receipt
3312 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3313 #ifdef RX_ENABLE_LOCKS
3314 if (tp->header.seq >= first) {
3315 #endif /* RX_ENABLE_LOCKS */
3316 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3317 if (tp->header.serial == serial) {
3318 rxi_ComputeRoundTripTime(tp, &tp->timeSent, peer);
3320 rxi_ComputeRate(peer, call, tp, np, ap->reason);
3323 else if ((tp->firstSerial == serial)) {
3324 rxi_ComputeRoundTripTime(tp, &tp->firstSent, peer);
3326 rxi_ComputeRate(peer, call, tp, np, ap->reason);
3329 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3330 #ifdef RX_ENABLE_LOCKS
3332 #endif /* RX_ENABLE_LOCKS */
3333 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3335 /* Set the acknowledge flag per packet based on the
3336 * information in the ack packet. An acknowledged packet can
3337 * be downgraded when the server has discarded a packet it
3338 * soft-acked previously, or when an ack packet is received
3339 * out of sequence. */
3340 if (tp->header.seq < first) {
3341 /* Implicit ack information */
3347 else if (tp->header.seq < first + nAcks) {
3348 /* Explicit ack information: set it in the packet appropriately */
3349 if (ap->acks[tp->header.seq - first] == RX_ACK_TYPE_ACK) {
3369 /* If packet isn't yet acked, and it has been transmitted at least
3370 * once, reset retransmit time using latest timeout
3371 * i.e., this should readjust the retransmit timer for all outstanding
3372 * packets... So we don't just retransmit when we should know better*/
3374 if (!tp->acked && !clock_IsZero(&tp->retryTime)) {
3375 tp->retryTime = tp->timeSent;
3376 clock_Add(&tp->retryTime, &peer->timeout);
3377 /* shift by eight because one quarter-sec ~ 256 milliseconds */
3378 clock_Addmsec(&(tp->retryTime), ((afs_uint32) tp->backoff) << 8);
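/* Worked example (illustrative): with peer->timeout = 1.5 sec and
 * tp->backoff = 2, the new retryTime becomes
 *     timeSent + 1.5 sec + (2 << 8) msec = timeSent + roughly 2 sec. */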
3382 /* If the window has been extended by this acknowledge packet,
3383 * then wakeup a sender waiting in alloc for window space, or try
3384 * sending packets now, if he's been sitting on packets due to
3385 * lack of window space */
3386 if (call->tnext < (call->tfirst + call->twind)) {
3387 #ifdef RX_ENABLE_LOCKS
3388 CV_SIGNAL(&call->cv_twind);
3390 if (call->flags & RX_CALL_WAIT_WINDOW_ALLOC) {
3391 call->flags &= ~RX_CALL_WAIT_WINDOW_ALLOC;
3392 osi_rxWakeup(&call->twind);
3395 if (call->flags & RX_CALL_WAIT_WINDOW_SEND) {
3396 call->flags &= ~RX_CALL_WAIT_WINDOW_SEND;
3400 /* if the ack packet has a receivelen field hanging off it,
3401 * update our state */
3402 if ( np->length >= rx_AckDataSize(ap->nAcks) +sizeof(afs_int32)) {
3405 /* If the ack packet has a "recommended" size that is less than
3406 * what I am using now, reduce my size to match */
3407 rx_packetread(np, rx_AckDataSize(ap->nAcks)+sizeof(afs_int32),
3408 sizeof(afs_int32), &tSize);
3409 tSize = (afs_uint32) ntohl(tSize);
3410 peer->natMTU = rxi_AdjustIfMTU(MIN(tSize, peer->ifMTU));
3412 /* Get the maximum packet size to send to this peer */
3413 rx_packetread(np, rx_AckDataSize(ap->nAcks), sizeof(afs_int32),
3415 tSize = (afs_uint32)ntohl(tSize);
3416 tSize = (afs_uint32)MIN(tSize, rx_MyMaxSendSize);
3417 tSize = rxi_AdjustMaxMTU(peer->natMTU, tSize);
3419 /* sanity check - peer might have restarted with different params.
3420 * If peer says "send less", dammit, send less... Peer should never
3421 * be unable to accept packets of the size that prior AFS versions would
3422 * send without asking. */
3423 if (peer->maxMTU != tSize) {
3424 peer->maxMTU = tSize;
3425 peer->MTU = MIN(tSize, peer->MTU);
3426 call->MTU = MIN(call->MTU, tSize);
3430 if ( np->length == rx_AckDataSize(ap->nAcks) +3*sizeof(afs_int32)) {
3432 rx_packetread(np, rx_AckDataSize(ap->nAcks)+2*sizeof(afs_int32),
3433 sizeof(afs_int32), &tSize);
3434 tSize = (afs_uint32) ntohl(tSize); /* peer's receive window, if it's */
3435 if (tSize < call->twind) { /* smaller than our send */
3436 call->twind = tSize; /* window, we must send less... */
3437 call->ssthresh = MIN(call->twind, call->ssthresh);
3440 /* Only send jumbograms to 3.4a fileservers. 3.3a RX gets the
3441 * network MTU confused with the loopback MTU. Calculate the
3442 * maximum MTU here for use in the slow start code below.
3444 maxMTU = peer->maxMTU;
3445 /* Did peer restart with older RX version? */
3446 if (peer->maxDgramPackets > 1) {
3447 peer->maxDgramPackets = 1;
3449 } else if ( np->length >= rx_AckDataSize(ap->nAcks) +4*sizeof(afs_int32)) {
3451 rx_packetread(np, rx_AckDataSize(ap->nAcks)+2*sizeof(afs_int32),
3452 sizeof(afs_int32), &tSize);
3453 tSize = (afs_uint32) ntohl(tSize);
3455 * As of AFS 3.5 we set the send window to match the receive window.
3457 if (tSize < call->twind) {
3458 call->twind = tSize;
3459 call->ssthresh = MIN(call->twind, call->ssthresh);
3460 } else if (tSize > call->twind) {
3461 call->twind = tSize;
3465 * As of AFS 3.5, a jumbogram is more than one fixed size
3466 * packet transmitted in a single UDP datagram. If the remote
3467 * MTU is smaller than our local MTU then never send a datagram
3468 * larger than the natural MTU.
3470 rx_packetread(np, rx_AckDataSize(ap->nAcks)+3*sizeof(afs_int32),
3471 sizeof(afs_int32), &tSize);
3472 maxDgramPackets = (afs_uint32) ntohl(tSize);
3473 maxDgramPackets = MIN(maxDgramPackets, rxi_nDgramPackets);
3474 maxDgramPackets = MIN(maxDgramPackets,
3475 (int)(peer->ifDgramPackets));
3476 maxDgramPackets = MIN(maxDgramPackets, tSize);
3477 if (maxDgramPackets > 1) {
3478 peer->maxDgramPackets = maxDgramPackets;
3479 call->MTU = RX_JUMBOBUFFERSIZE+RX_HEADER_SIZE;
3481 peer->maxDgramPackets = 1;
3482 call->MTU = peer->natMTU;
3484 } else if (peer->maxDgramPackets > 1) {
3485 /* Restarted with lower version of RX */
3486 peer->maxDgramPackets = 1;
3488 } else if (peer->maxDgramPackets > 1 ||
3489 peer->maxMTU != OLD_MAX_PACKET_SIZE) {
3490 /* Restarted with lower version of RX */
3491 peer->maxMTU = OLD_MAX_PACKET_SIZE;
3492 peer->natMTU = OLD_MAX_PACKET_SIZE;
3493 peer->MTU = OLD_MAX_PACKET_SIZE;
3494 peer->maxDgramPackets = 1;
3495 peer->nDgramPackets = 1;
3497 call->MTU = OLD_MAX_PACKET_SIZE;
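/* Worked example (illustrative): if the peer advertises 4 packets per
 * jumbogram while rxi_nDgramPackets is 3 and the local interface also
 * supports 3, the negotiated maxDgramPackets is min(4, 3, 3) = 3 and
 * call->MTU is raised to RX_JUMBOBUFFERSIZE + RX_HEADER_SIZE. */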
3502 * Calculate how many datagrams were successfully received after
3503 * the first missing packet and adjust the negative ack counter
3508 nNacked = (nNacked + call->nDgramPackets - 1) / call->nDgramPackets;
3509 if (call->nNacks < nNacked) {
3510 call->nNacks = nNacked;
3519 if (call->flags & RX_CALL_FAST_RECOVER) {
3521 call->cwind = MIN((int)(call->cwind + 1), rx_maxSendWindow);
3523 call->flags &= ~RX_CALL_FAST_RECOVER;
3524 call->cwind = call->nextCwind;
3525 call->nextCwind = 0;
3528 call->nCwindAcks = 0;
3530 else if (nNacked && call->nNacks >= (u_short)rx_nackThreshold) {
3531 /* Three negative acks in a row trigger congestion recovery */
3532 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3533 MUTEX_EXIT(&peer->peer_lock);
3534 if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
3535 /* someone else is waiting to start recovery */
3538 call->flags |= RX_CALL_FAST_RECOVER_WAIT;
3539 while (call->flags & RX_CALL_TQ_BUSY) {
3540 call->flags |= RX_CALL_TQ_WAIT;
3541 #ifdef RX_ENABLE_LOCKS
3542 CV_WAIT(&call->cv_tq, &call->lock);
3543 #else /* RX_ENABLE_LOCKS */
3544 osi_rxSleep(&call->tq);
3545 #endif /* RX_ENABLE_LOCKS */
3547 MUTEX_ENTER(&peer->peer_lock);
3548 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3549 call->flags &= ~RX_CALL_FAST_RECOVER_WAIT;
3550 call->flags |= RX_CALL_FAST_RECOVER;
3551 call->ssthresh = MAX(4, MIN((int)call->cwind, (int)call->twind))>>1;
3552 call->cwind = MIN((int)(call->ssthresh + rx_nackThreshold),
3554 call->nDgramPackets = MAX(2, (int)call->nDgramPackets)>>1;
3555 call->nextCwind = call->ssthresh;
3558 peer->MTU = call->MTU;
3559 peer->cwind = call->nextCwind;
3560 peer->nDgramPackets = call->nDgramPackets;
3562 call->congestSeq = peer->congestSeq;
3563 /* Reset the resend times on the packets that were nacked
3564 * so we will retransmit as soon as the window permits*/
3565 for(acked = 0, queue_ScanBackwards(&call->tq, tp, nxp, rx_packet)) {
3568 clock_Zero(&tp->retryTime);
3570 } else if (tp->acked) {
3575 /* If cwind is smaller than ssthresh, then increase
3576 * the window one packet for each ack we receive (exponential
3578 * If cwind is greater than or equal to ssthresh then increase
3579 * the congestion window by one packet for each cwind acks we
3580 * receive (linear growth). */
3581 if (call->cwind < call->ssthresh) {
3582 call->cwind = MIN((int)call->ssthresh,
3583 (int)(call->cwind + newAckCount));
3584 call->nCwindAcks = 0;
3586 call->nCwindAcks += newAckCount;
3587 if (call->nCwindAcks >= call->cwind) {
3588 call->nCwindAcks = 0;
3589 call->cwind = MIN((int)(call->cwind + 1), rx_maxSendWindow);
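/* Worked example (illustrative): with ssthresh = 8 and cwind = 4, an ack
 * covering 3 new packets grows the window exponentially to
 * min(8, 4 + 3) = 7. Once cwind reaches ssthresh, a full window's worth
 * of acks (nCwindAcks >= cwind) is needed to add a single packet, i.e.
 * linear growth. */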
3593 * If we have received several acknowledgements in a row then
3594 * it is time to increase the size of our datagrams
3596 if ((int)call->nAcks > rx_nDgramThreshold) {
3597 if (peer->maxDgramPackets > 1) {
3598 if (call->nDgramPackets < peer->maxDgramPackets) {
3599 call->nDgramPackets++;
3601 call->MTU = RX_HEADER_SIZE + RX_JUMBOBUFFERSIZE;
3602 } else if (call->MTU < peer->maxMTU) {
3603 call->MTU += peer->natMTU;
3604 call->MTU = MIN(call->MTU, peer->maxMTU);
3610 MUTEX_EXIT(&peer->peer_lock); /* rxi_Start will lock peer. */
3612 /* Servers need to hold the call until all response packets have
3613 * been acknowledged. Soft acks are good enough since clients
3614 * are not allowed to clear their receive queues. */
3615 if (call->state == RX_STATE_HOLD &&
3616 call->tfirst + call->nSoftAcked >= call->tnext) {
3617 call->state = RX_STATE_DALLY;
3618 rxi_ClearTransmitQueue(call, 0);
3619 } else if (!queue_IsEmpty(&call->tq)) {
3620 rxi_Start(0, call, istack);
3625 /* Received a response to a challenge packet */
3626 struct rx_packet *rxi_ReceiveResponsePacket(conn, np, istack)
3627 register struct rx_connection *conn;
3628 register struct rx_packet *np;
3633 /* Ignore the packet if we're the client */
3634 if (conn->type == RX_CLIENT_CONNECTION) return np;
3636 /* If already authenticated, ignore the packet (it's probably a retry) */
3637 if (RXS_CheckAuthentication(conn->securityObject, conn) == 0)
3640 /* Otherwise, have the security object evaluate the response packet */
3641 error = RXS_CheckResponse(conn->securityObject, conn, np);
3643 /* If the response is invalid, reset the connection, sending
3644 * an abort to the peer */
3648 rxi_ConnectionError(conn, error);
3649 MUTEX_ENTER(&conn->conn_data_lock);
3650 np = rxi_SendConnectionAbort(conn, np, istack, 0);
3651 MUTEX_EXIT(&conn->conn_data_lock);
3655 /* If the response is valid, any calls waiting to attach
3656 * servers can now do so */
3658 for (i=0; i<RX_MAXCALLS; i++) {
3659 struct rx_call *call = conn->call[i];
3661 MUTEX_ENTER(&call->lock);
3662 if (call->state == RX_STATE_PRECALL)
3663 rxi_AttachServerProc(call, -1, NULL, NULL);
3664 MUTEX_EXIT(&call->lock);
3671 /* A client has received an authentication challenge: the security
3672 * object is asked to cough up a respectable response packet to send
3673 * back to the server. The server is responsible for retrying the
3674 * challenge if it fails to get a response. */
3677 rxi_ReceiveChallengePacket(conn, np, istack)
3678 register struct rx_connection *conn;
3679 register struct rx_packet *np;
3684 /* Ignore the challenge if we're the server */
3685 if (conn->type == RX_SERVER_CONNECTION) return np;
3687 /* Ignore the challenge if the connection is otherwise idle; someone's
3688 * trying to use us as an oracle. */
3689 if (!rxi_HasActiveCalls(conn)) return np;
3691 /* Send the security object the challenge packet. It is expected to fill
3692 * in the response. */
3693 error = RXS_GetResponse(conn->securityObject, conn, np);
3695 /* If the security object is unable to return a valid response, reset the
3696 * connection and send an abort to the peer. Otherwise send the response
3697 * packet to the peer connection. */
3699 rxi_ConnectionError(conn, error);
3700 MUTEX_ENTER(&conn->conn_data_lock);
3701 np = rxi_SendConnectionAbort(conn, np, istack, 0);
3702 MUTEX_EXIT(&conn->conn_data_lock);
3705 np = rxi_SendSpecial((struct rx_call *)0, conn, np,
3706 RX_PACKET_TYPE_RESPONSE, (char *) 0, -1, istack);
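/* Sketch of the challenge/response exchange handled by the two routines
 * above (illustrative; the server retries the challenge until a response
 * validates):
 *
 *     server                               client
 *     ------                               ------
 *     challenge packet       ---------->   rxi_ReceiveChallengePacket
 *                                          RXS_GetResponse fills response
 *     rxi_ReceiveResponsePacket <--------  response packet
 *     RXS_CheckResponse validates; waiting calls may then attach
 */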
3712 /* Find an available server process to service the current request in
3713 * the given call structure. If one isn't available, queue up this
3714 * call so it eventually gets one */
3716 rxi_AttachServerProc(call, socket, tnop, newcallp)
3717 register struct rx_call *call;
3718 register osi_socket socket;
3720 register struct rx_call **newcallp;
3722 register struct rx_serverQueueEntry *sq;
3723 register struct rx_service *service = call->conn->service;
3724 #ifdef RX_ENABLE_LOCKS
3725 register int haveQuota = 0;
3726 #endif /* RX_ENABLE_LOCKS */
3727 /* May already be attached */
3728 if (call->state == RX_STATE_ACTIVE) return;
3730 MUTEX_ENTER(&rx_serverPool_lock);
3731 #ifdef RX_ENABLE_LOCKS
3732 while(rxi_ServerThreadSelectingCall) {
3733 MUTEX_EXIT(&call->lock);
3734 CV_WAIT(&rx_serverPool_cv, &rx_serverPool_lock);
3735 MUTEX_EXIT(&rx_serverPool_lock);
3736 MUTEX_ENTER(&call->lock);
3737 MUTEX_ENTER(&rx_serverPool_lock);
3738 /* Call may have been attached */
3739 if (call->state == RX_STATE_ACTIVE) return;
3742 haveQuota = QuotaOK(service);
3743 if ((!haveQuota) || queue_IsEmpty(&rx_idleServerQueue)) {
3744 /* If there are no processes available to service this call,
3745 * put the call on the incoming call queue (unless it's
3746 * already on the queue).
3749 ReturnToServerPool(service);
3750 if (!(call->flags & RX_CALL_WAIT_PROC)) {
3751 call->flags |= RX_CALL_WAIT_PROC;
3752 MUTEX_ENTER(&rx_stats_mutex);
3754 MUTEX_EXIT(&rx_stats_mutex);
3755 rxi_calltrace(RX_CALL_ARRIVAL, call);
3756 SET_CALL_QUEUE_LOCK(call, &rx_serverPool_lock);
3757 queue_Append(&rx_incomingCallQueue, call);
3760 #else /* RX_ENABLE_LOCKS */
3761 if (!QuotaOK(service) || queue_IsEmpty(&rx_idleServerQueue)) {
3762 /* If there are no processes available to service this call,
3763 * put the call on the incoming call queue (unless it's
3764 * already on the queue).
3766 if (!(call->flags & RX_CALL_WAIT_PROC)) {
3767 call->flags |= RX_CALL_WAIT_PROC;
3769 rxi_calltrace(RX_CALL_ARRIVAL, call);
3770 queue_Append(&rx_incomingCallQueue, call);
3773 #endif /* RX_ENABLE_LOCKS */
3775 sq = queue_First(&rx_idleServerQueue, rx_serverQueueEntry);
3777 /* If hot threads are enabled, and both newcallp and sq->socketp
3778 * are non-null, then this thread will process the call, and the
3779 * idle server thread will start listening on this thread's socket.
3782 if (rx_enable_hot_thread && newcallp && sq->socketp) {
3785 *sq->socketp = socket;
3786 clock_GetTime(&call->startTime);
3787 CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
3791 if (call->flags & RX_CALL_WAIT_PROC) {
3792 /* Conservative: I don't think this should happen */
3793 call->flags &= ~RX_CALL_WAIT_PROC;
3794 MUTEX_ENTER(&rx_stats_mutex);
3796 MUTEX_EXIT(&rx_stats_mutex);
3799 call->state = RX_STATE_ACTIVE;
3800 call->mode = RX_MODE_RECEIVING;
3801 if (call->flags & RX_CALL_CLEARED) {
3802 /* send an ack now to start the packet flow up again */
3803 call->flags &= ~RX_CALL_CLEARED;
3804 rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
3806 #ifdef RX_ENABLE_LOCKS
3809 service->nRequestsRunning++;
3810 if (service->nRequestsRunning <= service->minProcs)
3816 MUTEX_EXIT(&rx_serverPool_lock);
3819 /* Delay the sending of an acknowledge event for a short while, while
3820 * a new call is being prepared (in the case of a client) or a reply
3821 * is being prepared (in the case of a server). Rather than sending
3822 * an ack packet, an ACKALL packet is sent. */
3823 void rxi_AckAll(event, call, dummy)
3824 struct rxevent *event;
3825 register struct rx_call *call;
3828 #ifdef RX_ENABLE_LOCKS
3830 MUTEX_ENTER(&call->lock);
3831 call->delayedAckEvent = (struct rxevent *) 0;
3832 CALL_RELE(call, RX_CALL_REFCOUNT_ACKALL);
3834 rxi_SendSpecial(call, call->conn, (struct rx_packet *) 0,
3835 RX_PACKET_TYPE_ACKALL, (char *) 0, 0, 0);
3837 MUTEX_EXIT(&call->lock);
3838 #else /* RX_ENABLE_LOCKS */
3839 if (event) call->delayedAckEvent = (struct rxevent *) 0;
3840 rxi_SendSpecial(call, call->conn, (struct rx_packet *) 0,
3841 RX_PACKET_TYPE_ACKALL, (char *) 0, 0, 0);
3842 #endif /* RX_ENABLE_LOCKS */
3845 void rxi_SendDelayedAck(event, call, dummy)
3846 struct rxevent *event;
3847 register struct rx_call *call;
3850 #ifdef RX_ENABLE_LOCKS
3852 MUTEX_ENTER(&call->lock);
3853 if (event == call->delayedAckEvent)
3854 call->delayedAckEvent = (struct rxevent *) 0;
3855 CALL_RELE(call, RX_CALL_REFCOUNT_DELAY);
3857 (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
3859 MUTEX_EXIT(&call->lock);
3860 #else /* RX_ENABLE_LOCKS */
3861 if (event) call->delayedAckEvent = (struct rxevent *) 0;
3862 (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
3863 #endif /* RX_ENABLE_LOCKS */
3867 #ifdef RX_ENABLE_LOCKS
3868 /* Set ack in all packets in transmit queue. rxi_Start will deal with
3869 * clearing them out.
3871 static void rxi_SetAcksInTransmitQueue(call)
3872 register struct rx_call *call;
3874 register struct rx_packet *p, *tp;
3877 for (queue_Scan(&call->tq, p, tp, rx_packet)) {
3884 call->flags |= RX_CALL_TQ_CLEARME;
3885 call->flags |= RX_CALL_TQ_SOME_ACKED;
3888 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
3889 rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
3890 call->tfirst = call->tnext;
3891 call->nSoftAcked = 0;
3893 if (call->flags & RX_CALL_FAST_RECOVER) {
3894 call->flags &= ~RX_CALL_FAST_RECOVER;
3895 call->cwind = call->nextCwind;
3896 call->nextCwind = 0;
3899 CV_SIGNAL(&call->cv_twind);
3901 #endif /* RX_ENABLE_LOCKS */
3903 /* Clear out the transmit queue for the current call (all packets have
3904 * been received by peer) */
3905 void rxi_ClearTransmitQueue(call, force)
3906 register struct rx_call *call;
3909 register struct rx_packet *p, *tp;
3911 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3912 if (!force && (call->flags & RX_CALL_TQ_BUSY)) {
3914 for (queue_Scan(&call->tq, p, tp, rx_packet)) {
3921 call->flags |= RX_CALL_TQ_CLEARME;
3922 call->flags |= RX_CALL_TQ_SOME_ACKED;
3925 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3926 for (queue_Scan(&call->tq, p, tp, rx_packet)) {
3932 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3933 call->flags &= ~RX_CALL_TQ_CLEARME;
3935 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3937 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
3938 rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
3939 call->tfirst = call->tnext; /* implicitly acknowledge all data already sent */
3940 call->nSoftAcked = 0;
3942 if (call->flags & RX_CALL_FAST_RECOVER) {
3943 call->flags &= ~RX_CALL_FAST_RECOVER;
3944 call->cwind = call->nextCwind;
3947 #ifdef RX_ENABLE_LOCKS
3948 CV_SIGNAL(&call->cv_twind);
3950 osi_rxWakeup(&call->twind);
3954 void rxi_ClearReceiveQueue(call)
3955 register struct rx_call *call;
3957 register struct rx_packet *p, *tp;
3958 if (queue_IsNotEmpty(&call->rq)) {
3959 for (queue_Scan(&call->rq, p, tp, rx_packet)) {
3964 rx_packetReclaims++;
3966 call->flags &= ~(RX_CALL_RECEIVE_DONE|RX_CALL_HAVE_LAST);
3968 if (call->state == RX_STATE_PRECALL) {
3969 call->flags |= RX_CALL_CLEARED;
3973 /* Send an abort packet for the specified call */
3974 struct rx_packet *rxi_SendCallAbort(call, packet, istack, force)
3975 register struct rx_call *call;
3976 struct rx_packet *packet;
3986 /* Clients should never delay abort messages */
3987 if (rx_IsClientConn(call->conn))
3990 if (call->abortCode != call->error) {
3991 call->abortCode = call->error;
3992 call->abortCount = 0;
3995 if (force || rxi_callAbortThreshhold == 0 ||
3996 call->abortCount < rxi_callAbortThreshhold) {
3997 if (call->delayedAbortEvent) {
3998 rxevent_Cancel(call->delayedAbortEvent, call, RX_CALL_REFCOUNT_ABORT);
4000 error = htonl(call->error);
4002 packet = rxi_SendSpecial(call, call->conn, packet,
4003 RX_PACKET_TYPE_ABORT, (char *)&error,
4004 sizeof(error), istack);
4005 } else if (!call->delayedAbortEvent) {
4006 clock_GetTime(&when);
4007 clock_Addmsec(&when, rxi_callAbortDelay);
4008 CALL_HOLD(call, RX_CALL_REFCOUNT_ABORT);
4009 call->delayedAbortEvent = rxevent_Post(&when, rxi_SendDelayedCallAbort,
4015 /* Send an abort packet for the specified connection. Packet is an
4016 * optional pointer to a packet that can be used to send the abort.
4017 * Once the number of abort messages reaches the threshold, an
4018 * event is scheduled to send the abort. Setting the force flag
4019 * overrides sending delayed abort messages.
4021 * NOTE: Called with conn_data_lock held. conn_data_lock is dropped
4022 * to send the abort packet.
4024 struct rx_packet *rxi_SendConnectionAbort(conn, packet, istack, force)
4025 register struct rx_connection *conn;
4026 struct rx_packet *packet;
4036 /* Clients should never delay abort messages */
4037 if (rx_IsClientConn(conn))
4040 if (force || rxi_connAbortThreshhold == 0 ||
4041 conn->abortCount < rxi_connAbortThreshhold) {
4042 if (conn->delayedAbortEvent) {
4043 rxevent_Cancel(conn->delayedAbortEvent, (struct rx_call*)0, 0);
4045 error = htonl(conn->error);
4047 MUTEX_EXIT(&conn->conn_data_lock);
4048 packet = rxi_SendSpecial((struct rx_call *)0, conn, packet,
4049 RX_PACKET_TYPE_ABORT, (char *)&error,
4050 sizeof(error), istack);
4051 MUTEX_ENTER(&conn->conn_data_lock);
4052 } else if (!conn->delayedAbortEvent) {
4053 clock_GetTime(&when);
4054 clock_Addmsec(&when, rxi_connAbortDelay);
4055 conn->delayedAbortEvent = rxevent_Post(&when, rxi_SendDelayedConnAbort,
4061 /* Associate an error with all of the calls owned by a connection. Called
4062 * with error non-zero. This is only for really fatal things, like
4063 * bad authentication responses. The connection itself is set in
4064 * error at this point, so that future packets received will be
4065 * discarded. */
4066 void rxi_ConnectionError(conn, error)
4067 register struct rx_connection *conn;
4068 register afs_int32 error;
4072 if (conn->challengeEvent)
4073 rxevent_Cancel(conn->challengeEvent, (struct rx_call*)0, 0);
4074 for (i=0; i<RX_MAXCALLS; i++) {
4075 struct rx_call *call = conn->call[i];
4077 MUTEX_ENTER(&call->lock);
4078 rxi_CallError(call, error);
4079 MUTEX_EXIT(&call->lock);
4082 conn->error = error;
4083 MUTEX_ENTER(&rx_stats_mutex);
4084 rx_stats.fatalErrors++;
4085 MUTEX_EXIT(&rx_stats_mutex);
4089 void rxi_CallError(call, error)
4090 register struct rx_call *call;
4093 if (call->error) error = call->error;
4094 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4095 if (!(call->flags & RX_CALL_TQ_BUSY)) {
4096 rxi_ResetCall(call, 0);
4099 rxi_ResetCall(call, 0);
4101 call->error = error;
4102 call->mode = RX_MODE_ERROR;
4105 /* Reset various fields in a call structure, and wakeup waiting
4106 * processes. Some fields aren't changed: state & mode are not
4107 * touched (these must be set by the caller), and bufptr, nLeft, and
4108 * nFree are not reset, since these fields are manipulated by
4109 * unprotected macros, and may only be reset by non-interrupting code.
4112 /* this code requires that call->conn be set properly as a pre-condition. */
4113 #endif /* ADAPT_WINDOW */
4115 void rxi_ResetCall(call, newcall)
4116 register struct rx_call *call;
4117 register int newcall;
4120 register struct rx_peer *peer;
4121 struct rx_packet *packet;
4123 /* Notify anyone who is waiting for asynchronous packet arrival */
4124 if (call->arrivalProc) {
4125 (*call->arrivalProc)(call, call->arrivalProcHandle, call->arrivalProcArg);
4126 call->arrivalProc = (VOID (*)()) 0;
4129 if (call->delayedAbortEvent) {
4130 rxevent_Cancel(call->delayedAbortEvent, call, RX_CALL_REFCOUNT_ABORT);
4131 packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
4133 rxi_SendCallAbort(call, packet, 0, 1);
4134 rxi_FreePacket(packet);
4139 * Update the peer with the congestion information in this call
4140 * so other calls on this connection can pick up where this call
4141 * left off. If the congestion sequence numbers don't match then
4142 * another call experienced a retransmission.
4144 peer = call->conn->peer;
4145 MUTEX_ENTER(&peer->peer_lock);
4147 if (call->congestSeq == peer->congestSeq) {
4148 peer->cwind = MAX(peer->cwind, call->cwind);
4149 peer->MTU = MAX(peer->MTU, call->MTU);
4150 peer->nDgramPackets = MAX(peer->nDgramPackets, call->nDgramPackets);
4153 call->abortCode = 0;
4154 call->abortCount = 0;
4156 if (peer->maxDgramPackets > 1) {
4157 call->MTU = RX_HEADER_SIZE + RX_JUMBOBUFFERSIZE;
4159 call->MTU = peer->MTU;
4161 call->cwind = MIN((int)peer->cwind, (int)peer->nDgramPackets);
4162 call->ssthresh = rx_maxSendWindow;
4163 call->nDgramPackets = peer->nDgramPackets;
4164 call->congestSeq = peer->congestSeq;
4165 MUTEX_EXIT(&peer->peer_lock);
4167 flags = call->flags;
4168 rxi_ClearReceiveQueue(call);
4169 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4170 if (call->flags & RX_CALL_TQ_BUSY) {
4171 call->flags = RX_CALL_TQ_CLEARME | RX_CALL_TQ_BUSY;
4172 call->flags |= (flags & RX_CALL_TQ_WAIT);
4174 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4176 rxi_ClearTransmitQueue(call, 0);
4177 queue_Init(&call->tq);
4180 queue_Init(&call->rq);
4182 call->rwind = rx_initReceiveWindow;
4183 call->twind = rx_initSendWindow;
4184 call->nSoftAcked = 0;
4185 call->nextCwind = 0;
4188 call->nCwindAcks = 0;
4189 call->nSoftAcks = 0;
4190 call->nHardAcks = 0;
4192 call->tfirst = call->rnext = call->tnext = 1;
4194 call->lastAcked = 0;
4195 call->localStatus = call->remoteStatus = 0;
4197 if (flags & RX_CALL_READER_WAIT) {
4198 #ifdef RX_ENABLE_LOCKS
4199 CV_BROADCAST(&call->cv_rq);
4201 osi_rxWakeup(&call->rq);
4204 if (flags & RX_CALL_WAIT_PACKETS) {
4205 MUTEX_ENTER(&rx_freePktQ_lock);
4206 rxi_PacketsUnWait(); /* XXX */
4207 MUTEX_EXIT(&rx_freePktQ_lock);
4210 #ifdef RX_ENABLE_LOCKS
4211 CV_SIGNAL(&call->cv_twind);
4213 if (flags & RX_CALL_WAIT_WINDOW_ALLOC)
4214 osi_rxWakeup(&call->twind);
4217 #ifdef RX_ENABLE_LOCKS
4218 /* The following ensures that we don't mess with any queue while some
4219 * other thread might also be doing so. The call_queue_lock field is
4220 * only modified under the call lock. If the call is in the process
4221 * of being removed from a queue, the call is not locked until
4222 * the queue lock is dropped, and only then is the call_queue_lock field
4223 * zero'd out. So it's safe to lock the queue if call_queue_lock is set.
4224 * Note that any other routine which removes a call from a queue has to
4225 * obtain the queue lock before examining the queue and removing the call.
4227 if (call->call_queue_lock) {
4228 MUTEX_ENTER(call->call_queue_lock);
4229 if (queue_IsOnQueue(call)) {
4231 if (flags & RX_CALL_WAIT_PROC) {
4232 MUTEX_ENTER(&rx_stats_mutex);
4234 MUTEX_EXIT(&rx_stats_mutex);
4237 MUTEX_EXIT(call->call_queue_lock);
4238 CLEAR_CALL_QUEUE_LOCK(call);
4240 #else /* RX_ENABLE_LOCKS */
4241 if (queue_IsOnQueue(call)) {
4243 if (flags & RX_CALL_WAIT_PROC)
4246 #endif /* RX_ENABLE_LOCKS */
4248 rxi_KeepAliveOff(call);
4249 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
4252 /* Send an acknowledge for the indicated packet (seq,serial) of the
4253 * indicated call, for the indicated reason (reason). This
4254 * acknowledge will specifically acknowledge receiving the packet, and
4255 * will also specify which other packets for this call have been
4256 * received. This routine returns the packet that was used to the
4257 * caller. The caller is responsible for freeing it or re-using it.
4258 * This acknowledgement also returns the highest sequence number
4259 * actually read out by the higher level to the sender; the sender
4260 * promises to keep around packets that have not been read by the
4261 * higher level yet (unless, of course, the sender decides to abort
4262 * the call altogether). Any of p, seq, serial, pflags, or reason may
4263 * be set to zero without ill effect. That is, if they are zero, they
4264 * will not convey any information.
4265 * There is now a trailer field, after the ack, where it will safely be
4266 * ignored by mundanes; it indicates the maximum size packet this
4267 * host can swallow. */
4268 struct rx_packet *rxi_SendAck(call, optionalPacket, seq, serial, pflags, reason, istack)
4269 register struct rx_call *call;
4270 register struct rx_packet *optionalPacket; /* use to send ack (or null) */
4271 int seq; /* Sequence number of the packet we are acking */
4272 int serial; /* Serial number of the packet */
4273 int pflags; /* Flags field from packet header */
4274 int reason; /* Reason an acknowledge was prompted */
4277 struct rx_ackPacket *ap;
4278 register struct rx_packet *rqp;
4279 register struct rx_packet *nxp; /* For queue_Scan */
4280 register struct rx_packet *p;
4285 * Open the receive window once a thread starts reading packets
4287 if (call->rnext > 1) {
4288 call->rwind = rx_maxReceiveWindow;
4291 call->nHardAcks = 0;
4292 call->nSoftAcks = 0;
4293 if (call->rnext > call->lastAcked)
4294 call->lastAcked = call->rnext;
4298 rx_computelen(p, p->length); /* reset length, you never know */
4299 } /* where that's been... */
4301 if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL))) {
4302 /* We won't send the ack, but don't panic. */
4303 return optionalPacket;
4306 templ = rx_AckDataSize(call->rwind)+4*sizeof(afs_int32) - rx_GetDataSize(p);
4308 if (rxi_AllocDataBuf(p, templ, RX_PACKET_CLASS_SPECIAL)) {
4309 if (!optionalPacket) rxi_FreePacket(p);
4310 return optionalPacket;
4312 templ = rx_AckDataSize(call->rwind)+2*sizeof(afs_int32);
4313 if (rx_Contiguous(p)<templ) {
4314 if (!optionalPacket) rxi_FreePacket(p);
4315 return optionalPacket;
4317 } /* MTUXXX failing to send an ack is very serious. We should */
4318 /* try as hard as possible to send even a partial ack; it's */
4319 /* better than nothing. */
4321 ap = (struct rx_ackPacket *) rx_DataOf(p);
4322 ap->bufferSpace = htonl(0); /* Something should go here, sometime */
4323 ap->reason = reason;
4325 /* The skew computation used to be bogus, I think it's better now. */
4326 /* We should start paying attention to skew. XXX */
4327 ap->serial = htonl(call->conn->maxSerial);
4328 ap->maxSkew = 0; /* used to be peer->inPacketSkew */
4330 ap->firstPacket = htonl(call->rnext); /* First packet not yet forwarded to reader */
4331 ap->previousPacket = htonl(call->rprev); /* Previous packet received */
4333 /* No fear of running out of room in the ack packet here because there can only be at most
4334 * one window full of unacknowledged packets. The window size must be constrained
4335 * to be less than the maximum ack size, of course. Also, an ack should always
4336 * fit into a single packet -- it should not ever be fragmented. */
4337 for (offset = 0, queue_Scan(&call->rq, rqp, nxp, rx_packet)) {
4338 if (!rqp || !call->rq.next
4339 || (rqp->header.seq > (call->rnext + call->rwind))) {
4340 if (!optionalPacket) rxi_FreePacket(p);
4341 rxi_CallError(call, RX_CALL_DEAD);
4342 return optionalPacket;
4345 while (rqp->header.seq > call->rnext + offset)
4346 ap->acks[offset++] = RX_ACK_TYPE_NACK;
4347 ap->acks[offset++] = RX_ACK_TYPE_ACK;
4349 if ((offset > (u_char)rx_maxReceiveWindow) || (offset > call->rwind)) {
4350 if (!optionalPacket) rxi_FreePacket(p);
4351 rxi_CallError(call, RX_CALL_DEAD);
4352 return optionalPacket;
4357 p->length = rx_AckDataSize(offset)+4*sizeof(afs_int32);
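/*
 * Illustrative sketch (editorial, not from the original source): suppose
 * rnext = 5 and the receive queue holds packets 5, 6 and 8. The scan
 * above emits
 *   acks[0] = ACK    (seq 5)
 *   acks[1] = ACK    (seq 6)
 *   acks[2] = NACK   (seq 7 is missing)
 *   acks[3] = ACK    (seq 8)
 * so firstPacket = 5, nAcks = 4, and the sender can retransmit seq 7.
 */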
4359 /* these are new for AFS 3.3 */
4360 templ = rxi_AdjustMaxMTU(call->conn->peer->ifMTU, rx_maxReceiveSize);
4361 templ = htonl(templ);
4362 rx_packetwrite(p, rx_AckDataSize(offset), sizeof(afs_int32), &templ);
4363 templ = htonl(call->conn->peer->ifMTU);
4364 rx_packetwrite(p, rx_AckDataSize(offset)+sizeof(afs_int32), sizeof(afs_int32), &templ);
4366 /* new for AFS 3.4 */
4367 templ = htonl(call->rwind);
4368 rx_packetwrite(p, rx_AckDataSize(offset)+2*sizeof(afs_int32), sizeof(afs_int32), &templ);
4370 /* new for AFS 3.5 */
4371 templ = htonl(call->conn->peer->ifDgramPackets);
4372 rx_packetwrite(p, rx_AckDataSize(offset)+3*sizeof(afs_int32), sizeof(afs_int32), &templ);
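/*
 * For reference (editorial summary of the writes above): the ack is
 * followed by four afs_int32 trailer values, in network order, at
 * offsets from rx_AckDataSize(offset):
 *   +0   adjusted maximum MTU        (new in AFS 3.3)
 *   +4   interface MTU               (new in AFS 3.3)
 *   +8   receive window              (new in AFS 3.4)
 *   +12  interface datagram packets  (new in AFS 3.5)
 * Peers that predate a given field simply ignore the extra bytes.
 */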
4374 p->header.serviceId = call->conn->serviceId;
4375 p->header.cid = (call->conn->cid | call->channel);
4376 p->header.callNumber = *call->callNumber;
4377 p->header.seq = seq;
4378 p->header.securityIndex = call->conn->securityIndex;
4379 p->header.epoch = call->conn->epoch;
4380 p->header.type = RX_PACKET_TYPE_ACK;
4381 p->header.flags = RX_SLOW_START_OK;
4382 if (reason == RX_ACK_PING) {
4383 p->header.flags |= RX_REQUEST_ACK;
4385 clock_GetTime(&call->pingRequestTime);
4388 if (call->conn->type == RX_CLIENT_CONNECTION)
4389 p->header.flags |= RX_CLIENT_INITIATED;
4393 fprintf(rx_Log, "SACK: reason %x previous %u seq %u first %u",
4394 ap->reason, ntohl(ap->previousPacket), p->header.seq,
4395 ntohl(ap->firstPacket));
4397 for (offset = 0; offset < ap->nAcks; offset++)
4398 putc(ap->acks[offset] == RX_ACK_TYPE_NACK? '-' : '*', rx_Log);
4405 register int i, nbytes = p->length;
4407 for (i=1; i < p->niovecs; i++) { /* vec 0 is ALWAYS header */
4408 if (nbytes <= p->wirevec[i].iov_len) {
4409 register int savelen, saven;
4411 savelen = p->wirevec[i].iov_len;
4413 p->wirevec[i].iov_len = nbytes;
4415 rxi_Send(call, p, istack);
4416 p->wirevec[i].iov_len = savelen;
4420 else nbytes -= p->wirevec[i].iov_len;
4423 MUTEX_ENTER(&rx_stats_mutex);
4424 rx_stats.ackPacketsSent++;
4425 MUTEX_EXIT(&rx_stats_mutex);
4426 if (!optionalPacket) rxi_FreePacket(p);
4427 return optionalPacket; /* Return packet for re-use by caller */
4430 /* Send all of the packets in the list in a single datagram */
4431 static void rxi_SendList(call, list, len, istack, moreFlag, now, retryTime)
4432 struct rx_call *call;
4433 struct rx_packet **list;
4438 struct clock *retryTime;
4443 struct rx_connection *conn = call->conn;
4444 struct rx_peer *peer = conn->peer;
4446 MUTEX_ENTER(&peer->peer_lock);
4448 MUTEX_ENTER(&rx_stats_mutex);
4449 rx_stats.dataPacketsSent += len;
4450 MUTEX_EXIT(&rx_stats_mutex);
4451 MUTEX_EXIT(&peer->peer_lock);
4453 if (list[len-1]->header.flags & RX_LAST_PACKET) {
4457 /* Set the packet flags and schedule the resend events */
4458 /* Only request an ack for the last packet in the list */
4459 for (i = 0 ; i < len ; i++) {
4460 list[i]->retryTime = *retryTime;
4461 if (list[i]->header.serial) {
4462 /* Exponentially backoff retry times */
4463 if (list[i]->backoff < MAXBACKOFF) {
4464 /* so it can't stay == 0 */
4465 list[i]->backoff = (list[i]->backoff << 1) +1;
4467 else list[i]->backoff++;
4468 clock_Addmsec(&(list[i]->retryTime),
4469 ((afs_uint32) list[i]->backoff) << 8);
4472 /* Wait a little extra for the ack on the last packet */
4473 if (lastPacket && !(list[i]->header.flags & RX_CLIENT_INITIATED)) {
4474 clock_Addmsec(&(list[i]->retryTime), 400);
4477 /* Record the time sent */
4478 list[i]->timeSent = *now;
4480 /* Ask for an ack on retransmitted packets, on every other packet
4481 * if the peer doesn't support slow start. Ask for an ack on every
4482 * packet until the congestion window reaches the ack rate. */
4483 if (list[i]->header.serial) {
4485 MUTEX_ENTER(&rx_stats_mutex);
4486 rx_stats.dataPacketsReSent++;
4487 MUTEX_EXIT(&rx_stats_mutex);
4489 /* improved RTO calculation- not Karn */
4490 list[i]->firstSent = *now;
4492 && (call->cwind <= (u_short)(conn->ackRate+1)
4493 || (!(call->flags & RX_CALL_SLOW_START_OK)
4494 && (list[i]->header.seq & 1)))) {
4499 MUTEX_ENTER(&peer->peer_lock);
4501 MUTEX_ENTER(&rx_stats_mutex);
4502 rx_stats.dataPacketsSent++;
4503 MUTEX_EXIT(&rx_stats_mutex);
4504 MUTEX_EXIT(&peer->peer_lock);
4506 /* Tag this packet as not being the last in this group,
4507 * for the receiver's benefit */
4508 if (i < len-1 || moreFlag) {
4509 list[i]->header.flags |= RX_MORE_PACKETS;
4512 /* Install the new retransmit time for the packet, and
4513 * record the time sent */
4514 list[i]->timeSent = *now;
4518 list[len-1]->header.flags |= RX_REQUEST_ACK;
4521 /* Since we're about to send a data packet to the peer, it's
4522 * safe to nuke any scheduled end-of-packets ack */
4523 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
4525 CALL_HOLD(call, RX_CALL_REFCOUNT_SEND);
4526 MUTEX_EXIT(&call->lock);
4528 rxi_SendPacketList(conn, list, len, istack);
4530 rxi_SendPacket(conn, list[0], istack);
4532 MUTEX_ENTER(&call->lock);
4533 CALL_RELE(call, RX_CALL_REFCOUNT_SEND);
4535 /* Update last send time for this call (for keep-alive
4536 * processing), and for the connection (so that we can discover
4537 * idle connections) */
4538 conn->lastSendTime = call->lastSendTime = clock_Sec();
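/*
 * Worked example (editorial): the backoff above evolves as
 * backoff = (backoff << 1) + 1, i.e. 0 -> 1 -> 3 -> 7 -> 15 ..., and each
 * retransmission pushes retryTime out by backoff << 8 ms: 256 ms, 768 ms,
 * 1792 ms, 3840 ms, ... Once backoff reaches MAXBACKOFF it only
 * increments, so the delay keeps growing but no longer doubles.
 */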
4541 /* When sending packets we need to follow these rules:
4542 * 1. Never send more than maxDgramPackets in a jumbogram.
4543 * 2. Never send a packet with more than two iovecs in a jumbogram.
4544 * 3. Never send a retransmitted packet in a jumbogram.
4545 * 4. Never send more than cwind/4 packets in a jumbogram.
4546 * We always keep the last list we should have sent so we
4547 * can set the RX_MORE_PACKETS flags correctly.
4549 static void rxi_SendXmitList(call, list, len, istack, now, retryTime)
4550 struct rx_call *call;
4551 struct rx_packet **list;
4555 struct clock *retryTime;
4557 int i, cnt, lastCnt = 0;
4558 struct rx_packet **listP, **lastP = 0;
4559 struct rx_peer *peer = call->conn->peer;
4560 int morePackets = 0;
4562 for (cnt = 0, listP = &list[0], i = 0 ; i < len ; i++) {
4563 /* Does the current packet force us to flush the current list? */
4565 && (list[i]->header.serial
4567 || list[i]->length > RX_JUMBOBUFFERSIZE)) {
4569 rxi_SendList(call, lastP, lastCnt, istack, 1, now, retryTime);
4570 /* If the call enters an error state stop sending, or if
4571 * we entered congestion recovery mode, stop sending */
4572 if (call->error || (call->flags & RX_CALL_FAST_RECOVER_WAIT))
4580 /* Add the current packet to the list if it hasn't been acked.
4581 * Otherwise adjust the list pointer to skip the current packet. */
4582 if (!list[i]->acked) {
4584 /* Do we need to flush the list? */
4585 if (cnt >= (int)peer->maxDgramPackets
4586 || cnt >= (int)call->nDgramPackets
4587 || cnt >= (int)call->cwind
4588 || list[i]->header.serial
4589 || list[i]->length != RX_JUMBOBUFFERSIZE) {
4591 rxi_SendList(call, lastP, lastCnt, istack, 1,
4593 /* If the call enters an error state stop sending, or if
4594 * we entered congestion recovery mode, stop sending */
4595 if (call->error || (call->flags&RX_CALL_FAST_RECOVER_WAIT))
4605 osi_Panic("rxi_SendList error");
4611 /* Send the whole list when the call is in receive mode, when
4612 * the call is in eof mode, when we are in fast recovery mode,
4613 * or when we have the last packet */
4614 if ((list[len-1]->header.flags & RX_LAST_PACKET)
4615 || call->mode == RX_MODE_RECEIVING
4616 || call->mode == RX_MODE_EOF
4617 || (call->flags & RX_CALL_FAST_RECOVER)) {
4618 /* Check for the case where the current list contains
4619 * an acked packet. Since we always send retransmissions
4620 * in a separate packet, we only need to check the first
4621 * packet in the list */
4622 if (cnt > 0 && !listP[0]->acked) {
4626 rxi_SendList(call, lastP, lastCnt, istack, morePackets,
4628 /* If the call enters an error state stop sending, or if
4629 * we entered congestion recovery mode, stop sending */
4630 if (call->error || (call->flags & RX_CALL_FAST_RECOVER_WAIT))
4634 rxi_SendList(call, listP, cnt, istack, 0, now, retryTime);
4636 } else if (lastCnt > 0) {
4637 rxi_SendList(call, lastP, lastCnt, istack, 0, now, retryTime);
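/*
 * Example of the rules above (editorial): with peer->maxDgramPackets = 4,
 * cwind = 16 and six unacked, never-sent, full-size (RX_JUMBOBUFFERSIZE)
 * packets, the first four are flushed as one jumbogram and the last two
 * as a second. Any packet that already carries a serial number (i.e. a
 * retransmission) is always sent in a datagram by itself.
 */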
4641 #ifdef RX_ENABLE_LOCKS
4642 /* Call rxi_Start, below, but with the call lock held. */
4643 void rxi_StartUnlocked(event, call, istack)
4644 struct rxevent *event;
4645 register struct rx_call *call;
4648 MUTEX_ENTER(&call->lock);
4649 rxi_Start(event, call, istack);
4650 MUTEX_EXIT(&call->lock);
4652 #endif /* RX_ENABLE_LOCKS */
4654 /* This routine is called when new packets are readied for
4655 * transmission and when retransmission may be necessary, or when the
4656 * transmission window or burst count are favourable. This should be
4657 * better optimized for new packets, the usual case, now that we've
4658 * got rid of queues of send packets. XXXXXXXXXXX */
4659 void rxi_Start(event, call, istack)
4660 struct rxevent *event;
4661 register struct rx_call *call;
4664 struct rx_packet *p;
4665 register struct rx_packet *nxp; /* Next pointer for queue_Scan */
4666 struct rx_peer *peer = call->conn->peer;
4667 struct clock now, retryTime;
4671 struct rx_packet **xmitList;
4673 /* If rxi_Start is being called as a result of a resend event,
4674 * then make sure that the event pointer is removed from the call
4675 * structure, since there is no longer a per-call retransmission
4676 * event pending. */
4677 if (event && event == call->resendEvent) {
4678 CALL_RELE(call, RX_CALL_REFCOUNT_RESEND);
4679 call->resendEvent = NULL;
4680 if (queue_IsEmpty(&call->tq)) {
4684 /* Timeouts trigger congestion recovery */
4685 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4686 if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
4687 /* someone else is waiting to start recovery */
4690 call->flags |= RX_CALL_FAST_RECOVER_WAIT;
4691 while (call->flags & RX_CALL_TQ_BUSY) {
4692 call->flags |= RX_CALL_TQ_WAIT;
4693 #ifdef RX_ENABLE_LOCKS
4694 CV_WAIT(&call->cv_tq, &call->lock);
4695 #else /* RX_ENABLE_LOCKS */
4696 osi_rxSleep(&call->tq);
4697 #endif /* RX_ENABLE_LOCKS */
4699 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4700 call->flags &= ~RX_CALL_FAST_RECOVER_WAIT;
4701 call->flags |= RX_CALL_FAST_RECOVER;
4702 if (peer->maxDgramPackets > 1) {
4703 call->MTU = RX_JUMBOBUFFERSIZE+RX_HEADER_SIZE;
4705 call->MTU = MIN(peer->natMTU, peer->maxMTU);
4707 call->ssthresh = MAX(4, MIN((int)call->cwind, (int)call->twind))>>1;
4708 call->nDgramPackets = 1;
4710 call->nextCwind = 1;
4713 MUTEX_ENTER(&peer->peer_lock);
4714 peer->MTU = call->MTU;
4715 peer->cwind = call->cwind;
4716 peer->nDgramPackets = 1;
4718 call->congestSeq = peer->congestSeq;
4719 MUTEX_EXIT(&peer->peer_lock);
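/*
 * Numeric sketch (editorial): entering recovery with cwind = 16 and
 * twind = 32 gives ssthresh = MAX(4, MIN(16, 32)) >> 1 = 8, the window
 * restarts from a single packet (nextCwind = 1), and for a jumbogram-
 * capable peer call->MTU is pinned to one jumbo buffer so the
 * retransmissions go out as plain datagrams.
 */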
4720 /* Clear retry times on packets. Otherwise, it's possible for
4721 * some packets in the queue to force resends at rates faster
4722 * than recovery rates.
4724 for(queue_Scan(&call->tq, p, nxp, rx_packet)) {
4726 clock_Zero(&p->retryTime);
4731 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4732 MUTEX_ENTER(&rx_stats_mutex);
4733 rx_tq_debug.rxi_start_in_error ++;
4734 MUTEX_EXIT(&rx_stats_mutex);
4739 if (queue_IsNotEmpty(&call->tq)) { /* If we have anything to send */
4740 /* Get clock to compute the re-transmit time for any packets
4741 * in this burst. Note, if we back off, it's reasonable to
4742 * back off all of the packets in the same manner, even if
4743 * some of them have been retransmitted more times than more
4744 * recent additions */
4745 clock_GetTime(&now);
4746 retryTime = now; /* initialize before use */
4747 MUTEX_ENTER(&peer->peer_lock);
4748 clock_Add(&retryTime, &peer->timeout);
4749 MUTEX_EXIT(&peer->peer_lock);
4751 /* Send (or resend) any packets that need it, subject to
4752 * window restrictions and congestion burst control
4753 * restrictions. Ask for an ack on the last packet sent in
4754 * this burst. For now, we're relying upon the window being
4755 * considerably bigger than the largest number of packets that
4756 * are typically sent at once by one initial call to
4757 * rxi_Start. This is probably bogus (perhaps we should ask
4758 * for an ack when we're half way through the current
4759 * window?). Also, for non file transfer applications, this
4760 * may end up asking for an ack for every packet. Bogus. XXXX
4763 * But check whether we're here recursively, and let the other guy
4766 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4767 if (!(call->flags & RX_CALL_TQ_BUSY)) {
4768 call->flags |= RX_CALL_TQ_BUSY;
4770 call->flags &= ~RX_CALL_NEED_START;
4771 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4773 maxXmitPackets = MIN(call->twind, call->cwind);
4774 xmitList = (struct rx_packet **)
4775 osi_Alloc(maxXmitPackets * sizeof(struct rx_packet *));
4776 if (xmitList == NULL)
4777 osi_Panic("rxi_Start, failed to allocate xmit list");
4778 for (queue_Scan(&call->tq, p, nxp, rx_packet)) {
4779 if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
4780 /* We shouldn't be sending packets if a thread is waiting
4781 * to initiate congestion recovery */
4784 if ((nXmitPackets) && (call->flags & RX_CALL_FAST_RECOVER)) {
4785 /* Only send one packet during fast recovery */
4788 if ((p->header.flags == RX_FREE_PACKET) ||
4789 (!queue_IsEnd(&call->tq, nxp)
4790 && (nxp->header.flags == RX_FREE_PACKET)) ||
4791 (p == (struct rx_packet *)&rx_freePacketQueue) ||
4792 (nxp == (struct rx_packet *)&rx_freePacketQueue)) {
4793 osi_Panic("rxi_Start: xmit queue clobbered");
4796 MUTEX_ENTER(&rx_stats_mutex);
4797 rx_stats.ignoreAckedPacket++;
4798 MUTEX_EXIT(&rx_stats_mutex);
4799 continue; /* Ignore this packet if it has been acknowledged */
4802 /* Turn off all flags except these ones, which are the same
4803 * on each transmission */
4804 p->header.flags &= RX_PRESET_FLAGS;
4806 if (p->header.seq >= call->tfirst +
4807 MIN((int)call->twind, (int)(call->nSoftAcked+call->cwind))) {
4808 call->flags |= RX_CALL_WAIT_WINDOW_SEND; /* Wait for transmit window */
4809 /* Note: if we're waiting for more window space, we can
4810 * still send retransmits; hence we don't return here, but
4811 * break out to schedule a retransmit event */
4812 dpf(("call %d waiting for window", *(call->callNumber)));
4816 /* Transmit the packet if it needs to be sent. */
4817 if (!clock_Lt(&now, &p->retryTime)) {
4818 if (nXmitPackets == maxXmitPackets) {
4819 osi_Panic("rxi_Start: xmit list overflowed");
4821 xmitList[nXmitPackets++] = p;
4825 /* xmitList now holds pointers to all of the packets that are
4826 * ready to send. Now we loop to send the packets */
4827 if (nXmitPackets > 0) {
4828 rxi_SendXmitList(call, xmitList, nXmitPackets, istack,
4831 osi_Free(xmitList, maxXmitPackets * sizeof(struct rx_packet *));
4833 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4835 * TQ references no longer protected by this flag; they must remain
4836 * protected by the global lock.
4838 if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
4839 call->flags &= ~RX_CALL_TQ_BUSY;
4840 if (call->flags & RX_CALL_TQ_WAIT) {
4841 call->flags &= ~RX_CALL_TQ_WAIT;
4842 #ifdef RX_ENABLE_LOCKS
4843 CV_BROADCAST(&call->cv_tq);
4844 #else /* RX_ENABLE_LOCKS */
4845 osi_rxWakeup(&call->tq);
4846 #endif /* RX_ENABLE_LOCKS */
4851 /* We went into the error state while sending packets. Now is
4852 * the time to reset the call. This will also inform the using
4853 * process that the call is in an error state.
4855 MUTEX_ENTER(&rx_stats_mutex);
4856 rx_tq_debug.rxi_start_aborted ++;
4857 MUTEX_EXIT(&rx_stats_mutex);
4858 call->flags &= ~RX_CALL_TQ_BUSY;
4859 if (call->flags & RX_CALL_TQ_WAIT) {
4860 call->flags &= ~RX_CALL_TQ_WAIT;
4861 #ifdef RX_ENABLE_LOCKS
4862 CV_BROADCAST(&call->cv_tq);
4863 #else /* RX_ENABLE_LOCKS */
4864 osi_rxWakeup(&call->tq);
4865 #endif /* RX_ENABLE_LOCKS */
4867 rxi_CallError(call, call->error);
4870 #ifdef RX_ENABLE_LOCKS
4871 if (call->flags & RX_CALL_TQ_SOME_ACKED) {
4872 register int missing;
4873 call->flags &= ~RX_CALL_TQ_SOME_ACKED;
4874 /* Some packets have received acks. If they all have, we can clear
4875 * the transmit queue.
4877 for (missing = 0, queue_Scan(&call->tq, p, nxp, rx_packet)) {
4878 if (p->header.seq < call->tfirst && p->acked) {
4886 call->flags |= RX_CALL_TQ_CLEARME;
4888 #endif /* RX_ENABLE_LOCKS */
4889 /* Don't bother doing retransmits if the TQ is cleared. */
4890 if (call->flags & RX_CALL_TQ_CLEARME) {
4891 rxi_ClearTransmitQueue(call, 1);
4893 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4896 /* Always post a resend event, if there is anything in the
4897 * queue, and resend is possible. There should be at least
4898 * one unacknowledged packet in the queue ... otherwise none
4899 * of these packets should be on the queue in the first place.
4901 if (call->resendEvent) {
4902 /* Cancel the existing event and post a new one */
4903 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
4906 /* The retry time is the retry time on the first unacknowledged
4907 * packet inside the current window */
4908 for (haveEvent = 0, queue_Scan(&call->tq, p, nxp, rx_packet)) {
4909 /* Don't set timers for packets outside the window */
4910 if (p->header.seq >= call->tfirst + call->twind) {
4914 if (!p->acked && !clock_IsZero(&p->retryTime)) {
4916 retryTime = p->retryTime;
4921 /* Post a new event to re-run rxi_Start when retries may be needed */
4922 if (haveEvent && !(call->flags & RX_CALL_NEED_START)) {
4923 #ifdef RX_ENABLE_LOCKS
4924 CALL_HOLD(call, RX_CALL_REFCOUNT_RESEND);
4925 call->resendEvent = rxevent_Post(&retryTime,
4927 (char *)call, istack);
4928 #else /* RX_ENABLE_LOCKS */
4929 call->resendEvent = rxevent_Post(&retryTime, rxi_Start,
4930 (char *)call, (void*)(long)istack);
4931 #endif /* RX_ENABLE_LOCKS */
4934 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4935 } while (call->flags & RX_CALL_NEED_START);
4937 * TQ references no longer protected by this flag; they must remain
4938 * protected by the global lock.
4940 call->flags &= ~RX_CALL_TQ_BUSY;
4941 if (call->flags & RX_CALL_TQ_WAIT) {
4942 call->flags &= ~RX_CALL_TQ_WAIT;
4943 #ifdef RX_ENABLE_LOCKS
4944 CV_BROADCAST(&call->cv_tq);
4945 #else /* RX_ENABLE_LOCKS */
4946 osi_rxWakeup(&call->tq);
4947 #endif /* RX_ENABLE_LOCKS */
4950 call->flags |= RX_CALL_NEED_START;
4952 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4954 if (call->resendEvent) {
4955 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
4960 /* Also adjusts the keep alive parameters for the call, to reflect
4961 * that we have just sent a packet (so keep alives aren't sent
4963 void rxi_Send(call, p, istack)
4964 register struct rx_call *call;
4965 register struct rx_packet *p;
4968 register struct rx_connection *conn = call->conn;
4970 /* Stamp each packet with the user supplied status */
4971 p->header.userStatus = call->localStatus;
4973 /* Allow the security object controlling this call's security to
4974 * make any last-minute changes to the packet */
4975 RXS_SendPacket(conn->securityObject, call, p);
4977 /* Since we're about to send SOME sort of packet to the peer, it's
4978 * safe to nuke any scheduled end-of-packets ack */
4979 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
4981 /* Actually send the packet, filling in more connection-specific fields */
4982 CALL_HOLD(call, RX_CALL_REFCOUNT_SEND);
4983 MUTEX_EXIT(&call->lock);
4984 rxi_SendPacket(conn, p, istack);
4985 MUTEX_ENTER(&call->lock);
4986 CALL_RELE(call, RX_CALL_REFCOUNT_SEND);
4988 /* Update last send time for this call (for keep-alive
4989 * processing), and for the connection (so that we can discover
4990 * idle connections) */
4991 conn->lastSendTime = call->lastSendTime = clock_Sec();
4995 /* Check if a call needs to be destroyed. Called by keep-alive code to ensure
4996 * that things are fine. Also called periodically to guarantee that nothing
4997 * falls through the cracks (e.g. (error + dally) connections have keepalive
4998 * turned off). Returns 0 if conn is well, -1 otherwise. If otherwise, call
5001 #ifdef RX_ENABLE_LOCKS
5002 int rxi_CheckCall(call, haveCTLock)
5003 int haveCTLock; /* Set if calling from rxi_ReapConnections */
5004 #else /* RX_ENABLE_LOCKS */
5005 int rxi_CheckCall(call)
5006 #endif /* RX_ENABLE_LOCKS */
5007 register struct rx_call *call;
5009 register struct rx_connection *conn = call->conn;
5010 register struct rx_service *tservice;
5012 afs_uint32 deadTime;
5014 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
5015 if (call->flags & RX_CALL_TQ_BUSY) {
5016 /* Call is active and will be reset by rxi_Start if it's
5017 * in an error state.
5022 /* dead time + RTT + 8*MDEV, rounded up to next second. */
5023 deadTime = (((afs_uint32)conn->secondsUntilDead << 10) +
5024 ((afs_uint32)conn->peer->rtt >> 3) +
5025 ((afs_uint32)conn->peer->rtt_dev << 1) + 1023) >> 10;
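/*
 * Worked example (editorial): with secondsUntilDead = 12, a smoothed
 * rtt of 101 ms (peer->rtt = 808, scaled by 8) and peer->rtt_dev = 202
 * (about 50 ms, scaled by 4), deadTime = (12288 + 101 + 404 + 1023) >> 10
 * = 13 seconds.
 */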
5027 /* These are computed to the second (+- 1 second). But that's
5028 * good enough for these values, which should be a significant
5029 * number of seconds. */
5030 if (now > (call->lastReceiveTime + deadTime)) {
5031 if (call->state == RX_STATE_ACTIVE) {
5032 rxi_CallError(call, RX_CALL_DEAD);
5036 #ifdef RX_ENABLE_LOCKS
5037 /* Cancel pending events */
5038 rxevent_Cancel(call->delayedAckEvent, call,
5039 RX_CALL_REFCOUNT_DELAY);
5040 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
5041 rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
5042 if (call->refCount == 0) {
5043 rxi_FreeCall(call, haveCTLock);
5047 #else /* RX_ENABLE_LOCKS */
5050 #endif /* RX_ENABLE_LOCKS */
5052 /* Non-active calls are destroyed if they are not responding
5053 * to pings; active calls are simply flagged in error, so the
5054 * attached process can die reasonably gracefully. */
5056 /* see if we have a non-activity timeout */
5057 tservice = conn->service;
5058 if ((conn->type == RX_SERVER_CONNECTION) && call->startWait
5059 && tservice->idleDeadTime
5060 && ((call->startWait + tservice->idleDeadTime) < now)) {
5061 if (call->state == RX_STATE_ACTIVE) {
5062 rxi_CallError(call, RX_CALL_TIMEOUT);
5066 /* see if we have a hard timeout */
5067 if (conn->hardDeadTime && (now > (conn->hardDeadTime + call->startTime.sec))) {
5068 if (call->state == RX_STATE_ACTIVE)
5069 rxi_CallError(call, RX_CALL_TIMEOUT);
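/*
 * Editorial example: with conn->hardDeadTime = 120 s, a call started at
 * time t is errored with RX_CALL_TIMEOUT at t + 120 no matter how much
 * traffic is flowing; the idleDeadTime check above, by contrast, fires
 * only while a server call is waiting on the client (call->startWait set).
 */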
5076 /* When a call is in progress, this routine is called occasionally to
5077 * make sure that some traffic has arrived from (or been sent to) the peer.
5078 * If nothing has arrived in a reasonable amount of time, the call is
5079 * declared dead; if nothing has been sent for a while, we send a
5080 * keep-alive packet (if we're actually trying to keep the call alive)
5082 void rxi_KeepAliveEvent(event, call, dummy)
5083 struct rxevent *event;
5084 register struct rx_call *call;
5086 struct rx_connection *conn;
5089 MUTEX_ENTER(&call->lock);
5090 CALL_RELE(call, RX_CALL_REFCOUNT_ALIVE);
5091 if (event == call->keepAliveEvent)
5092 call->keepAliveEvent = (struct rxevent *) 0;
5095 #ifdef RX_ENABLE_LOCKS
5096 if(rxi_CheckCall(call, 0)) {
5097 MUTEX_EXIT(&call->lock);
5100 #else /* RX_ENABLE_LOCKS */
5101 if (rxi_CheckCall(call)) return;
5102 #endif /* RX_ENABLE_LOCKS */
5104 /* Don't try to keep alive dallying calls */
5105 if (call->state == RX_STATE_DALLY) {
5106 MUTEX_EXIT(&call->lock);
5111 if ((now - call->lastSendTime) > conn->secondsUntilPing) {
5112 /* Don't try to send keepalives if there is unacknowledged data */
5113 /* the rexmit code should be good enough, this little hack
5114 * doesn't quite work XXX */
5115 (void) rxi_SendAck(call, NULL, 0, 0, 0, RX_ACK_PING, 0);
5117 rxi_ScheduleKeepAliveEvent(call);
5118 MUTEX_EXIT(&call->lock);
5122 void rxi_ScheduleKeepAliveEvent(call)
5123 register struct rx_call *call;
5125 if (!call->keepAliveEvent) {
5127 clock_GetTime(&when);
5128 when.sec += call->conn->secondsUntilPing;
5129 CALL_HOLD(call, RX_CALL_REFCOUNT_ALIVE);
5130 call->keepAliveEvent = rxevent_Post(&when, rxi_KeepAliveEvent, call, 0);
5134 /* N.B. rxi_KeepAliveOff: is defined earlier as a macro */
5135 void rxi_KeepAliveOn(call)
5136 register struct rx_call *call;
5138 /* Pretend last packet received was received now--i.e. if another
5139 * packet isn't received within the keep alive time, then the call
5140 * will die. Initialize last send time to the current time--even
5141 * if a packet hasn't been sent yet. This will guarantee that a
5142 * keep-alive is sent within the ping time */
5143 call->lastReceiveTime = call->lastSendTime = clock_Sec();
5144 rxi_ScheduleKeepAliveEvent(call);
5147 /* This routine is called to send connection abort messages
5148 * that have been delayed to throttle looping clients. */
5149 void rxi_SendDelayedConnAbort(event, conn, dummy)
5150 struct rxevent *event;
5151 register struct rx_connection *conn;
5155 struct rx_packet *packet;
5157 MUTEX_ENTER(&conn->conn_data_lock);
5158 conn->delayedAbortEvent = (struct rxevent *) 0;
5159 error = htonl(conn->error);
5161 MUTEX_EXIT(&conn->conn_data_lock);
5162 packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
5164 packet = rxi_SendSpecial((struct rx_call *)0, conn, packet,
5165 RX_PACKET_TYPE_ABORT, (char *)&error,
5167 rxi_FreePacket(packet);
5171 /* This routine is called to send call abort messages
5172 * that have been delayed to throttle looping clients. */
5173 void rxi_SendDelayedCallAbort(event, call, dummy)
5174 struct rxevent *event;
5175 register struct rx_call *call;
5179 struct rx_packet *packet;
5181 MUTEX_ENTER(&call->lock);
5182 call->delayedAbortEvent = (struct rxevent *) 0;
5183 error = htonl(call->error);
5185 packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
5187 packet = rxi_SendSpecial(call, call->conn, packet,
5188 RX_PACKET_TYPE_ABORT, (char *)&error,
5190 rxi_FreePacket(packet);
5192 MUTEX_EXIT(&call->lock);
5195 /* This routine is called periodically (every RX_AUTH_REQUEST_TIMEOUT
5196 * seconds) to ask the client to authenticate itself. The routine
5197 * issues a challenge to the client, which is obtained from the
5198 * security object associated with the connection */
5199 void rxi_ChallengeEvent(event, conn, dummy)
5200 struct rxevent *event;
5201 register struct rx_connection *conn;
5204 conn->challengeEvent = (struct rxevent *) 0;
5205 if (RXS_CheckAuthentication(conn->securityObject, conn) != 0) {
5206 register struct rx_packet *packet;
5208 packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
5210 /* If there's no packet available, do this later. */
5211 RXS_GetChallenge(conn->securityObject, conn, packet);
5212 rxi_SendSpecial((struct rx_call *) 0, conn, packet,
5213 RX_PACKET_TYPE_CHALLENGE, (char *) 0, -1, 0);
5214 rxi_FreePacket(packet);
5216 clock_GetTime(&when);
5217 when.sec += RX_CHALLENGE_TIMEOUT;
5218 conn->challengeEvent = rxevent_Post(&when, rxi_ChallengeEvent, conn, 0);
5222 /* Call this routine to start requesting the client to authenticate
5223 * itself. This will continue until authentication is established,
5224 * the call times out, or an invalid response is returned. The
5225 * security object associated with the connection is asked to create
5226 * the challenge at this time. N.B. rxi_ChallengeOff is a macro,
5227 * defined earlier. */
5228 void rxi_ChallengeOn(conn)
5229 register struct rx_connection *conn;
5231 if (!conn->challengeEvent) {
5232 RXS_CreateChallenge(conn->securityObject, conn);
5233 rxi_ChallengeEvent((struct rxevent *)0, conn, NULL);
5238 /* Compute round trip time of the packet provided, in *rttp.
5241 /* rxi_ComputeRoundTripTime is called with peer locked. */
5242 void rxi_ComputeRoundTripTime(p, sentp, peer)
5243 register struct clock *sentp; /* may be null */
5244 register struct rx_peer *peer; /* may be null */
5245 register struct rx_packet *p;
5247 struct clock thisRtt, *rttp = &thisRtt;
5249 register int rtt_timeout;
5250 static char id[]="@(#)adaptive RTO";
5252 clock_GetTime(rttp);
5253 if (clock_Lt(rttp, sentp)) {
5255 return; /* somebody set the clock back, don't count this time. */
5257 clock_Sub(rttp, sentp);
5258 MUTEX_ENTER(&rx_stats_mutex);
5259 if (clock_Lt(rttp, &rx_stats.minRtt)) rx_stats.minRtt = *rttp;
5260 if (clock_Gt(rttp, &rx_stats.maxRtt)) {
5261 if (rttp->sec > 60) {
5262 MUTEX_EXIT(&rx_stats_mutex);
5263 return; /* somebody set the clock ahead */
5265 rx_stats.maxRtt = *rttp;
5267 clock_Add(&rx_stats.totalRtt, rttp);
5268 rx_stats.nRttSamples++;
5269 MUTEX_EXIT(&rx_stats_mutex);
5271 /* better rtt calculation courtesy of UMich crew (dave,larry,peter,?) */
5273 /* Apply VanJacobson round-trip estimations */
5278 * srtt (peer->rtt) is in units of one-eighth-milliseconds.
5279 * srtt is stored as fixed point with 3 bits after the binary
5280 * point (i.e., scaled by 8). The following magic is
5281 * equivalent to the smoothing algorithm in rfc793 with an
5282 * alpha of .875 (srtt = rtt/8 + srtt*7/8 in fixed point).
5283 * srtt*8 = srtt*8 + rtt - srtt
5284 * srtt = srtt + rtt/8 - srtt/8
5287 delta = MSEC(rttp) - (peer->rtt >> 3);
5291 * We accumulate a smoothed rtt variance (actually, a smoothed
5292 * mean difference), then set the retransmit timer to smoothed
5293 * rtt + 4 times the smoothed variance (was 2x in van's original
5294 * paper, but 4x works better for me, and apparently for him as
5296 * rttvar is stored as
5297 * fixed point with 2 bits after the binary point (scaled by
5298 * 4). The following is equivalent to rfc793 smoothing with
5299 * an alpha of .75 (rttvar = rttvar*3/4 + |delta| / 4). This
5300 * replaces rfc793's wired-in beta.
5301 * dev*4 = dev*4 + (|actual - expected| - dev)
5307 delta -= (peer->rtt_dev >> 2);
5308 peer->rtt_dev += delta;
5311 /* I don't have a stored RTT so I start with this value. Since I'm
5312 * probably just starting a call, and will be pushing more data down
5313 * this, I expect congestion to increase rapidly. So I fudge a
5314 * little, and I set deviance to half the rtt. In practice,
5315 * deviance tends to approach something a little less than
5316 * half the smoothed rtt. */
5317 peer->rtt = (MSEC(rttp) << 3) + 8;
5318 peer->rtt_dev = peer->rtt >> 2; /* rtt/2: they're scaled differently */
5320 /* the timeout is RTT + 4*MDEV + 0.35 sec. This is because one end or
5321 * the other of these connections is usually in a user process, and can
5322 * be switched and/or swapped out. So on fast, reliable networks, the
5323 * timeout would otherwise be too short.
5325 rtt_timeout = (peer->rtt >> 3) + peer->rtt_dev + 350;
5326 clock_Zero(&(peer->timeout));
5327 clock_Addmsec(&(peer->timeout), rtt_timeout);
5329 dpf(("rxi_ComputeRoundTripTime(rtt=%d ms, srtt=%d ms, rtt_dev=%d ms, timeout=%d.%0.3d sec)\n",
5330 MSEC(rttp), peer->rtt >> 3, peer->rtt_dev >> 2,
5331 (peer->timeout.sec),(peer->timeout.usec)) );
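/*
 * Worked example (editorial): the first sample on a fresh peer with
 * MSEC(rttp) = 100 gives peer->rtt = (100 << 3) + 8 = 808 (srtt of about
 * 101 ms) and peer->rtt_dev = 808 >> 2 = 202 (about 50 ms, scaled by 4),
 * so rtt_timeout = 101 + 202 + 350 = 653 ms. A later 120 ms sample gives
 * delta = 120 - 101 = 19, nudging srtt up by delta/8 and moving the
 * smoothed deviation toward |delta|/4.
 */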
5335 /* Find all server connections that have not been active for a long time, and
5337 void rxi_ReapConnections()
5340 clock_GetTime(&now);
5342 /* Find server connection structures that haven't been used for
5343 * greater than rx_idleConnectionTime */
5344 { struct rx_connection **conn_ptr, **conn_end;
5345 int i, havecalls = 0;
5346 MUTEX_ENTER(&rx_connHashTable_lock);
5347 for (conn_ptr = &rx_connHashTable[0],
5348 conn_end = &rx_connHashTable[rx_hashTableSize];
5349 conn_ptr < conn_end; conn_ptr++) {
5350 struct rx_connection *conn, *next;
5351 struct rx_call *call;
5355 for (conn = *conn_ptr; conn; conn = next) {
5356 /* XXX -- Shouldn't the connection be locked? */
5359 for(i=0;i<RX_MAXCALLS;i++) {
5360 call = conn->call[i];
5363 MUTEX_ENTER(&call->lock);
5364 #ifdef RX_ENABLE_LOCKS
5365 result = rxi_CheckCall(call, 1);
5366 #else /* RX_ENABLE_LOCKS */
5367 result = rxi_CheckCall(call);
5368 #endif /* RX_ENABLE_LOCKS */
5369 MUTEX_EXIT(&call->lock);
5371 /* If CheckCall freed the call, it might
5372 * have destroyed the connection as well,
5373 * which screws up the linked lists.
5379 if (conn->type == RX_SERVER_CONNECTION) {
5380 /* This only actually destroys the connection if
5381 * there are no outstanding calls */
5382 MUTEX_ENTER(&conn->conn_data_lock);
5383 if (!havecalls && !conn->refCount &&
5384 ((conn->lastSendTime + rx_idleConnectionTime) < now.sec)) {
5385 conn->refCount++; /* it will be decr in rx_DestroyConn */
5386 MUTEX_EXIT(&conn->conn_data_lock);
5387 #ifdef RX_ENABLE_LOCKS
5388 rxi_DestroyConnectionNoLock(conn);
5389 #else /* RX_ENABLE_LOCKS */
5390 rxi_DestroyConnection(conn);
5391 #endif /* RX_ENABLE_LOCKS */
5393 #ifdef RX_ENABLE_LOCKS
5395 MUTEX_EXIT(&conn->conn_data_lock);
5397 #endif /* RX_ENABLE_LOCKS */
5401 #ifdef RX_ENABLE_LOCKS
5402 while (rx_connCleanup_list) {
5403 struct rx_connection *conn;
5404 conn = rx_connCleanup_list;
5405 rx_connCleanup_list = rx_connCleanup_list->next;
5406 MUTEX_EXIT(&rx_connHashTable_lock);
5407 rxi_CleanupConnection(conn);
5408 MUTEX_ENTER(&rx_connHashTable_lock);
5410 MUTEX_EXIT(&rx_connHashTable_lock);
5411 #endif /* RX_ENABLE_LOCKS */
5414 /* Find any peer structures that haven't been used (haven't had an
5415 * associated connection) for greater than rx_idlePeerTime */
5416 { struct rx_peer **peer_ptr, **peer_end;
5418 MUTEX_ENTER(&rx_rpc_stats);
5419 MUTEX_ENTER(&rx_peerHashTable_lock);
5420 for (peer_ptr = &rx_peerHashTable[0],
5421 peer_end = &rx_peerHashTable[rx_hashTableSize];
5422 peer_ptr < peer_end; peer_ptr++) {
5423 struct rx_peer *peer, *next, *prev;
5424 for (prev = peer = *peer_ptr; peer; peer = next) {
5426 code = MUTEX_TRYENTER(&peer->peer_lock);
5427 if ((code) && (peer->refCount == 0)
5428 && ((peer->idleWhen + rx_idlePeerTime) < now.sec)) {
5429 rx_interface_stat_p rpc_stat, nrpc_stat;
5431 MUTEX_EXIT(&peer->peer_lock);
5432 MUTEX_DESTROY(&peer->peer_lock);
5433 for(queue_Scan(&peer->rpcStats, rpc_stat, nrpc_stat,
5434 rx_interface_stat)) {
5435 unsigned int num_funcs;
5436 if (!rpc_stat) break;
5437 queue_Remove(&rpc_stat->queue_header);
5438 queue_Remove(&rpc_stat->all_peers);
5439 num_funcs = rpc_stat->stats[0].func_total;
5440 space = sizeof(rx_interface_stat_t) +
5441 rpc_stat->stats[0].func_total *
5442 sizeof(rx_function_entry_v1_t);
5444 rxi_Free(rpc_stat, space);
5445 rxi_rpc_peer_stat_cnt -= num_funcs;
5448 MUTEX_ENTER(&rx_stats_mutex);
5449 rx_stats.nPeerStructs--;
5450 MUTEX_EXIT(&rx_stats_mutex);
5451 if (prev == *peer_ptr) {
5460 MUTEX_EXIT(&peer->peer_lock);
5466 MUTEX_EXIT(&rx_peerHashTable_lock);
5467 MUTEX_EXIT(&rx_rpc_stats);
5470 /* THIS HACK IS A TEMPORARY HACK. The idea is that the race condition in
5471 rxi_AllocSendPacket, if it hits, will be handled at the next conn
5472 GC, just below. Really, we shouldn't have to keep moving packets from
5473 one place to another, but instead ought to always know if we can
5474 afford to hold onto a packet in its particular use. */
5475 MUTEX_ENTER(&rx_freePktQ_lock);
5476 if (rx_waitingForPackets) {
5477 rx_waitingForPackets = 0;
5478 #ifdef RX_ENABLE_LOCKS
5479 CV_BROADCAST(&rx_waitingForPackets_cv);
5481 osi_rxWakeup(&rx_waitingForPackets);
5484 MUTEX_EXIT(&rx_freePktQ_lock);
5486 now.sec += RX_REAP_TIME; /* Check every RX_REAP_TIME seconds */
5487 rxevent_Post(&now, rxi_ReapConnections, 0, 0);
5491 /* rxs_Release - This isn't strictly necessary but, since the macro name from
5492 * rx.h is sort of strange, this is better. This is called with a security
5493 * object before it is discarded. Each connection using a security object has
5494 * its own refcount to the object so it won't actually be freed until the last
5495 * connection is destroyed.
5497 * This is the only rxs module call. A hold could also be written but no one
5500 int rxs_Release (aobj)
5501 struct rx_securityClass *aobj;
5503 return RXS_Close (aobj);
5507 #define RXRATE_PKT_OH (RX_HEADER_SIZE + RX_IPUDP_SIZE)
5508 #define RXRATE_SMALL_PKT (RXRATE_PKT_OH + sizeof(struct rx_ackPacket))
5509 #define RXRATE_AVG_SMALL_PKT (RXRATE_PKT_OH + (sizeof(struct rx_ackPacket)/2))
5510 #define RXRATE_LARGE_PKT (RXRATE_SMALL_PKT + 256)
5512 /* Adjust our estimate of the transmission rate to this peer, given
5513 * that the packet p was just acked. We can adjust peer->timeout and
5514 * call->twind. Pragmatically, this is called
5515 * only with packets of maximal length.
5516 * Called with peer and call locked.
5519 static void rxi_ComputeRate(peer, call, p, ackp, ackReason)
5520 register struct rx_peer *peer;
5521 register struct rx_call *call;
5522 struct rx_packet *p, *ackp;
5525 afs_int32 xferSize, xferMs;
5526 register afs_int32 minTime;
5529 /* Count down packets */
5530 if (peer->rateFlag > 0) peer->rateFlag--;
5531 /* Do nothing until we're enabled */
5532 if (peer->rateFlag != 0) return;
5533 if (!call->conn) return;
5535 /* Count only when the ack seems legitimate */
5536 switch (ackReason) {
5537 case RX_ACK_REQUESTED:
5538 xferSize = p->length + RX_HEADER_SIZE +
5539 call->conn->securityMaxTrailerSize;
5543 case RX_ACK_PING_RESPONSE:
5544 if (p) /* want the response to ping-request, not data send */
5546 clock_GetTime(&newTO);
5547 if (clock_Gt(&newTO, &call->pingRequestTime)) {
5548 clock_Sub(&newTO, &call->pingRequestTime);
5549 xferMs = (newTO.sec * 1000) + (newTO.usec / 1000);
5553 xferSize = rx_AckDataSize(rx_Window) + RX_HEADER_SIZE;
5560 dpf(("CONG peer %lx/%u: sample (%s) size %ld, %ld ms (to %lu.%06lu, rtt %u, ps %u)",
5561 ntohl(peer->host), ntohs(peer->port),
5562 (ackReason == RX_ACK_REQUESTED ? "dataack" : "pingack"),
5563 xferSize, xferMs, peer->timeout.sec, peer->timeout.usec, peer->smRtt,
5566 /* Track only packets that are big enough. */
5567 if ((p->length + RX_HEADER_SIZE + call->conn->securityMaxTrailerSize) <
5571 /* absorb RTT data (in milliseconds) for these big packets */
5572 if (peer->smRtt == 0) {
5573 peer->smRtt = xferMs;
5575 peer->smRtt = ((peer->smRtt * 15) + xferMs + 4) >> 4;
5576 if (!peer->smRtt) peer->smRtt = 1;
5579 if (peer->countDown) {
5583 peer->countDown = 10; /* recalculate only every so often */
5585 /* In practice, we can measure only the RTT for full packets,
5586 * because of the way Rx acks the data that it receives. (If it's
5587 * smaller than a full packet, it often gets implicitly acked
5588 * either by the call response (from a server) or by the next call
5589 * (from a client), and either case confuses transmission times
5590 * with processing times.) Therefore, replace the above
5591 * more-sophisticated processing with a simpler version, where the
5592 * smoothed RTT is kept for full-size packets, and the time to
5593 * transmit a windowful of full-size packets is simply RTT *
5594 * windowSize. Again, we take two steps:
5595 - ensure the timeout is large enough for a single packet's RTT;
5596 - ensure that the window is small enough to fit in the desired timeout.*/
5598 /* First, the timeout check. */
5599 minTime = peer->smRtt;
5600 /* Get a reasonable estimate for a timeout period */
5602 newTO.sec = minTime / 1000;
5603 newTO.usec = (minTime - (newTO.sec * 1000)) * 1000;
5605 /* Increase the timeout period so that we can always do at least
5606 * one packet exchange */
5607 if (clock_Gt(&newTO, &peer->timeout)) {
5609 dpf(("CONG peer %lx/%u: timeout %lu.%06lu ==> %lu.%06lu (rtt %u, ps %u)",
5610 ntohl(peer->host), ntohs(peer->port), peer->timeout.sec,
5611 peer->timeout.usec, newTO.sec, newTO.usec, peer->smRtt,
5614 peer->timeout = newTO;
5617 /* Now, get an estimate for the transmit window size. */
5618 minTime = peer->timeout.sec * 1000 + (peer->timeout.usec / 1000);
5619 /* Now, convert to the number of full packets that could fit in a
5620 * reasonable fraction of that interval */
5621 minTime /= (peer->smRtt << 1);
5622 xferSize = minTime; /* (make a copy) */
5624 /* Now clamp the size to reasonable bounds. */
5625 if (minTime <= 1) minTime = 1;
5626 else if (minTime > rx_Window) minTime = rx_Window;
5627 /* if (minTime != peer->maxWindow) {
5628 dpf(("CONG peer %lx/%u: windowsize %lu ==> %lu (to %lu.%06lu, rtt %u, ps %u)",
5629 ntohl(peer->host), ntohs(peer->port), peer->maxWindow, minTime,
5630 peer->timeout.sec, peer->timeout.usec, peer->smRtt,
5632 peer->maxWindow = minTime;
5633 elide... call->twind = minTime;
5637 /* Cut back on the peer timeout if it had earlier grown unreasonably.
5638 * Discern this by calculating the timeout necessary for rx_Window
5640 if ((xferSize > rx_Window) && (peer->timeout.sec >= 3)) {
5641 /* calculate estimate for transmission interval in milliseconds */
5642 minTime = rx_Window * peer->smRtt;
5643 if (minTime < 1000) {
5644 dpf(("CONG peer %lx/%u: cut TO %lu.%06lu by 0.5 (rtt %u, ps %u)",
5645 ntohl(peer->host), ntohs(peer->port), peer->timeout.sec,
5646 peer->timeout.usec, peer->smRtt,
5649 newTO.sec = 0; /* cut back on timeout by half a second */
5650 newTO.usec = 500000;
5651 clock_Sub(&peer->timeout, &newTO);
5656 } /* end of rxi_ComputeRate */
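/*
 * Numeric sketch (editorial): with a smoothed RTT of 50 ms and a
 * one-second timeout, minTime = 1000 / (50 << 1) = 10, i.e. roughly ten
 * full packets fit in half the timeout interval; the value is then
 * clamped to the range [1, rx_Window] before being used as the window.
 */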
5657 #endif /* ADAPT_WINDOW */
5665 /* Don't call this debugging routine directly; use dpf */
5667 rxi_DebugPrint(format, a1, a2, a3, a4, a5, a6, a7, a8, a9, a10,
5668 a11, a12, a13, a14, a15)
5672 clock_GetTime(&now);
5673 fprintf(rx_Log, " %u.%.3u:", now.sec, now.usec/1000);
5674 fprintf(rx_Log, format, a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14, a15);
5681 * This function is used to process the rx_stats structure that is local
5682 * to a process as well as an rx_stats structure received from a remote
5683 * process (via rxdebug). Therefore, it needs to do minimal version
5684 * checking. */
5686 void rx_PrintTheseStats (file, s, size, freePackets, version)
5689 int size; /* some idea of version control */
5690 afs_int32 freePackets;
5695 if (size != sizeof(struct rx_stats)) {
5697 "Unexpected size of stats structure: was %d, expected %d\n",
5698 size, sizeof(struct rx_stats));
5702 "rx stats: free packets %d, "
5707 if (version >= RX_DEBUGI_VERSION_W_NEWPACKETTYPES) {
5709 "alloc-failures(rcv %d/%d,send %d/%d,ack %d)\n",
5710 s->receivePktAllocFailures,
5711 s->receiveCbufPktAllocFailures,
5712 s->sendPktAllocFailures,
5713 s->sendCbufPktAllocFailures,
5714 s->specialPktAllocFailures);
5717 "alloc-failures(rcv %d,send %d,ack %d)\n",
5718 s->receivePktAllocFailures,
5719 s->sendPktAllocFailures,
5720 s->specialPktAllocFailures);
5725 "bogusReads %d (last from host %x), "
5731 s->bogusPacketOnRead,
5734 s->noPacketBuffersOnRead,
5738 fprintf(file, " packets read: ");
5739 for (i = 0; i<RX_N_PACKET_TYPES; i++) {
5745 fprintf(file, "\n");
5748 " other read counters: data %d, "
5756 s->spuriousPacketsRead,
5757 s->ignorePacketDally);
5759 fprintf(file, " packets sent: ");
5760 for (i = 0; i<RX_N_PACKET_TYPES; i++) {
5766 fprintf(file, "\n");
5769 " other send counters: ack %d, "
5770 "data %d (not resends), "
5773 "acked&ignored %d\n",
5776 s->dataPacketsReSent,
5777 s->dataPacketsPushed,
5778 s->ignoreAckedPacket);
5781 " \t(these should be small) sendFailed %d, "
5786 if (s->nRttSamples) {
5788 " Average rtt is %0.3f, with %d samples\n",
5789 clock_Float(&s->totalRtt)/s->nRttSamples,
5793 " Minimum rtt is %0.3f, maximum is %0.3f\n",
5794 clock_Float(&s->minRtt),
5795 clock_Float(&s->maxRtt));
5799 " %d server connections, "
5800 "%d client connections, "
5803 "%d free call structs\n",
5808 s->nFreeCallStructs);
5810 #if !defined(AFS_PTHREAD_ENV) && !defined(AFS_USE_GETTIMEOFDAY)
5812 " %d clock updates\n",
/* for backward compatibility */
void rx_PrintStats(file)
    FILE *file;
{
    MUTEX_ENTER(&rx_stats_mutex);
    rx_PrintTheseStats (file, &rx_stats, sizeof(rx_stats), rx_nFreePackets, RX_DEBUGI_VERSION);
    MUTEX_EXIT(&rx_stats_mutex);
}
void rx_PrintPeerStats(file, peer)
    FILE *file;
    struct rx_peer *peer;
{
    fprintf(file, "Peer %x.%d.  Burst size %d, burst wait %u.%d.\n",
	    ntohl(peer->host), peer->port, peer->burstSize,
	    peer->burstWait.sec, peer->burstWait.usec);
    fprintf(file, " Rtt %d, retry time %u.%06d, total sent %d, resent %d\n",
	    peer->rtt, peer->timeout.sec, peer->timeout.usec,
	    peer->nSent, peer->reSends);
    fprintf(file, " Packet size %d, max in packet skew %d, max out packet skew %d\n",
	    peer->ifMTU, peer->inPacketSkew, peer->outPacketSkew);
}
#ifdef AFS_PTHREAD_ENV
/*
 * This mutex protects the following static variables:
 * counter
 */
#define LOCK_RX_DEBUG assert(pthread_mutex_lock(&rx_debug_mutex)==0);
#define UNLOCK_RX_DEBUG assert(pthread_mutex_unlock(&rx_debug_mutex)==0);
#else
#define LOCK_RX_DEBUG
#define UNLOCK_RX_DEBUG
#endif /* AFS_PTHREAD_ENV */
static int MakeDebugCall(
  int socket,
  afs_uint32 remoteAddr,
  afs_uint16 remotePort,
  u_char type,
  void *inputData,
  size_t inputLength,
  void *outputData,
  size_t outputLength
)
{
    static afs_int32 counter = 100;
    time_t endTime;
    struct rx_header theader;
    char tbuffer[1500];
    register afs_int32 code;
    struct timeval tv;
    struct sockaddr_in taddr, faddr;
    int faddrLen;
    fd_set imask;
    register char *tp;

    endTime = time(0) + 20;	/* try for 20 seconds */
    LOCK_RX_DEBUG
    counter++;
    UNLOCK_RX_DEBUG
    tp = &tbuffer[sizeof(struct rx_header)];
    taddr.sin_family = AF_INET;
    taddr.sin_port = remotePort;
    taddr.sin_addr.s_addr = remoteAddr;
    while (1) {
	memset(&theader, 0, sizeof(theader));
	theader.epoch = htonl(999);
	theader.cid = 0;
	theader.callNumber = htonl(counter);
	theader.seq = 0;
	theader.serial = 0;
	theader.type = type;
	theader.flags = RX_CLIENT_INITIATED | RX_LAST_PACKET;
	theader.serviceId = 0;

	bcopy(&theader, tbuffer, sizeof(theader));
	bcopy(inputData, tp, inputLength);
	code = sendto(socket, tbuffer, inputLength+sizeof(struct rx_header), 0,
		      (struct sockaddr *) &taddr, sizeof(struct sockaddr_in));

	/* see if there's a packet available */
	FD_ZERO(&imask);
	FD_SET(socket, &imask);
	tv.tv_sec = 1;
	tv.tv_usec = 0;
	code = select(socket+1, &imask, 0, 0, &tv);
	if (code > 0) {
	    /* now receive a packet */
	    faddrLen = sizeof(struct sockaddr_in);
	    code = recvfrom(socket, tbuffer, sizeof(tbuffer), 0,
			    (struct sockaddr *) &faddr, &faddrLen);
	    if (code > 0) {
		bcopy(tbuffer, &theader, sizeof(struct rx_header));
		if (counter == ntohl(theader.callNumber)) break;
	    }
	}

	/* see if we've timed out */
	if (endTime < time(0)) return -1;
    }
    code -= sizeof(struct rx_header);
    if (code > outputLength) code = outputLength;
    bcopy(tp, outputData, code);
    return code;
}
afs_int32 rx_GetServerDebug(
  int socket,
  afs_uint32 remoteAddr,
  afs_uint16 remotePort,
  struct rx_debugStats *stat,
  afs_uint32 *supportedValues
)
{
    struct rx_debugIn in;
    afs_int32 rc = 0;

    *supportedValues = 0;
    in.type = htonl(RX_DEBUGI_GETSTATS);
    in.index = 0;

    rc = MakeDebugCall(socket, remoteAddr, remotePort, RX_PACKET_TYPE_DEBUG,
		       &in, sizeof(in), stat, sizeof(*stat));

    /*
     * If the call was successful, fix up the version and indicate
     * what contents of the stat structure are valid.
     * Also do net to host conversion of fields here.
     */
    if (rc >= 0) {
	if (stat->version >= RX_DEBUGI_VERSION_W_SECSTATS) {
	    *supportedValues |= RX_SERVER_DEBUG_SEC_STATS;
	}
	if (stat->version >= RX_DEBUGI_VERSION_W_GETALLCONN) {
	    *supportedValues |= RX_SERVER_DEBUG_ALL_CONN;
	}
	if (stat->version >= RX_DEBUGI_VERSION_W_RXSTATS) {
	    *supportedValues |= RX_SERVER_DEBUG_RX_STATS;
	}
	if (stat->version >= RX_DEBUGI_VERSION_W_WAITERS) {
	    *supportedValues |= RX_SERVER_DEBUG_WAITER_CNT;
	}
	if (stat->version >= RX_DEBUGI_VERSION_W_IDLETHREADS) {
	    *supportedValues |= RX_SERVER_DEBUG_IDLE_THREADS;
	}
	if (stat->version >= RX_DEBUGI_VERSION_W_NEWPACKETTYPES) {
	    *supportedValues |= RX_SERVER_DEBUG_NEW_PACKETS;
	}
	if (stat->version >= RX_DEBUGI_VERSION_W_GETPEER) {
	    *supportedValues |= RX_SERVER_DEBUG_ALL_PEER;
	}

	stat->nFreePackets = ntohl(stat->nFreePackets);
	stat->packetReclaims = ntohl(stat->packetReclaims);
	stat->callsExecuted = ntohl(stat->callsExecuted);
	stat->nWaiting = ntohl(stat->nWaiting);
	stat->idleThreads = ntohl(stat->idleThreads);
    }

    return rc;
}
afs_int32 rx_GetServerStats(
  int socket,
  afs_uint32 remoteAddr,
  afs_uint16 remotePort,
  struct rx_stats *stat,
  afs_uint32 *supportedValues
)
{
    struct rx_debugIn in;
    afs_int32 *lp = (afs_int32 *) stat;
    int i;
    afs_int32 rc = 0;

    /*
     * supportedValues is currently unused, but added to allow future
     * versioning of this function.
     */
    *supportedValues = 0;
    in.type = htonl(RX_DEBUGI_RXSTATS);
    in.index = 0;
    memset(stat, 0, sizeof(*stat));

    rc = MakeDebugCall(socket, remoteAddr, remotePort, RX_PACKET_TYPE_DEBUG,
		       &in, sizeof(in), stat, sizeof(*stat));
    if (rc >= 0) {
	/*
	 * Do net to host conversion here
	 */
	for (i = 0; i < sizeof(*stat)/sizeof(afs_int32); i++, lp++) {
	    *lp = ntohl(*lp);
	}
    }
    return rc;
}
afs_int32 rx_GetServerVersion(
  int socket,
  afs_uint32 remoteAddr,
  afs_uint16 remotePort,
  size_t version_length,
  char *version
)
{
    char a[1] = {0};
    return MakeDebugCall(socket, remoteAddr, remotePort, RX_PACKET_TYPE_VERSION,
			 a, 1, version, version_length);
}
afs_int32 rx_GetServerConnections(
  int socket,
  afs_uint32 remoteAddr,
  afs_uint16 remotePort,
  afs_int32 *nextConnection,
  int allConnections,
  afs_uint32 debugSupportedValues,
  struct rx_debugConn *conn,
  afs_uint32 *supportedValues
)
{
    struct rx_debugIn in;
    afs_int32 rc = 0;
    int i;

    /*
     * supportedValues is currently unused, but added to allow future
     * versioning of this function.
     */
    *supportedValues = 0;
    if (allConnections) {
	in.type = htonl(RX_DEBUGI_GETALLCONN);
    } else {
	in.type = htonl(RX_DEBUGI_GETCONN);
    }
    in.index = htonl(*nextConnection);
    memset(conn, 0, sizeof(*conn));

    rc = MakeDebugCall(socket, remoteAddr, remotePort, RX_PACKET_TYPE_DEBUG,
		       &in, sizeof(in), conn, sizeof(*conn));
    if (rc >= 0) {
	*nextConnection += 1;

	/*
	 * Convert old connection format to new structure.
	 */
	if (debugSupportedValues & RX_SERVER_DEBUG_OLD_CONN) {
	    struct rx_debugConn_vL *vL = (struct rx_debugConn_vL *)conn;
#define MOVEvL(a) (conn->a = vL->a)

	    /* any old or unrecognized version... */
	    for (i=0; i<RX_MAXCALLS; i++) {
		MOVEvL(callState[i]);
		MOVEvL(callMode[i]);
		MOVEvL(callFlags[i]);
		MOVEvL(callOther[i]);
	    }
	    if (debugSupportedValues & RX_SERVER_DEBUG_SEC_STATS) {
		MOVEvL(secStats.type);
		MOVEvL(secStats.level);
		MOVEvL(secStats.flags);
		MOVEvL(secStats.expires);
		MOVEvL(secStats.packetsReceived);
		MOVEvL(secStats.packetsSent);
		MOVEvL(secStats.bytesReceived);
		MOVEvL(secStats.bytesSent);
	    }
	}

	/*
	 * Do net to host conversion here.
	 * Note: host and port are not converted, since we are most likely
	 * going to want these in NBO.
	 */
	conn->cid = ntohl(conn->cid);
	conn->serial = ntohl(conn->serial);
	for (i=0; i<RX_MAXCALLS; i++) {
	    conn->callNumber[i] = ntohl(conn->callNumber[i]);
	}
	conn->error = ntohl(conn->error);
	conn->secStats.flags = ntohl(conn->secStats.flags);
	conn->secStats.expires = ntohl(conn->secStats.expires);
	conn->secStats.packetsReceived = ntohl(conn->secStats.packetsReceived);
	conn->secStats.packetsSent = ntohl(conn->secStats.packetsSent);
	conn->secStats.bytesReceived = ntohl(conn->secStats.bytesReceived);
	conn->secStats.bytesSent = ntohl(conn->secStats.bytesSent);
	conn->epoch = ntohl(conn->epoch);
	conn->natMTU = ntohl(conn->natMTU);
    }

    return rc;
}
afs_int32 rx_GetServerPeers(
  int socket,
  afs_uint32 remoteAddr,
  afs_uint16 remotePort,
  afs_int32 *nextPeer,
  afs_uint32 debugSupportedValues,
  struct rx_debugPeer *peer,
  afs_uint32 *supportedValues
)
{
    struct rx_debugIn in;
    afs_int32 rc = 0;

    /*
     * supportedValues is currently unused, but added to allow future
     * versioning of this function.
     */
    *supportedValues = 0;
    in.type = htonl(RX_DEBUGI_GETPEER);
    in.index = htonl(*nextPeer);
    memset(peer, 0, sizeof(*peer));

    rc = MakeDebugCall(socket, remoteAddr, remotePort, RX_PACKET_TYPE_DEBUG,
		       &in, sizeof(in), peer, sizeof(*peer));
    if (rc >= 0) {
	*nextPeer += 1;

	/*
	 * Do net to host conversion here.
	 * Note: host and port are not converted, since we are most likely
	 * going to want these in NBO.
	 */
	peer->ifMTU = ntohs(peer->ifMTU);
	peer->idleWhen = ntohl(peer->idleWhen);
	peer->refCount = ntohs(peer->refCount);
	peer->burstWait.sec = ntohl(peer->burstWait.sec);
	peer->burstWait.usec = ntohl(peer->burstWait.usec);
	peer->rtt = ntohl(peer->rtt);
	peer->rtt_dev = ntohl(peer->rtt_dev);
	peer->timeout.sec = ntohl(peer->timeout.sec);
	peer->timeout.usec = ntohl(peer->timeout.usec);
	peer->nSent = ntohl(peer->nSent);
	peer->reSends = ntohl(peer->reSends);
	peer->inPacketSkew = ntohl(peer->inPacketSkew);
	peer->outPacketSkew = ntohl(peer->outPacketSkew);
	peer->rateFlag = ntohl(peer->rateFlag);
	peer->natMTU = ntohs(peer->natMTU);
	peer->maxMTU = ntohs(peer->maxMTU);
	peer->maxDgramPackets = ntohs(peer->maxDgramPackets);
	peer->ifDgramPackets = ntohs(peer->ifDgramPackets);
	peer->MTU = ntohs(peer->MTU);
	peer->cwind = ntohs(peer->cwind);
	peer->nDgramPackets = ntohs(peer->nDgramPackets);
	peer->congestSeq = ntohs(peer->congestSeq);
	peer->bytesSent.high = ntohl(peer->bytesSent.high);
	peer->bytesSent.low = ntohl(peer->bytesSent.low);
	peer->bytesReceived.high = ntohl(peer->bytesReceived.high);
	peer->bytesReceived.low = ntohl(peer->bytesReceived.low);
    }

    return rc;
}

#endif /* RXDEBUG */
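/*
 * A minimal, illustrative sketch of how a client such as rxdebug might
 * drive the query calls above.  The plain BSD socket setup shown here is an
 * assumption of the example, not part of the rx debug interface itself;
 * example_dump_version is a hypothetical name.
 */
#if 0 /* example only */
static void example_dump_version(afs_uint32 host_nbo, afs_uint16 port_nbo)
{
    char version[64];
    int s = socket(AF_INET, SOCK_DGRAM, 0);	/* assumed transport setup */
    if (s < 0) return;
    /* MakeDebugCall retries for up to 20 seconds before giving up */
    if (rx_GetServerVersion(s, host_nbo, port_nbo, sizeof(version), version) >= 0)
	printf("server version: %s\n", version);
    close(s);
}
#endif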
void shutdown_rx(void)
{
    struct rx_serverQueueEntry *np;
    register int i, j;
    size_t space;

    if (rxinit_status == 1) {
	return;		/* Already shutdown. */
    }

    {
	struct rx_peer **peer_ptr, **peer_end;
	for (peer_ptr = &rx_peerHashTable[0],
	     peer_end = &rx_peerHashTable[rx_hashTableSize];
	     peer_ptr < peer_end; peer_ptr++) {
	    struct rx_peer *peer, *next;
	    for (peer = *peer_ptr; peer; peer = next) {
		rx_interface_stat_p rpc_stat, nrpc_stat;
		next = peer->next;
		for (queue_Scan(&peer->rpcStats, rpc_stat, nrpc_stat,
				rx_interface_stat)) {
		    unsigned int num_funcs;
		    if (!rpc_stat) break;
		    queue_Remove(&rpc_stat->queue_header);
		    queue_Remove(&rpc_stat->all_peers);
		    num_funcs = rpc_stat->stats[0].func_total;
		    space = sizeof(rx_interface_stat_t) +
			    rpc_stat->stats[0].func_total *
			    sizeof(rx_function_entry_v1_t);

		    rxi_Free(rpc_stat, space);
		    MUTEX_ENTER(&rx_rpc_stats);
		    rxi_rpc_peer_stat_cnt -= num_funcs;
		    MUTEX_EXIT(&rx_rpc_stats);
		}
		rxi_FreePeer(peer);
		MUTEX_ENTER(&rx_stats_mutex);
		rx_stats.nPeerStructs--;
		MUTEX_EXIT(&rx_stats_mutex);
	    }
	}
    }

    for (i = 0; i < RX_MAX_SERVICES; i++) {
	if (rx_services[i])
	    rxi_Free(rx_services[i], sizeof(*rx_services[i]));
    }
    for (i = 0; i < rx_hashTableSize; i++) {
	register struct rx_connection *tc, *ntc;
	MUTEX_ENTER(&rx_connHashTable_lock);
	for (tc = rx_connHashTable[i]; tc; tc = ntc) {
	    ntc = tc->next;
	    for (j = 0; j < RX_MAXCALLS; j++) {
		if (tc->call[j])
		    rxi_Free(tc->call[j], sizeof(*tc->call[j]));
	    }
	    rxi_Free(tc, sizeof(*tc));
	}
	MUTEX_EXIT(&rx_connHashTable_lock);
    }

    MUTEX_ENTER(&freeSQEList_lock);
    while ((np = rx_FreeSQEList)) {
	rx_FreeSQEList = *(struct rx_serverQueueEntry **)np;
	MUTEX_DESTROY(&np->lock);
	rxi_Free(np, sizeof(*np));
    }
    MUTEX_EXIT(&freeSQEList_lock);
    MUTEX_DESTROY(&freeSQEList_lock);
    MUTEX_DESTROY(&rx_freeCallQueue_lock);
    MUTEX_DESTROY(&rx_connHashTable_lock);
    MUTEX_DESTROY(&rx_peerHashTable_lock);
    MUTEX_DESTROY(&rx_serverPool_lock);

#ifndef KERNEL
    osi_Free(rx_connHashTable, rx_hashTableSize*sizeof(struct rx_connection *));
    osi_Free(rx_peerHashTable, rx_hashTableSize*sizeof(struct rx_peer *));
#else
    UNPIN(rx_connHashTable, rx_hashTableSize*sizeof(struct rx_connection *));
    UNPIN(rx_peerHashTable, rx_hashTableSize*sizeof(struct rx_peer *));
#endif

    rxi_FreeAllPackets();

    MUTEX_ENTER(&rx_stats_mutex);
    rxi_dataQuota = RX_MAX_QUOTA;
    rxi_availProcs = rxi_totalMin = rxi_minDeficit = 0;
    MUTEX_EXIT(&rx_stats_mutex);

    rxinit_status = 1;
}
#ifdef RX_ENABLE_LOCKS
void osirx_AssertMine(afs_kmutex_t *lockaddr, char *msg)
{
    if (!MUTEX_ISMINE(lockaddr))
	osi_Panic("Lock not held: %s", msg);
}
#endif /* RX_ENABLE_LOCKS */
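/*
 * Illustrative use of the assertion above: a routine that requires its
 * caller to hold a lock can document and enforce that precondition.  The
 * helper name below is hypothetical.
 */
#if 0 /* example only */
static void example_locked_helper(struct rx_call *call)
{
    osirx_AssertMine(&call->lock, "example_locked_helper");
    /* ... manipulate fields protected by call->lock ... */
}
#endif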
#ifndef KERNEL

/*
 * Routines to implement connection specific data.
 */

int rx_KeyCreate(rx_destructor_t rtn)
{
    int key;
    MUTEX_ENTER(&rxi_keyCreate_lock);
    key = rxi_keyCreate_counter++;
    rxi_keyCreate_destructor = (rx_destructor_t *)
	realloc((void *)rxi_keyCreate_destructor,
		(key+1) * sizeof(rx_destructor_t));
    rxi_keyCreate_destructor[key] = rtn;
    MUTEX_EXIT(&rxi_keyCreate_lock);
    return key;
}
void rx_SetSpecific(struct rx_connection *conn, int key, void *ptr)
{
    int i;
    MUTEX_ENTER(&conn->conn_data_lock);
    if (!conn->specific) {
	conn->specific = (void **)malloc((key+1)*sizeof(void *));
	for (i = 0; i < key; i++)
	    conn->specific[i] = NULL;
	conn->nSpecific = key+1;
	conn->specific[key] = ptr;
    } else if (key >= conn->nSpecific) {
	conn->specific = (void **)
	    realloc(conn->specific, (key+1)*sizeof(void *));
	for (i = conn->nSpecific; i < key; i++)
	    conn->specific[i] = NULL;
	conn->nSpecific = key+1;
	conn->specific[key] = ptr;
    } else {
	if (conn->specific[key] && rxi_keyCreate_destructor[key])
	    (*rxi_keyCreate_destructor[key])(conn->specific[key]);
	conn->specific[key] = ptr;
    }
    MUTEX_EXIT(&conn->conn_data_lock);
}
void *rx_GetSpecific(struct rx_connection *conn, int key)
{
    void *ptr;
    MUTEX_ENTER(&conn->conn_data_lock);
    if (key >= conn->nSpecific)
	ptr = NULL;
    else
	ptr = conn->specific[key];
    MUTEX_EXIT(&conn->conn_data_lock);
    return ptr;
}

#endif /* !KERNEL */
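/*
 * Sketch of the connection-specific data interface above.  The value type,
 * destructor, and function names here are hypothetical; note that the
 * destructor registered with rx_KeyCreate is invoked when a previously
 * stored value is replaced via rx_SetSpecific.
 */
#if 0 /* example only */
struct mydata { int refcount; };

static void mydata_destructor(void *ptr)
{
    free(ptr);			/* called when the value is replaced */
}

static int example_key = -1;

static void example_attach(struct rx_connection *conn)
{
    struct mydata *md = (struct mydata *) malloc(sizeof(*md));
    if (example_key < 0)
	example_key = rx_KeyCreate(mydata_destructor);
    md->refcount = 1;
    rx_SetSpecific(conn, example_key, md);
    /* later: struct mydata *p = rx_GetSpecific(conn, example_key); */
}
#endif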
/*
 * processStats is a queue used to store the statistics for the local
 * process.  Its contents are similar to the contents of the rpcStats
 * queue on a rx_peer structure, but the actual data stored within
 * this queue contains totals across the lifetime of the process (assuming
 * the stats have not been reset) - unlike the per peer structures
 * which can come and go based upon the peer lifetime.
 */

static struct rx_queue processStats = {&processStats,&processStats};

/*
 * peerStats is a queue used to store the statistics for all peer structs.
 * Its contents are the union of all the peer rpcStats queues.
 */

static struct rx_queue peerStats = {&peerStats,&peerStats};

/*
 * rxi_monitor_processStats is used to turn process wide stat collection
 * on and off
 */

static int rxi_monitor_processStats = 0;

/*
 * rxi_monitor_peerStats is used to turn per peer stat collection on and off
 */

static int rxi_monitor_peerStats = 0;
/*
 * rxi_AddRpcStat - given all of the information for a particular rpc
 * call, create (if needed) and update the stat totals for the rpc.
 *
 * PARAMETERS
 * IN stats - the queue of stats that will be updated with the new value
 * IN rxInterface - a unique number that identifies the rpc interface
 * IN currentFunc - the index of the function being invoked
 * IN totalFunc - the total number of functions in this interface
 * IN queueTime - the amount of time this function waited for a thread
 * IN execTime - the amount of time this function invocation took to execute
 * IN bytesSent - the number of bytes sent by this invocation
 * IN bytesRcvd - the number of bytes received by this invocation
 * IN isServer - if true, this invocation was made to a server
 * IN remoteHost - the ip address of the remote host
 * IN remotePort - the port of the remote host
 * IN addToPeerList - if != 0, add newly created stat to the global peer list
 * INOUT counter - if a new stats structure is allocated, the counter will
 * be updated with the new number of allocated stat structures
 *
 * RETURN CODES
 * Returns 0 on success; non-zero if a new stats structure could not be
 * allocated.
 */
static int rxi_AddRpcStat(
  struct rx_queue *stats,
  afs_uint32 rxInterface,
  afs_uint32 currentFunc,
  afs_uint32 totalFunc,
  struct clock *queueTime,
  struct clock *execTime,
  afs_hyper_t *bytesSent,
  afs_hyper_t *bytesRcvd,
  int isServer,
  afs_uint32 remoteHost,
  afs_uint32 remotePort,
  int addToPeerList,
  unsigned int *counter)
{
    int rc = 0;
    rx_interface_stat_p rpc_stat, nrpc_stat;

    /*
     * See if there's already a structure for this interface
     */
    for (queue_Scan(stats, rpc_stat, nrpc_stat, rx_interface_stat)) {
	if ((rpc_stat->stats[0].interfaceId == rxInterface) &&
	    (rpc_stat->stats[0].remote_is_server == isServer)) break;
    }

    /*
     * Didn't find a match so allocate a new structure and add it to the
     * queue.
     */
    if ((rpc_stat == NULL) ||
	(rpc_stat->stats[0].interfaceId != rxInterface) ||
	(rpc_stat->stats[0].remote_is_server != isServer)) {
	int i;
	size_t space;

	space = sizeof(rx_interface_stat_t) + totalFunc *
		sizeof(rx_function_entry_v1_t);

	rpc_stat = (rx_interface_stat_p) rxi_Alloc(space);
	if (rpc_stat == NULL) {
	    rc = 1;
	    goto fail;
	}
	*counter += totalFunc;
	for (i=0; i<totalFunc; i++) {
	    rpc_stat->stats[i].remote_peer = remoteHost;
	    rpc_stat->stats[i].remote_port = remotePort;
	    rpc_stat->stats[i].remote_is_server = isServer;
	    rpc_stat->stats[i].interfaceId = rxInterface;
	    rpc_stat->stats[i].func_total = totalFunc;
	    rpc_stat->stats[i].func_index = i;
	    hzero(rpc_stat->stats[i].invocations);
	    hzero(rpc_stat->stats[i].bytes_sent);
	    hzero(rpc_stat->stats[i].bytes_rcvd);
	    rpc_stat->stats[i].queue_time_sum.sec = 0;
	    rpc_stat->stats[i].queue_time_sum.usec = 0;
	    rpc_stat->stats[i].queue_time_sum_sqr.sec = 0;
	    rpc_stat->stats[i].queue_time_sum_sqr.usec = 0;
	    rpc_stat->stats[i].queue_time_min.sec = 9999999;
	    rpc_stat->stats[i].queue_time_min.usec = 9999999;
	    rpc_stat->stats[i].queue_time_max.sec = 0;
	    rpc_stat->stats[i].queue_time_max.usec = 0;
	    rpc_stat->stats[i].execution_time_sum.sec = 0;
	    rpc_stat->stats[i].execution_time_sum.usec = 0;
	    rpc_stat->stats[i].execution_time_sum_sqr.sec = 0;
	    rpc_stat->stats[i].execution_time_sum_sqr.usec = 0;
	    rpc_stat->stats[i].execution_time_min.sec = 9999999;
	    rpc_stat->stats[i].execution_time_min.usec = 9999999;
	    rpc_stat->stats[i].execution_time_max.sec = 0;
	    rpc_stat->stats[i].execution_time_max.usec = 0;
	}
	queue_Prepend(stats, rpc_stat);
	if (addToPeerList) {
	    queue_Prepend(&peerStats, &rpc_stat->all_peers);
	}
    }

    /*
     * Increment the stats for this function
     */
    hadd32(rpc_stat->stats[currentFunc].invocations, 1);
    hadd(rpc_stat->stats[currentFunc].bytes_sent, *bytesSent);
    hadd(rpc_stat->stats[currentFunc].bytes_rcvd, *bytesRcvd);
    clock_Add(&rpc_stat->stats[currentFunc].queue_time_sum, queueTime);
    clock_AddSq(&rpc_stat->stats[currentFunc].queue_time_sum_sqr, queueTime);
    if (clock_Lt(queueTime, &rpc_stat->stats[currentFunc].queue_time_min)) {
	rpc_stat->stats[currentFunc].queue_time_min = *queueTime;
    }
    if (clock_Gt(queueTime, &rpc_stat->stats[currentFunc].queue_time_max)) {
	rpc_stat->stats[currentFunc].queue_time_max = *queueTime;
    }
    clock_Add(&rpc_stat->stats[currentFunc].execution_time_sum, execTime);
    clock_AddSq(&rpc_stat->stats[currentFunc].execution_time_sum_sqr, execTime);
    if (clock_Lt(execTime, &rpc_stat->stats[currentFunc].execution_time_min)) {
	rpc_stat->stats[currentFunc].execution_time_min = *execTime;
    }
    if (clock_Gt(execTime, &rpc_stat->stats[currentFunc].execution_time_max)) {
	rpc_stat->stats[currentFunc].execution_time_max = *execTime;
    }

fail:
    return rc;
}
/*
 * rx_IncrementTimeAndCount - increment the times and count for a particular
 * rpc function.
 *
 * PARAMETERS
 * IN peer - the peer who invoked the rpc
 * IN rxInterface - a unique number that identifies the rpc interface
 * IN currentFunc - the index of the function being invoked
 * IN totalFunc - the total number of functions in this interface
 * IN queueTime - the amount of time this function waited for a thread
 * IN execTime - the amount of time this function invocation took to execute
 * IN bytesSent - the number of bytes sent by this invocation
 * IN bytesRcvd - the number of bytes received by this invocation
 * IN isServer - if true, this invocation was made to a server
 */
void rx_IncrementTimeAndCount(
  struct rx_peer *peer,
  afs_uint32 rxInterface,
  afs_uint32 currentFunc,
  afs_uint32 totalFunc,
  struct clock *queueTime,
  struct clock *execTime,
  afs_hyper_t *bytesSent,
  afs_hyper_t *bytesRcvd,
  int isServer)
{
    MUTEX_ENTER(&rx_rpc_stats);
    MUTEX_ENTER(&peer->peer_lock);

    if (rxi_monitor_peerStats) {
	rxi_AddRpcStat(&peer->rpcStats, rxInterface, currentFunc, totalFunc,
		       queueTime, execTime, bytesSent, bytesRcvd, isServer,
		       peer->host, peer->port, 1 /* addToPeerList */,
		       &rxi_rpc_peer_stat_cnt);
    }

    if (rxi_monitor_processStats) {
	rxi_AddRpcStat(&processStats, rxInterface, currentFunc, totalFunc,
		       queueTime, execTime, bytesSent, bytesRcvd, isServer,
		       0xffffffff, 0xffffffff, 0 /* addToPeerList */,
		       &rxi_rpc_process_stat_cnt);
    }

    MUTEX_EXIT(&peer->peer_lock);
    MUTEX_EXIT(&rx_rpc_stats);
}
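/*
 * Illustrative caller: an rxgen-generated server stub might time one
 * invocation and record it as below.  The interface id and function
 * indices here are made up for the example.
 */
#if 0 /* example only */
static void example_record(struct rx_call *call,
			   struct clock *queueTime, struct clock *execTime,
			   afs_hyper_t *bytesSent, afs_hyper_t *bytesRcvd)
{
    rx_IncrementTimeAndCount(call->conn->peer,
			     1000001 /* hypothetical interface id */,
			     0 /* currentFunc */, 10 /* totalFunc */,
			     queueTime, execTime, bytesSent, bytesRcvd,
			     1 /* isServer */);
}
#endif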
/*
 * rx_MarshallProcessRPCStats - marshall an array of rpc statistics
 *
 * PARAMETERS
 * IN callerVersion - the rpc stat version of the caller.
 * IN count - the number of entries to marshall.
 * IN stats - pointer to stats to be marshalled.
 * OUT ptr - where to store the marshalled data.
 */
void rx_MarshallProcessRPCStats(
  afs_uint32 callerVersion,
  int count,
  rx_function_entry_v1_t *stats,
  afs_uint32 **ptrP)
{
    int i;
    afs_uint32 *ptr;

    /*
     * We only support the first version
     */
    for (ptr = *ptrP, i = 0; i < count; i++, stats++) {
	*(ptr++) = stats->remote_peer;
	*(ptr++) = stats->remote_port;
	*(ptr++) = stats->remote_is_server;
	*(ptr++) = stats->interfaceId;
	*(ptr++) = stats->func_total;
	*(ptr++) = stats->func_index;
	*(ptr++) = hgethi(stats->invocations);
	*(ptr++) = hgetlo(stats->invocations);
	*(ptr++) = hgethi(stats->bytes_sent);
	*(ptr++) = hgetlo(stats->bytes_sent);
	*(ptr++) = hgethi(stats->bytes_rcvd);
	*(ptr++) = hgetlo(stats->bytes_rcvd);
	*(ptr++) = stats->queue_time_sum.sec;
	*(ptr++) = stats->queue_time_sum.usec;
	*(ptr++) = stats->queue_time_sum_sqr.sec;
	*(ptr++) = stats->queue_time_sum_sqr.usec;
	*(ptr++) = stats->queue_time_min.sec;
	*(ptr++) = stats->queue_time_min.usec;
	*(ptr++) = stats->queue_time_max.sec;
	*(ptr++) = stats->queue_time_max.usec;
	*(ptr++) = stats->execution_time_sum.sec;
	*(ptr++) = stats->execution_time_sum.usec;
	*(ptr++) = stats->execution_time_sum_sqr.sec;
	*(ptr++) = stats->execution_time_sum_sqr.usec;
	*(ptr++) = stats->execution_time_min.sec;
	*(ptr++) = stats->execution_time_min.usec;
	*(ptr++) = stats->execution_time_max.sec;
	*(ptr++) = stats->execution_time_max.usec;
    }
    *ptrP = ptr;
}
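/*
 * The inverse operation is not part of this file, but a receiver would walk
 * the same 28-word-per-entry layout in order: six scalar words, then three
 * afs_hyper_t counters as high/low pairs, then eight struct clock values as
 * sec/usec pairs.  A hedged sketch (hypothetical helper name):
 */
#if 0 /* example only */
static void example_unmarshall_v1(afs_uint32 **ptrP, rx_function_entry_v1_t *st)
{
    afs_uint32 *ptr = *ptrP;
    st->remote_peer      = *(ptr++);
    st->remote_port      = *(ptr++);
    st->remote_is_server = *(ptr++);
    st->interfaceId      = *(ptr++);
    st->func_total       = *(ptr++);
    st->func_index       = *(ptr++);
    /* the three afs_hyper_t counters and eight struct clock values follow;
     * consume the remaining 22 words of the 28-word entry */
    *ptrP = ptr + 22;
}
#endif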
/*
 * rx_RetrieveProcessRPCStats - retrieve all of the rpc statistics for
 * this process
 *
 * PARAMETERS
 * IN callerVersion - the rpc stat version of the caller
 * OUT myVersion - the rpc stat version of this function
 * OUT clock_sec - local time seconds
 * OUT clock_usec - local time microseconds
 * OUT allocSize - the number of bytes allocated to contain stats
 * OUT statCount - the number of stats retrieved from this process.
 * OUT stats - the actual stats retrieved from this process.
 *
 * RETURN CODES
 * Returns 0 on success.  If successful, *stats will be non-NULL.
 */
int rx_RetrieveProcessRPCStats(
  afs_uint32 callerVersion,
  afs_uint32 *myVersion,
  afs_uint32 *clock_sec,
  afs_uint32 *clock_usec,
  size_t *allocSize,
  afs_uint32 *statCount,
  afs_uint32 **stats)
{
    size_t space = 0;
    afs_uint32 *ptr;
    struct clock now;
    int rc = 0;

    *stats = 0;
    *allocSize = 0;
    *statCount = 0;
    *myVersion = RX_STATS_RETRIEVAL_VERSION;

    /*
     * Check to see if stats are enabled
     */
    MUTEX_ENTER(&rx_rpc_stats);
    if (!rxi_monitor_processStats) {
	MUTEX_EXIT(&rx_rpc_stats);
	return rc;
    }

    clock_GetTime(&now);
    *clock_sec = now.sec;
    *clock_usec = now.usec;

    /*
     * Allocate the space based upon the caller version
     *
     * If the client is at an older version than we are,
     * we return the statistic data in the older data format, but
     * we still return our version number so the client knows we
     * are maintaining more data than it can retrieve.
     */
    if (callerVersion >= RX_STATS_RETRIEVAL_FIRST_EDITION) {
	space = rxi_rpc_process_stat_cnt * sizeof(rx_function_entry_v1_t);
	*statCount = rxi_rpc_process_stat_cnt;
    } else {
	/*
	 * This can't happen yet, but in the future version changes
	 * can be handled by adding additional code here
	 */
    }

    if (space > (size_t) 0) {
	*allocSize = space;
	ptr = *stats = (afs_uint32 *) rxi_Alloc(space);

	if (ptr != NULL) {
	    rx_interface_stat_p rpc_stat, nrpc_stat;

	    for (queue_Scan(&processStats, rpc_stat, nrpc_stat,
			    rx_interface_stat)) {
		/*
		 * Copy the data based upon the caller version
		 */
		rx_MarshallProcessRPCStats(callerVersion,
					   rpc_stat->stats[0].func_total,
					   rpc_stat->stats, &ptr);
	    }
	}
    }
    MUTEX_EXIT(&rx_rpc_stats);
    return rc;
}
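/*
 * Typical usage of the retrieval interface above (illustrative; error
 * handling trimmed, helper name hypothetical):
 */
#if 0 /* example only */
static void example_dump_process_stats(void)
{
    afs_uint32 myVersion, sec, usec, statCount;
    size_t allocSize;
    afs_uint32 *stats;

    rx_RetrieveProcessRPCStats(RX_STATS_RETRIEVAL_VERSION, &myVersion,
			       &sec, &usec, &allocSize, &statCount, &stats);
    if (stats != NULL) {
	/* ... walk statCount marshalled entries ... */
	rx_FreeRPCStats(stats, allocSize);
    }
}
#endif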
/*
 * rx_RetrievePeerRPCStats - retrieve all of the rpc statistics for the peers
 * connected to this process
 *
 * PARAMETERS
 * IN callerVersion - the rpc stat version of the caller
 * OUT myVersion - the rpc stat version of this function
 * OUT clock_sec - local time seconds
 * OUT clock_usec - local time microseconds
 * OUT allocSize - the number of bytes allocated to contain stats
 * OUT statCount - the number of stats retrieved from the individual
 * peer structures.
 * OUT stats - the actual stats retrieved from the individual peer structures.
 *
 * RETURN CODES
 * Returns 0 on success.  If successful, *stats will be non-NULL.
 */
int rx_RetrievePeerRPCStats(
  afs_uint32 callerVersion,
  afs_uint32 *myVersion,
  afs_uint32 *clock_sec,
  afs_uint32 *clock_usec,
  size_t *allocSize,
  afs_uint32 *statCount,
  afs_uint32 **stats)
{
    size_t space = 0;
    afs_uint32 *ptr;
    struct clock now;
    int rc = 0;

    *stats = 0;
    *allocSize = 0;
    *statCount = 0;
    *myVersion = RX_STATS_RETRIEVAL_VERSION;

    /*
     * Check to see if stats are enabled
     */
    MUTEX_ENTER(&rx_rpc_stats);
    if (!rxi_monitor_peerStats) {
	MUTEX_EXIT(&rx_rpc_stats);
	return rc;
    }

    clock_GetTime(&now);
    *clock_sec = now.sec;
    *clock_usec = now.usec;

    /*
     * Allocate the space based upon the caller version
     *
     * If the client is at an older version than we are,
     * we return the statistic data in the older data format, but
     * we still return our version number so the client knows we
     * are maintaining more data than it can retrieve.
     */
    if (callerVersion >= RX_STATS_RETRIEVAL_FIRST_EDITION) {
	space = rxi_rpc_peer_stat_cnt * sizeof(rx_function_entry_v1_t);
	*statCount = rxi_rpc_peer_stat_cnt;
    } else {
	/*
	 * This can't happen yet, but in the future version changes
	 * can be handled by adding additional code here
	 */
    }

    if (space > (size_t) 0) {
	*allocSize = space;
	ptr = *stats = (afs_uint32 *) rxi_Alloc(space);

	if (ptr != NULL) {
	    rx_interface_stat_p rpc_stat, nrpc_stat;
	    char *fix_offset;

	    for (queue_Scan(&peerStats, rpc_stat, nrpc_stat,
			    rx_interface_stat)) {
		/*
		 * We have to fix the offset of rpc_stat since we are
		 * keeping this structure on two rx_queues.  The rx_queue
		 * package assumes that the rx_queue member is the first
		 * member of the structure.  That is, rx_queue assumes that
		 * any one item is only on one queue at a time.  We are
		 * breaking that assumption and so we have to do a little
		 * math to fix our pointers.
		 */
		fix_offset = (char *) rpc_stat;
		fix_offset -= offsetof(rx_interface_stat_t, all_peers);
		rpc_stat = (rx_interface_stat_p) fix_offset;

		/*
		 * Copy the data based upon the caller version
		 */
		rx_MarshallProcessRPCStats(callerVersion,
					   rpc_stat->stats[0].func_total,
					   rpc_stat->stats, &ptr);
	    }
	}
    }
    MUTEX_EXIT(&rx_rpc_stats);
    return rc;
}
/*
 * rx_FreeRPCStats - free memory allocated by
 * rx_RetrieveProcessRPCStats and rx_RetrievePeerRPCStats
 *
 * PARAMETERS
 * IN stats - stats previously returned by rx_RetrieveProcessRPCStats or
 * rx_RetrievePeerRPCStats
 * IN allocSize - the number of bytes in stats.
 */
void rx_FreeRPCStats(
  afs_uint32 *stats,
  size_t allocSize)
{
    rxi_Free(stats, allocSize);
}
/*
 * rx_queryProcessRPCStats - see if process rpc stat collection is
 * currently enabled.  Returns 0 if stats are not enabled, != 0 otherwise.
 */
int rx_queryProcessRPCStats()
{
    int rc;
    MUTEX_ENTER(&rx_rpc_stats);
    rc = rxi_monitor_processStats;
    MUTEX_EXIT(&rx_rpc_stats);
    return rc;
}

/*
 * rx_queryPeerRPCStats - see if peer stat collection is currently enabled.
 * Returns 0 if stats are not enabled, != 0 otherwise.
 */
int rx_queryPeerRPCStats()
{
    int rc;
    MUTEX_ENTER(&rx_rpc_stats);
    rc = rxi_monitor_peerStats;
    MUTEX_EXIT(&rx_rpc_stats);
    return rc;
}
/*
 * rx_enableProcessRPCStats - begin rpc stat collection for entire process
 */
void rx_enableProcessRPCStats()
{
    MUTEX_ENTER(&rx_rpc_stats);
    rx_enable_stats = 1;
    rxi_monitor_processStats = 1;
    MUTEX_EXIT(&rx_rpc_stats);
}

/*
 * rx_enablePeerRPCStats - begin rpc stat collection per peer structure
 */
void rx_enablePeerRPCStats()
{
    MUTEX_ENTER(&rx_rpc_stats);
    rx_enable_stats = 1;
    rxi_monitor_peerStats = 1;
    MUTEX_EXIT(&rx_rpc_stats);
}
/*
 * rx_disableProcessRPCStats - stop rpc stat collection for entire process
 */
void rx_disableProcessRPCStats()
{
    rx_interface_stat_p rpc_stat, nrpc_stat;
    size_t space;

    MUTEX_ENTER(&rx_rpc_stats);

    /*
     * Turn off process statistics and if peer stats is also off, turn
     * off everything
     */
    rxi_monitor_processStats = 0;
    if (rxi_monitor_peerStats == 0) {
	rx_enable_stats = 0;
    }

    for (queue_Scan(&processStats, rpc_stat, nrpc_stat, rx_interface_stat)) {
	unsigned int num_funcs = 0;
	if (!rpc_stat) break;
	queue_Remove(rpc_stat);
	num_funcs = rpc_stat->stats[0].func_total;
	space = sizeof(rx_interface_stat_t) +
		rpc_stat->stats[0].func_total *
		sizeof(rx_function_entry_v1_t);

	rxi_Free(rpc_stat, space);
	rxi_rpc_process_stat_cnt -= num_funcs;
    }
    MUTEX_EXIT(&rx_rpc_stats);
}
/*
 * rx_disablePeerRPCStats - stop rpc stat collection for peers
 */
void rx_disablePeerRPCStats()
{
    struct rx_peer **peer_ptr, **peer_end;
    size_t space;
    int code;

    MUTEX_ENTER(&rx_rpc_stats);

    /*
     * Turn off peer statistics and if process stats is also off, turn
     * off everything
     */
    rxi_monitor_peerStats = 0;
    if (rxi_monitor_processStats == 0) {
	rx_enable_stats = 0;
    }

    MUTEX_ENTER(&rx_peerHashTable_lock);
    for (peer_ptr = &rx_peerHashTable[0],
	 peer_end = &rx_peerHashTable[rx_hashTableSize];
	 peer_ptr < peer_end; peer_ptr++) {
	struct rx_peer *peer, *next, *prev;
	for (prev = peer = *peer_ptr; peer; peer = next) {
	    next = peer->next;
	    code = MUTEX_TRYENTER(&peer->peer_lock);
	    if (code) {
		rx_interface_stat_p rpc_stat, nrpc_stat;

		for (queue_Scan(&peer->rpcStats, rpc_stat, nrpc_stat,
				rx_interface_stat)) {
		    unsigned int num_funcs = 0;
		    if (!rpc_stat) break;
		    queue_Remove(&rpc_stat->queue_header);
		    queue_Remove(&rpc_stat->all_peers);
		    num_funcs = rpc_stat->stats[0].func_total;
		    space = sizeof(rx_interface_stat_t) +
			    rpc_stat->stats[0].func_total *
			    sizeof(rx_function_entry_v1_t);

		    rxi_Free(rpc_stat, space);
		    rxi_rpc_peer_stat_cnt -= num_funcs;
		}
		MUTEX_EXIT(&peer->peer_lock);
		if (prev == *peer_ptr) {
		    /* ... */
		}
	    } else {
		prev = peer;
	    }
	}
    }
    MUTEX_EXIT(&rx_peerHashTable_lock);
    MUTEX_EXIT(&rx_rpc_stats);
}
/*
 * rx_clearProcessRPCStats - clear the contents of the rpc stats according
 * to the flags specified by clearFlag
 *
 * PARAMETERS
 * IN clearFlag - flag indicating which stats to clear
 */
void rx_clearProcessRPCStats(
  afs_uint32 clearFlag)
{
    rx_interface_stat_p rpc_stat, nrpc_stat;

    MUTEX_ENTER(&rx_rpc_stats);

    for (queue_Scan(&processStats, rpc_stat, nrpc_stat, rx_interface_stat)) {
	unsigned int num_funcs = 0, i;
	num_funcs = rpc_stat->stats[0].func_total;
	for (i=0; i<num_funcs; i++) {
	    if (clearFlag & AFS_RX_STATS_CLEAR_INVOCATIONS) {
		hzero(rpc_stat->stats[i].invocations);
	    }
	    if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_SENT) {
		hzero(rpc_stat->stats[i].bytes_sent);
	    }
	    if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_RCVD) {
		hzero(rpc_stat->stats[i].bytes_rcvd);
	    }
	    if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_SUM) {
		rpc_stat->stats[i].queue_time_sum.sec = 0;
		rpc_stat->stats[i].queue_time_sum.usec = 0;
	    }
	    if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_SQUARE) {
		rpc_stat->stats[i].queue_time_sum_sqr.sec = 0;
		rpc_stat->stats[i].queue_time_sum_sqr.usec = 0;
	    }
	    if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_MIN) {
		rpc_stat->stats[i].queue_time_min.sec = 9999999;
		rpc_stat->stats[i].queue_time_min.usec = 9999999;
	    }
	    if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_MAX) {
		rpc_stat->stats[i].queue_time_max.sec = 0;
		rpc_stat->stats[i].queue_time_max.usec = 0;
	    }
	    if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_SUM) {
		rpc_stat->stats[i].execution_time_sum.sec = 0;
		rpc_stat->stats[i].execution_time_sum.usec = 0;
	    }
	    if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_SQUARE) {
		rpc_stat->stats[i].execution_time_sum_sqr.sec = 0;
		rpc_stat->stats[i].execution_time_sum_sqr.usec = 0;
	    }
	    if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_MIN) {
		rpc_stat->stats[i].execution_time_min.sec = 9999999;
		rpc_stat->stats[i].execution_time_min.usec = 9999999;
	    }
	    if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_MAX) {
		rpc_stat->stats[i].execution_time_max.sec = 0;
		rpc_stat->stats[i].execution_time_max.usec = 0;
	    }
	}
    }

    MUTEX_EXIT(&rx_rpc_stats);
}
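/*
 * Example invocation: clear just the byte counters, leaving the timing
 * data intact (the flags combine with bitwise OR; helper name hypothetical):
 */
#if 0 /* example only */
static void example_clear_byte_counters(void)
{
    rx_clearProcessRPCStats(AFS_RX_STATS_CLEAR_BYTES_SENT |
			    AFS_RX_STATS_CLEAR_BYTES_RCVD);
}
#endif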
/*
 * rx_clearPeerRPCStats - clear the contents of the rpc stats according
 * to the flags specified by clearFlag
 *
 * PARAMETERS
 * IN clearFlag - flag indicating which stats to clear
 */
void rx_clearPeerRPCStats(
  afs_uint32 clearFlag)
{
    rx_interface_stat_p rpc_stat, nrpc_stat;
    char *fix_offset;

    MUTEX_ENTER(&rx_rpc_stats);

    for (queue_Scan(&peerStats, rpc_stat, nrpc_stat, rx_interface_stat)) {
	unsigned int num_funcs = 0, i;

	/*
	 * We have to fix the offset of rpc_stat since we are
	 * keeping this structure on two rx_queues.  The rx_queue
	 * package assumes that the rx_queue member is the first
	 * member of the structure.  That is, rx_queue assumes that
	 * any one item is only on one queue at a time.  We are
	 * breaking that assumption and so we have to do a little
	 * math to fix our pointers.
	 */
	fix_offset = (char *) rpc_stat;
	fix_offset -= offsetof(rx_interface_stat_t, all_peers);
	rpc_stat = (rx_interface_stat_p) fix_offset;

	num_funcs = rpc_stat->stats[0].func_total;
	for (i=0; i<num_funcs; i++) {
	    if (clearFlag & AFS_RX_STATS_CLEAR_INVOCATIONS) {
		hzero(rpc_stat->stats[i].invocations);
	    }
	    if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_SENT) {
		hzero(rpc_stat->stats[i].bytes_sent);
	    }
	    if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_RCVD) {
		hzero(rpc_stat->stats[i].bytes_rcvd);
	    }
	    if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_SUM) {
		rpc_stat->stats[i].queue_time_sum.sec = 0;
		rpc_stat->stats[i].queue_time_sum.usec = 0;
	    }
	    if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_SQUARE) {
		rpc_stat->stats[i].queue_time_sum_sqr.sec = 0;
		rpc_stat->stats[i].queue_time_sum_sqr.usec = 0;
	    }
	    if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_MIN) {
		rpc_stat->stats[i].queue_time_min.sec = 9999999;
		rpc_stat->stats[i].queue_time_min.usec = 9999999;
	    }
	    if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_MAX) {
		rpc_stat->stats[i].queue_time_max.sec = 0;
		rpc_stat->stats[i].queue_time_max.usec = 0;
	    }
	    if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_SUM) {
		rpc_stat->stats[i].execution_time_sum.sec = 0;
		rpc_stat->stats[i].execution_time_sum.usec = 0;
	    }
	    if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_SQUARE) {
		rpc_stat->stats[i].execution_time_sum_sqr.sec = 0;
		rpc_stat->stats[i].execution_time_sum_sqr.usec = 0;
	    }
	    if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_MIN) {
		rpc_stat->stats[i].execution_time_min.sec = 9999999;
		rpc_stat->stats[i].execution_time_min.usec = 9999999;
	    }
	    if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_MAX) {
		rpc_stat->stats[i].execution_time_max.sec = 0;
		rpc_stat->stats[i].execution_time_max.usec = 0;
	    }
	}
    }

    MUTEX_EXIT(&rx_rpc_stats);
}
/*
 * rxi_rxstat_userok points to a routine that returns 1 if the caller
 * is authorized to enable/disable/clear RX statistics.
 */
static int (*rxi_rxstat_userok)(struct rx_call *call) = NULL;

void rx_SetRxStatUserOk(
  int (*proc)(struct rx_call *call))
{
    rxi_rxstat_userok = proc;
}

int rx_RxStatUserOk(
  struct rx_call *call)
{
    if (!rxi_rxstat_userok)
	return 0;
    return rxi_rxstat_userok(call);
}
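/*
 * Example policy hook (hypothetical): allow rxstat control only over
 * connections using a non-null security class.  rx_ConnectionOf and
 * rx_SecurityClassOf are the accessor macros from rx.h.
 */
#if 0 /* example only */
static int example_rxstat_userok(struct rx_call *call)
{
    return rx_SecurityClassOf(rx_ConnectionOf(call)) != 0;
}

/* registered once at startup: */
static void example_register(void)
{
    rx_SetRxStatUserOk(example_rxstat_userok);
}
#endif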